Add dispatch worker that: (1) validates remote hermes binary paths before dispatching; (2) only marks a dispatch as successful when the remote hermes command actually launches; (3) keeps failed dispatches in the queue or marks them as failed; and (4) provides configurable agent paths and validation. Resolves #350.
552 lines
19 KiB
Python
552 lines
19 KiB
Python
"""
VPS Agent Dispatch Worker for Hermes Cron System

This module provides a dispatch worker that SSHs into remote VPS machines
and runs hermes commands. It ensures that:

1. Remote dispatch only counts as success when the remote hermes command actually launches
2. Stale per-agent hermes binary paths are configurable/validated before queue drain
3. Failed remote launches remain in the queue (or are marked failed) instead of being reported as OK
"""
|
|
|
|
import json
import logging
import os
import shlex
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DispatchStatus(Enum):
    """Status of a dispatch operation.

    The string values are what queue items store in their ``"status"``
    field, so they should be treated as a persisted format.
    """

    PENDING = "pending"          # queued and waiting for an attempt
    VALIDATING = "validating"    # NOTE(review): not assigned by any code visible in this module
    DISPATCHING = "dispatching"  # an attempt is currently in progress
    SUCCESS = "success"          # the operation completed with exit code 0
    FAILED = "failed"            # validation or execution failed
    RETRYING = "retrying"        # NOTE(review): not assigned by any code visible in this module
|
|
|
|
|
|
@dataclass
class DispatchResult:
    """Result of a dispatch operation.

    Only ``status`` and ``message`` are required; the remaining fields are
    filled in when the corresponding information is available.
    """

    # Outcome of the operation.
    status: DispatchStatus
    # Human-readable description of the outcome.
    message: str
    # Return code of the local ``ssh`` process, when one was run to completion.
    exit_code: Optional[int] = None
    # Captured standard output / standard error text of the ``ssh`` process.
    stdout: Optional[str] = None
    stderr: Optional[str] = None
    # Wall-clock duration in seconds, measured with ``time.time()``.
    execution_time: Optional[float] = None
    # Remote hermes binary path the operation refers to.
    hermes_path: Optional[str] = None
    # Whether the hermes path was validated as part of this operation.
    validated: bool = False
|
|
|
|
|
|
class HermesPathValidator:
    """Validates hermes binary paths on remote VPS machines.

    The check runs ``test -x <path>`` on the remote host over SSH and looks
    for the marker line the remote shell prints in response.
    """

    # Marker printed on its own line by the remote shell when the binary is
    # executable. The failure marker is "INVALID", which *contains* this
    # string, so the output must be matched per line, never by substring.
    _VALID_MARKER = "VALID"

    def __init__(self, ssh_key_path: Optional[str] = None):
        """
        Args:
            ssh_key_path: Private key passed to ``ssh -i``. Defaults to
                ``~/.ssh/id_rsa`` (expanded for the current user).
        """
        self.ssh_key_path = ssh_key_path or os.path.expanduser("~/.ssh/id_rsa")
        self.timeout = 30  # SSH timeout in seconds

    @classmethod
    def _reports_valid(cls, stdout: Optional[str]) -> bool:
        """Return True only if some output line is exactly the VALID marker."""
        return any(
            line.strip() == cls._VALID_MARKER
            for line in (stdout or "").splitlines()
        )

    def _build_ssh_command(self, host: str, hermes_path: str,
                           username: str) -> List[str]:
        """Build the local ``ssh`` argv that checks ``hermes_path`` on ``host``."""
        # Quote the path: it is interpolated into a command line that the
        # remote shell parses, so spaces or metacharacters must not leak.
        remote_check = (
            f"test -x {shlex.quote(hermes_path)} "
            "&& echo 'VALID' || echo 'INVALID'"
        )
        return [
            "ssh",
            "-i", self.ssh_key_path,
            # NOTE(review): host key checking is disabled, which allows
            # man-in-the-middle attacks; confirm this is acceptable.
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            "-o", "BatchMode=yes",
            f"{username}@{host}",
            remote_check,
        ]

    def validate_hermes_path(self, host: str, hermes_path: str,
                             username: str = "root") -> "DispatchResult":
        """
        Validate that the hermes binary exists and is executable on the remote host.

        Args:
            host: Remote host IP or hostname
            hermes_path: Path to hermes binary on remote host
            username: SSH username

        Returns:
            DispatchResult with validation status. ``status`` is SUCCESS and
            ``validated`` is True only when ssh exited with 0 *and* the
            remote shell printed the VALID marker; every other outcome
            (INVALID marker, ssh error, timeout, local exception) is FAILED.
        """
        start_time = time.time()

        try:
            ssh_cmd = self._build_ssh_command(host, hermes_path, username)
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"SSH timeout validating hermes path on {host}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=False
            )
        except Exception as e:
            # Best-effort boundary: any local failure (ssh missing, bad
            # argument types, ...) is reported as a failed validation.
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Error validating hermes path: {str(e)}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=False
            )

        execution_time = time.time() - start_time

        # The remote command always exits 0 (its `||` branch echoes INVALID),
        # so the exit status alone cannot tell the two outcomes apart; the
        # marker line is what decides.
        if result.returncode == 0 and self._reports_valid(result.stdout):
            return DispatchResult(
                status=DispatchStatus.SUCCESS,
                message=f"Hermes binary validated at {hermes_path}",
                exit_code=0,
                execution_time=execution_time,
                hermes_path=hermes_path,
                validated=True
            )

        return DispatchResult(
            status=DispatchStatus.FAILED,
            message=f"Hermes binary not found or not executable: {hermes_path}",
            exit_code=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            execution_time=execution_time,
            hermes_path=hermes_path,
            validated=False
        )
|
|
|
|
|
|
class VPSAgentDispatcher:
    """Dispatches hermes commands to remote VPS agents over SSH.

    Agent settings come from a JSON config file. Each entry under
    ``"agents"`` provides ``host``, ``hermes_path`` and optionally
    ``username`` (default ``"root"``) and ``workdir`` (default
    ``/root/wizards/<agent_name>/hermes-agent``).
    """

    def __init__(self, config_path: Optional[str] = None):
        """
        Args:
            config_path: Location of the JSON config. Defaults to
                ``~/.hermes/dispatch_config.json``.
        """
        self.config_path = config_path or os.path.expanduser("~/.hermes/dispatch_config.json")
        self.validator = HermesPathValidator()
        self.config = self._load_config()

        # Apply the configured validation timeout so the setting actually
        # governs the validator's SSH call.
        configured_timeout = self.config.get("validation_timeout")
        if isinstance(configured_timeout, (int, float)) and configured_timeout > 0:
            self.validator.timeout = configured_timeout

    def _load_config(self) -> Dict[str, Any]:
        """Load dispatch configuration, falling back to built-in defaults.

        Defaults are used when the file is missing, unreadable, not valid
        JSON, or does not contain a JSON object.
        """
        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r', encoding="utf-8") as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    return loaded
                logger.warning(
                    f"Dispatch config {self.config_path} is not a JSON object; using defaults"
                )
        except Exception as e:
            logger.warning(f"Failed to load dispatch config: {e}")

        # Default configuration
        return {
            "agents": {
                "ezra": {
                    "host": "143.198.27.163",
                    "hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
                    "username": "root"
                },
                "timmy": {
                    "host": "timmy",
                    "hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
                    "username": "root"
                }
            },
            "validation_timeout": 30,
            "command_timeout": 300,
            "max_retries": 2,
            "retry_delay": 5
        }

    def save_config(self):
        """Save dispatch configuration (best effort; failures are logged)."""
        try:
            config_dir = Path(self.config_path).parent
            config_dir.mkdir(parents=True, exist_ok=True)

            with open(self.config_path, 'w', encoding="utf-8") as f:
                json.dump(self.config, f, indent=2)

            # Set secure permissions
            os.chmod(self.config_path, 0o600)

        except Exception as e:
            logger.error(f"Failed to save dispatch config: {e}")

    def get_agent_config(self, agent_name: str) -> Optional[Dict[str, Any]]:
        """Get configuration for a specific agent, or None if not configured."""
        agents = self.config.get("agents")
        if not isinstance(agents, dict):
            return None
        return agents.get(agent_name)

    def update_agent_config(self, agent_name: str, host: str, hermes_path: str,
                            username: str = "root"):
        """Update configuration for a specific agent and persist it.

        Keys of an existing entry other than ``host``, ``hermes_path`` and
        ``username`` (for example ``workdir``) are preserved.
        """
        agents = self.config.get("agents")
        if not isinstance(agents, dict):
            agents = self.config["agents"] = {}

        previous = agents.get(agent_name)
        entry = dict(previous) if isinstance(previous, dict) else {}
        entry.update({
            "host": host,
            "hermes_path": hermes_path,
            "username": username
        })
        agents[agent_name] = entry

        self.save_config()

    @staticmethod
    def _build_remote_command(agent_name: str, agent_config: Dict[str, Any],
                              command: str) -> str:
        """Build the shell line run on the remote host for ``command``."""
        workdir = agent_config.get("workdir") or f"/root/wizards/{agent_name}/hermes-agent"
        # `.` is the POSIX spelling of `source`, so activation also works
        # when the remote login shell is not bash.
        return f"cd {shlex.quote(workdir)} && . venv/bin/activate && {command}"

    def _build_ssh_command(self, agent_config: Dict[str, Any],
                           remote_command: str) -> List[str]:
        """Build the local ``ssh`` argv that runs ``remote_command``."""
        return [
            "ssh",
            "-i", self.validator.ssh_key_path,
            # NOTE(review): host key checking is disabled, which allows
            # man-in-the-middle attacks; confirm this is acceptable.
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            # Never fall back to interactive prompts: an unattended worker
            # should fail fast rather than wait for the command timeout.
            "-o", "BatchMode=yes",
            f"{agent_config.get('username', 'root')}@{agent_config['host']}",
            remote_command,
        ]

    def validate_agent(self, agent_name: str) -> "DispatchResult":
        """Validate that an agent's hermes binary is accessible.

        Returns:
            FAILED when the agent is unknown or its entry lacks ``host`` or
            ``hermes_path``; otherwise the validator's result.
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )

        missing = [key for key in ("host", "hermes_path") if not agent_config.get(key)]
        if missing:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration for {agent_name} is missing: {', '.join(missing)}",
                hermes_path=agent_config.get("hermes_path")
            )

        return self.validator.validate_hermes_path(
            host=agent_config["host"],
            hermes_path=agent_config["hermes_path"],
            username=agent_config.get("username", "root")
        )

    def dispatch_command(self, agent_name: str, command: str,
                         validate_first: bool = True) -> "DispatchResult":
        """
        Dispatch a command to a remote VPS agent.

        Args:
            agent_name: Name of the agent to dispatch to
            command: Command to execute (interpreted by the remote shell)
            validate_first: Whether to validate hermes path before dispatching

        Returns:
            DispatchResult with execution status. SUCCESS is reported only
            when the ssh process exits with code 0; configuration problems,
            failed validation, non-zero exit codes, timeouts and local
            errors are all reported as FAILED.
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )

        hermes_path = agent_config.get("hermes_path")

        if not agent_config.get("host"):
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration for {agent_name} is missing: host",
                hermes_path=hermes_path
            )

        # Validate hermes path if requested
        if validate_first:
            validation_result = self.validate_agent(agent_name)
            if validation_result.status != DispatchStatus.SUCCESS:
                # Carry the validator's diagnostics along so the caller can
                # see why validation failed.
                return DispatchResult(
                    status=DispatchStatus.FAILED,
                    message=f"Validation failed: {validation_result.message}",
                    exit_code=validation_result.exit_code,
                    stdout=validation_result.stdout,
                    stderr=validation_result.stderr,
                    execution_time=validation_result.execution_time,
                    hermes_path=hermes_path,
                    validated=False
                )

        start_time = time.time()

        try:
            ssh_cmd = self._build_ssh_command(
                agent_config,
                self._build_remote_command(agent_name, agent_config, command)
            )
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.config.get("command_timeout", 300)
            )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Command timeout on {agent_name}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=validate_first
            )
        except Exception as e:
            # Best-effort boundary: local failures become a FAILED result
            # instead of propagating to the queue processor.
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Error executing command on {agent_name}: {str(e)}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=validate_first
            )

        execution_time = time.time() - start_time

        if result.returncode == 0:
            return DispatchResult(
                status=DispatchStatus.SUCCESS,
                message=f"Command executed successfully on {agent_name}",
                exit_code=0,
                stdout=result.stdout,
                stderr=result.stderr,
                execution_time=execution_time,
                hermes_path=hermes_path,
                validated=validate_first
            )

        return DispatchResult(
            status=DispatchStatus.FAILED,
            message=f"Command failed on {agent_name}: {result.stderr}",
            exit_code=result.returncode,
            stdout=result.stdout,
            stderr=result.stderr,
            execution_time=execution_time,
            hermes_path=hermes_path,
            validated=validate_first
        )

    def dispatch_hermes_command(self, agent_name: str, hermes_command: str,
                                validate_first: bool = True) -> "DispatchResult":
        """
        Dispatch a hermes command to a remote VPS agent.

        The agent's configured ``hermes_path`` is prepended to
        ``hermes_command`` before dispatching.

        Args:
            agent_name: Name of the agent to dispatch to
            hermes_command: Arguments for the hermes binary (e.g., "cron list")
            validate_first: Whether to validate hermes path before dispatching

        Returns:
            DispatchResult with execution status
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )

        hermes_path = agent_config.get("hermes_path")
        if not hermes_path:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration for {agent_name} is missing: hermes_path"
            )

        # Build full hermes command; the binary path is quoted because the
        # remote shell parses the resulting line.
        full_command = f"{shlex.quote(hermes_path)} {hermes_command}"

        return self.dispatch_command(agent_name, full_command, validate_first)
|
|
|
|
|
|
class DispatchQueue:
|
|
"""Queue for managing dispatch operations."""
|
|
|
|
def __init__(self, queue_file: Optional[str] = None):
|
|
self.queue_file = queue_file or os.path.expanduser("~/.hermes/dispatch_queue.json")
|
|
self.queue: List[Dict[str, Any]] = self._load_queue()
|
|
|
|
def _load_queue(self) -> List[Dict[str, Any]]:
|
|
"""Load queue from file."""
|
|
try:
|
|
if os.path.exists(self.queue_file):
|
|
with open(self.queue_file, 'r') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load dispatch queue: {e}")
|
|
|
|
return []
|
|
|
|
def save_queue(self):
|
|
"""Save queue to file."""
|
|
try:
|
|
queue_dir = Path(self.queue_file).parent
|
|
queue_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
with open(self.queue_file, 'w') as f:
|
|
json.dump(self.queue, f, indent=2)
|
|
|
|
# Set secure permissions
|
|
os.chmod(self.queue_file, 0o600)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to save dispatch queue: {e}")
|
|
|
|
def add_item(self, agent_name: str, command: str, priority: int = 0,
|
|
max_retries: int = 3) -> str:
|
|
"""
|
|
Add an item to the dispatch queue.
|
|
|
|
Returns:
|
|
Queue item ID
|
|
"""
|
|
item_id = f"dispatch_{int(time.time())}_{len(self.queue)}"
|
|
|
|
item = {
|
|
"id": item_id,
|
|
"agent_name": agent_name,
|
|
"command": command,
|
|
"priority": priority,
|
|
"max_retries": max_retries,
|
|
"retry_count": 0,
|
|
"status": DispatchStatus.PENDING.value,
|
|
"created_at": time.time(),
|
|
"last_attempt": None,
|
|
"result": None
|
|
}
|
|
|
|
self.queue.append(item)
|
|
self.save_queue()
|
|
|
|
return item_id
|
|
|
|
def get_next_item(self) -> Optional[Dict[str, Any]]:
|
|
"""Get the next item from the queue (highest priority, oldest first)."""
|
|
if not self.queue:
|
|
return None
|
|
|
|
# Sort by priority (descending) and created_at (ascending)
|
|
sorted_queue = sorted(
|
|
self.queue,
|
|
key=lambda x: (-x.get("priority", 0), x.get("created_at", 0))
|
|
)
|
|
|
|
# Find first pending item
|
|
for item in sorted_queue:
|
|
if item.get("status") == DispatchStatus.PENDING.value:
|
|
return item
|
|
|
|
return None
|
|
|
|
def update_item(self, item_id: str, status: DispatchStatus,
|
|
result: Optional[DispatchResult] = None):
|
|
"""Update a queue item."""
|
|
for item in self.queue:
|
|
if item.get("id") == item_id:
|
|
item["status"] = status.value
|
|
item["last_attempt"] = time.time()
|
|
|
|
if result:
|
|
item["result"] = {
|
|
"status": result.status.value,
|
|
"message": result.message,
|
|
"exit_code": result.exit_code,
|
|
"stdout": result.stdout,
|
|
"stderr": result.stderr,
|
|
"execution_time": result.execution_time,
|
|
"hermes_path": result.hermes_path,
|
|
"validated": result.validated
|
|
}
|
|
|
|
# Update retry count if failed
|
|
if status == DispatchStatus.FAILED:
|
|
item["retry_count"] = item.get("retry_count", 0) + 1
|
|
|
|
self.save_queue()
|
|
break
|
|
|
|
def remove_item(self, item_id: str):
|
|
"""Remove an item from the queue."""
|
|
self.queue = [item for item in self.queue if item.get("id") != item_id]
|
|
self.save_queue()
|
|
|
|
def get_failed_items(self) -> List[Dict[str, Any]]:
|
|
"""Get all failed items that can be retried."""
|
|
return [
|
|
item for item in self.queue
|
|
if item.get("status") == DispatchStatus.FAILED.value
|
|
and item.get("retry_count", 0) < item.get("max_retries", 3)
|
|
]
|
|
|
|
def get_stats(self) -> Dict[str, Any]:
|
|
"""Get queue statistics."""
|
|
total = len(self.queue)
|
|
pending = sum(1 for item in self.queue if item.get("status") == DispatchStatus.PENDING.value)
|
|
success = sum(1 for item in self.queue if item.get("status") == DispatchStatus.SUCCESS.value)
|
|
failed = sum(1 for item in self.queue if item.get("status") == DispatchStatus.FAILED.value)
|
|
|
|
return {
|
|
"total": total,
|
|
"pending": pending,
|
|
"success": success,
|
|
"failed": failed,
|
|
"retryable": len(self.get_failed_items())
|
|
}
|
|
|
|
|
|
def process_dispatch_queue(dispatcher: VPSAgentDispatcher,
                           queue: DispatchQueue,
                           batch_size: int = 5) -> Dict[str, Any]:
    """
    Process items from the dispatch queue.

    The batch is chosen up front from pending items plus failed items that
    still have retries left, ordered by priority (descending) and creation
    time (ascending). Each chosen item is attempted at most once per call,
    so a failing item is retried on a later call rather than immediately.

    Failed items are never removed: they stay in the queue marked FAILED,
    and stop being selected once their retry count reaches ``max_retries``.

    Args:
        dispatcher: VPS agent dispatcher
        queue: Dispatch queue
        batch_size: Maximum number of items to process in this batch

    Returns:
        Processing statistics: ``processed``, ``success``, ``failed`` and
        ``queue_stats`` (the queue's own statistics after the batch).
    """
    processed = 0
    success = 0
    failed = 0

    candidates = [
        item for item in queue.queue
        if item.get("status") == DispatchStatus.PENDING.value
    ]
    candidates.extend(queue.get_failed_items())
    candidates.sort(key=lambda x: (-x.get("priority", 0), x.get("created_at", 0)))

    # max() guards against a negative batch size slicing from the end.
    for item in candidates[:max(batch_size, 0)]:
        item_id = item["id"]

        # Update status to dispatching
        queue.update_item(item_id, DispatchStatus.DISPATCHING)

        # Dispatch the command. An unexpected exception must not leave the
        # item stuck in DISPATCHING or abort the rest of the batch.
        try:
            result = dispatcher.dispatch_hermes_command(
                agent_name=item["agent_name"],
                hermes_command=item["command"],
                validate_first=True
            )
        except Exception as exc:
            logger.error(f"Dispatch of {item_id} raised an exception: {exc}")
            result = DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Dispatch raised {type(exc).__name__}: {exc}"
            )

        # Update queue with result; update_item increments the retry count
        # on FAILED, which is what eventually exhausts the retries.
        if result.status == DispatchStatus.SUCCESS:
            queue.update_item(item_id, DispatchStatus.SUCCESS, result)
            success += 1
        else:
            queue.update_item(item_id, DispatchStatus.FAILED, result)
            failed += 1

        processed += 1

    return {
        "processed": processed,
        "success": success,
        "failed": failed,
        "queue_stats": queue.get_stats()
    }
|
|
|
|
|
|
# Example usage and testing
if __name__ == "__main__":
    # Set up logging
    logging.basicConfig(level=logging.INFO)

    # Create dispatcher and queue (both read their files under ~/.hermes)
    dispatcher = VPSAgentDispatcher()
    queue = DispatchQueue()

    # Example: Add items to queue. These are persisted, so every run of
    # this script appends two more items to the queue file.
    queue.add_item("ezra", "cron list")
    queue.add_item("timmy", "cron status")

    # Process queue
    stats = process_dispatch_queue(dispatcher, queue)
    print(f"Processing stats: {stats}")

    # Show queue stats
    queue_stats = queue.get_stats()
    print(f"Queue stats: {queue_stats}")
|