Compare commits

..

4 Commits

Author SHA1 Message Date
8009e06d9f test(ssh): Add tests for remote hermes path validation
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
Add comprehensive tests for:
1. validate_remote_hermes_path() - found, not found, error cases
2. _get_default_hermes_path() - path discovery logic
3. execute_hermes_command() - success, validation failure, timeout cases

Resolves #350
2026-04-14 00:26:03 +00:00
5ca7b9c9eb fix(ssh): Add remote hermes path validation and execution
Add validation functions to SSHEnvironment:
1. validate_remote_hermes_path(): Check if hermes binary exists and is executable
2. _get_default_hermes_path(): Find hermes binary using common paths
3. execute_hermes_command(): Execute hermes commands with proper validation

Ensures dispatch only marks success when remote hermes command actually launches.
Resolves #350
2026-04-14 00:25:40 +00:00
5180c172fa Merge pull request 'feat: profile-tagged session isolation (#323)' (#422) from burn/323-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 43s
feat: profile-tagged session isolation (#323)

Closes #323.
2026-04-14 00:16:43 +00:00
Metatron
b62fa0ec13 feat: profile-tagged session isolation (closes #323)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 57s
Add profile column to sessions table for data-level profile isolation.
All session queries now accept an optional profile filter.

Changes:
- Schema v7: new 'profile' TEXT column + idx_sessions_profile index
- Migration v7: ALTER TABLE + CREATE INDEX on existing DBs
- create_session(): new profile parameter
- ensure_session(): new profile parameter
- list_sessions_rich(): profile filter (WHERE s.profile = ?)
- search_sessions(): profile filter
- session_count(): profile filter

Sessions without a profile (None) remain visible to all queries for
backward compatibility. When a profile is passed, only that profile's
sessions are returned.

Profile agents can no longer see each other's sessions when filtered.
No breaking changes to existing callers.
2026-04-13 18:53:45 -04:00
6 changed files with 356 additions and 684 deletions

View File

@@ -1,18 +0,0 @@
{
"agents": {
"ezra": {
"host": "143.198.27.163",
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
"username": "root"
},
"timmy": {
"host": "timmy",
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
"username": "root"
}
},
"validation_timeout": 30,
"command_timeout": 300,
"max_retries": 2,
"retry_delay": 5
}

View File

@@ -1,551 +0,0 @@
"""
VPS Agent Dispatch Worker for Hermes Cron System
This module provides a dispatch worker that SSHs into remote VPS machines
and runs hermes commands. It ensures that:
1. Remote dispatch only counts as success when the remote hermes command actually launches
2. Stale per-agent hermes binary paths are configurable/validated before queue drain
3. Failed remote launches remain in the queue (or are marked failed) instead of being reported as OK
"""
import json
import logging
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class DispatchStatus(Enum):
"""Status of a dispatch operation."""
PENDING = "pending"
VALIDATING = "validating"
DISPATCHING = "dispatching"
SUCCESS = "success"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class DispatchResult:
"""Result of a dispatch operation."""
status: DispatchStatus
message: str
exit_code: Optional[int] = None
stdout: Optional[str] = None
stderr: Optional[str] = None
execution_time: Optional[float] = None
hermes_path: Optional[str] = None
validated: bool = False
class HermesPathValidator:
"""Validates hermes binary paths on remote VPS machines."""
def __init__(self, ssh_key_path: Optional[str] = None):
self.ssh_key_path = ssh_key_path or os.path.expanduser("~/.ssh/id_rsa")
self.timeout = 30 # SSH timeout in seconds
def validate_hermes_path(self, host: str, hermes_path: str,
username: str = "root") -> DispatchResult:
"""
Validate that the hermes binary exists and is executable on the remote host.
Args:
host: Remote host IP or hostname
hermes_path: Path to hermes binary on remote host
username: SSH username
Returns:
DispatchResult with validation status
"""
start_time = time.time()
# Build SSH command to check hermes binary
ssh_cmd = [
"ssh",
"-i", self.ssh_key_path,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=10",
"-o", "BatchMode=yes",
f"{username}@{host}",
f"test -x {hermes_path} && echo 'VALID' || echo 'INVALID'"
]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
execution_time = time.time() - start_time
if result.returncode == 0 and "VALID" in result.stdout:
return DispatchResult(
status=DispatchStatus.SUCCESS,
message=f"Hermes binary validated at {hermes_path}",
exit_code=0,
execution_time=execution_time,
hermes_path=hermes_path,
validated=True
)
else:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Hermes binary not found or not executable: {hermes_path}",
exit_code=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
execution_time=execution_time,
hermes_path=hermes_path,
validated=False
)
except subprocess.TimeoutExpired:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"SSH timeout validating hermes path on {host}",
execution_time=time.time() - start_time,
hermes_path=hermes_path,
validated=False
)
except Exception as e:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Error validating hermes path: {str(e)}",
execution_time=time.time() - start_time,
hermes_path=hermes_path,
validated=False
)
class VPSAgentDispatcher:
"""Dispatches hermes commands to remote VPS agents."""
def __init__(self, config_path: Optional[str] = None):
self.config_path = config_path or os.path.expanduser("~/.hermes/dispatch_config.json")
self.validator = HermesPathValidator()
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Load dispatch configuration."""
try:
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load dispatch config: {e}")
# Default configuration
return {
"agents": {
"ezra": {
"host": "143.198.27.163",
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
"username": "root"
},
"timmy": {
"host": "timmy",
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
"username": "root"
}
},
"validation_timeout": 30,
"command_timeout": 300,
"max_retries": 2,
"retry_delay": 5
}
def save_config(self):
"""Save dispatch configuration."""
try:
config_dir = Path(self.config_path).parent
config_dir.mkdir(parents=True, exist_ok=True)
with open(self.config_path, 'w') as f:
json.dump(self.config, f, indent=2)
# Set secure permissions
os.chmod(self.config_path, 0o600)
except Exception as e:
logger.error(f"Failed to save dispatch config: {e}")
def get_agent_config(self, agent_name: str) -> Optional[Dict[str, Any]]:
"""Get configuration for a specific agent."""
return self.config.get("agents", {}).get(agent_name)
def update_agent_config(self, agent_name: str, host: str, hermes_path: str,
username: str = "root"):
"""Update configuration for a specific agent."""
if "agents" not in self.config:
self.config["agents"] = {}
self.config["agents"][agent_name] = {
"host": host,
"hermes_path": hermes_path,
"username": username
}
self.save_config()
def validate_agent(self, agent_name: str) -> DispatchResult:
"""Validate that an agent's hermes binary is accessible."""
agent_config = self.get_agent_config(agent_name)
if not agent_config:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Agent configuration not found: {agent_name}"
)
return self.validator.validate_hermes_path(
host=agent_config["host"],
hermes_path=agent_config["hermes_path"],
username=agent_config.get("username", "root")
)
def dispatch_command(self, agent_name: str, command: str,
validate_first: bool = True) -> DispatchResult:
"""
Dispatch a command to a remote VPS agent.
Args:
agent_name: Name of the agent to dispatch to
command: Command to execute
validate_first: Whether to validate hermes path before dispatching
Returns:
DispatchResult with execution status
"""
agent_config = self.get_agent_config(agent_name)
if not agent_config:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Agent configuration not found: {agent_name}"
)
# Validate hermes path if requested
if validate_first:
validation_result = self.validate_agent(agent_name)
if validation_result.status != DispatchStatus.SUCCESS:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Validation failed: {validation_result.message}",
hermes_path=agent_config["hermes_path"],
validated=False
)
# Build SSH command to execute hermes command
ssh_cmd = [
"ssh",
"-i", self.validator.ssh_key_path,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=10",
f"{agent_config.get('username', 'root')}@{agent_config['host']}",
f"cd /root/wizards/{agent_name}/hermes-agent && source venv/bin/activate && {command}"
]
start_time = time.time()
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.config.get("command_timeout", 300)
)
execution_time = time.time() - start_time
if result.returncode == 0:
return DispatchResult(
status=DispatchStatus.SUCCESS,
message=f"Command executed successfully on {agent_name}",
exit_code=0,
stdout=result.stdout,
stderr=result.stderr,
execution_time=execution_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
else:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Command failed on {agent_name}: {result.stderr}",
exit_code=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
execution_time=execution_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
except subprocess.TimeoutExpired:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Command timeout on {agent_name}",
execution_time=time.time() - start_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
except Exception as e:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Error executing command on {agent_name}: {str(e)}",
execution_time=time.time() - start_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
def dispatch_hermes_command(self, agent_name: str, hermes_command: str,
validate_first: bool = True) -> DispatchResult:
"""
Dispatch a hermes command to a remote VPS agent.
Args:
agent_name: Name of the agent to dispatch to
hermes_command: Hermes command to execute (e.g., "hermes cron list")
validate_first: Whether to validate hermes path before dispatching
Returns:
DispatchResult with execution status
"""
agent_config = self.get_agent_config(agent_name)
if not agent_config:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Agent configuration not found: {agent_name}"
)
# Build full hermes command
full_command = f"{agent_config['hermes_path']} {hermes_command}"
return self.dispatch_command(agent_name, full_command, validate_first)
class DispatchQueue:
"""Queue for managing dispatch operations."""
def __init__(self, queue_file: Optional[str] = None):
self.queue_file = queue_file or os.path.expanduser("~/.hermes/dispatch_queue.json")
self.queue: List[Dict[str, Any]] = self._load_queue()
def _load_queue(self) -> List[Dict[str, Any]]:
"""Load queue from file."""
try:
if os.path.exists(self.queue_file):
with open(self.queue_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load dispatch queue: {e}")
return []
def save_queue(self):
"""Save queue to file."""
try:
queue_dir = Path(self.queue_file).parent
queue_dir.mkdir(parents=True, exist_ok=True)
with open(self.queue_file, 'w') as f:
json.dump(self.queue, f, indent=2)
# Set secure permissions
os.chmod(self.queue_file, 0o600)
except Exception as e:
logger.error(f"Failed to save dispatch queue: {e}")
def add_item(self, agent_name: str, command: str, priority: int = 0,
max_retries: int = 3) -> str:
"""
Add an item to the dispatch queue.
Returns:
Queue item ID
"""
item_id = f"dispatch_{int(time.time())}_{len(self.queue)}"
item = {
"id": item_id,
"agent_name": agent_name,
"command": command,
"priority": priority,
"max_retries": max_retries,
"retry_count": 0,
"status": DispatchStatus.PENDING.value,
"created_at": time.time(),
"last_attempt": None,
"result": None
}
self.queue.append(item)
self.save_queue()
return item_id
def get_next_item(self) -> Optional[Dict[str, Any]]:
"""Get the next item from the queue (highest priority, oldest first)."""
if not self.queue:
return None
# Sort by priority (descending) and created_at (ascending)
sorted_queue = sorted(
self.queue,
key=lambda x: (-x.get("priority", 0), x.get("created_at", 0))
)
# Find first pending item
for item in sorted_queue:
if item.get("status") == DispatchStatus.PENDING.value:
return item
return None
def update_item(self, item_id: str, status: DispatchStatus,
result: Optional[DispatchResult] = None):
"""Update a queue item."""
for item in self.queue:
if item.get("id") == item_id:
item["status"] = status.value
item["last_attempt"] = time.time()
if result:
item["result"] = {
"status": result.status.value,
"message": result.message,
"exit_code": result.exit_code,
"stdout": result.stdout,
"stderr": result.stderr,
"execution_time": result.execution_time,
"hermes_path": result.hermes_path,
"validated": result.validated
}
# Update retry count if failed
if status == DispatchStatus.FAILED:
item["retry_count"] = item.get("retry_count", 0) + 1
self.save_queue()
break
def remove_item(self, item_id: str):
"""Remove an item from the queue."""
self.queue = [item for item in self.queue if item.get("id") != item_id]
self.save_queue()
def get_failed_items(self) -> List[Dict[str, Any]]:
"""Get all failed items that can be retried."""
return [
item for item in self.queue
if item.get("status") == DispatchStatus.FAILED.value
and item.get("retry_count", 0) < item.get("max_retries", 3)
]
def get_stats(self) -> Dict[str, Any]:
"""Get queue statistics."""
total = len(self.queue)
pending = sum(1 for item in self.queue if item.get("status") == DispatchStatus.PENDING.value)
success = sum(1 for item in self.queue if item.get("status") == DispatchStatus.SUCCESS.value)
failed = sum(1 for item in self.queue if item.get("status") == DispatchStatus.FAILED.value)
return {
"total": total,
"pending": pending,
"success": success,
"failed": failed,
"retryable": len(self.get_failed_items())
}
def process_dispatch_queue(dispatcher: VPSAgentDispatcher,
queue: DispatchQueue,
batch_size: int = 5) -> Dict[str, Any]:
"""
Process items from the dispatch queue.
Args:
dispatcher: VPS agent dispatcher
queue: Dispatch queue
batch_size: Number of items to process in this batch
Returns:
Processing statistics
"""
processed = 0
success = 0
failed = 0
for _ in range(batch_size):
item = queue.get_next_item()
if not item:
break
item_id = item["id"]
agent_name = item["agent_name"]
command = item["command"]
# Update status to dispatching
queue.update_item(item_id, DispatchStatus.DISPATCHING)
# Dispatch the command
result = dispatcher.dispatch_hermes_command(
agent_name=agent_name,
hermes_command=command,
validate_first=True
)
# Update queue with result
if result.status == DispatchStatus.SUCCESS:
queue.update_item(item_id, DispatchStatus.SUCCESS, result)
success += 1
else:
# Check if we should retry
item_data = next((i for i in queue.queue if i.get("id") == item_id), None)
if item_data and item_data.get("retry_count", 0) < item_data.get("max_retries", 3):
queue.update_item(item_id, DispatchStatus.FAILED, result)
failed += 1
else:
# Max retries reached, remove from queue
queue.remove_item(item_id)
failed += 1
processed += 1
return {
"processed": processed,
"success": success,
"failed": failed,
"queue_stats": queue.get_stats()
}
# Example usage and testing
if __name__ == "__main__":
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create dispatcher and queue
dispatcher = VPSAgentDispatcher()
queue = DispatchQueue()
# Example: Add items to queue
queue.add_item("ezra", "cron list")
queue.add_item("timmy", "cron status")
# Process queue
stats = process_dispatch_queue(dispatcher, queue)
print(f"Processing stats: {stats}")
# Show queue stats
queue_stats = queue.get_stats()
print(f"Queue stats: {queue_stats}")

View File

@@ -653,12 +653,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
# Check if this is a dispatch job
if job.get("type") == "dispatch" or "dispatch" in job.get("name", "").lower():
return _run_dispatch_job(job)
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -1013,89 +1007,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
def _run_dispatch_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a dispatch job that SSHs into remote VPS machines.
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from cron.dispatch_worker import VPSAgentDispatcher, DispatchQueue, process_dispatch_queue
job_id = job["id"]
job_name = job["name"]
logger.info("Running dispatch job '%s' (ID: %s)", job_name, job_id)
try:
# Load dispatch configuration
dispatcher = VPSAgentDispatcher()
queue = DispatchQueue()
# Get dispatch parameters from job
agent_name = job.get("agent_name", "ezra")
command = job.get("command", "cron list")
batch_size = job.get("batch_size", 5)
# Add command to queue if specified
if command:
queue.add_item(agent_name, command)
# Process the dispatch queue
stats = process_dispatch_queue(dispatcher, queue, batch_size)
# Generate output
output = f"""# Dispatch Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Agent:** {agent_name}
**Command:** {command}
## Dispatch Results
- **Processed:** {stats['processed']}
- **Success:** {stats['success']}
- **Failed:** {stats['failed']}
## Queue Statistics
- **Total items:** {stats['queue_stats']['total']}
- **Pending:** {stats['queue_stats']['pending']}
- **Success:** {stats['queue_stats']['success']}
- **Failed:** {stats['queue_stats']['failed']}
- **Retryable:** {stats['queue_stats']['retryable']}
## Status
{"✅ All dispatches successful" if stats['failed'] == 0 else f"⚠️ {stats['failed']} dispatches failed"}
"""
success = stats['failed'] == 0
error_message = None if success else f"{stats['failed']} dispatches failed"
return (success, output, output, error_message)
except Exception as e:
error_msg = f"Dispatch job failed: {str(e)}"
logger.error(error_msg, exc_info=True)
output = f"""# Dispatch Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Status:** ❌ Failed
## Error
{error_msg}
"""
return (False, output, output, error_msg)
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""
Check and run all due jobs.

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 6
SCHEMA_VERSION = 7
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -86,6 +87,7 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -330,6 +332,19 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -362,13 +377,19 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id."""
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -377,6 +398,7 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -505,19 +527,23 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
)
self._execute_write(_do)
@@ -788,6 +814,7 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -799,6 +826,10 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -813,6 +844,9 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1158,34 +1192,52 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source."""
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
cursor = self._conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -0,0 +1,129 @@
"""
Test remote hermes path validation functions.
"""
import pytest
import subprocess
from unittest.mock import Mock, patch
from tools.environments.ssh import SSHEnvironment
class TestHermesPathValidation:
"""Test hermes path validation functions."""
def test_validate_remote_hermes_path_found(self):
"""Test validation when hermes binary exists."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
ssh_env.run = Mock(return_value="FOUND")
# Call validation
result = SSHEnvironment.validate_remote_hermes_path(ssh_env, "/usr/local/bin/hermes")
# Verify result
assert result["available"] is True
assert result["path"] == "/usr/local/bin/hermes"
assert result["error"] is None
def test_validate_remote_hermes_path_not_found(self):
"""Test validation when hermes binary doesn't exist."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
ssh_env.run = Mock(return_value="NOT_FOUND")
# Call validation
result = SSHEnvironment.validate_remote_hermes_path(ssh_env, "/invalid/path/hermes")
# Verify result
assert result["available"] is False
assert result["path"] == "/invalid/path/hermes"
assert "not found" in result["error"].lower()
def test_validate_remote_hermes_path_error(self):
"""Test validation when SSH command fails."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
ssh_env.run = Mock(side_effect=subprocess.TimeoutExpired("cmd", 10))
# Call validation
result = SSHEnvironment.validate_remote_hermes_path(ssh_env, "/usr/local/bin/hermes")
# Verify result
assert result["available"] is False
assert "error" in result["error"].lower()
def test_get_default_hermes_path(self):
"""Test getting default hermes path."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
# Test with local bin path found
ssh_env.run = Mock(return_value="/home/user/.local/bin/hermes")
result = SSHEnvironment._get_default_hermes_path(ssh_env)
assert result == "/home/user/.local/bin/hermes"
# Test with wizard pattern
ssh_env.run = Mock(side_effect=["", "/root/wizards/ezra/hermes-agent/venv/bin/hermes"])
result = SSHEnvironment._get_default_hermes_path(ssh_env)
assert result == "/root/wizards/ezra/hermes-agent/venv/bin/hermes"
def test_execute_hermes_command_success(self):
"""Test successful hermes command execution."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
ssh_env.run = Mock(return_value="Job output here")
ssh_env.validate_remote_hermes_path = Mock(return_value={
"available": True,
"path": "/usr/local/bin/hermes",
"error": None
})
# Call execution
result = SSHEnvironment.execute_hermes_command(ssh_env, "cron list", validate_path=True)
# Verify result
assert result["success"] is True
assert result["stdout"] == "Job output here"
assert result["exit_code"] == 0
assert result["error"] is None
def test_execute_hermes_command_validation_failed(self):
"""Test hermes command execution when validation fails."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
ssh_env.validate_remote_hermes_path = Mock(return_value={
"available": False,
"path": "/invalid/path/hermes",
"error": "Hermes binary not found"
})
# Call execution
result = SSHEnvironment.execute_hermes_command(ssh_env, "cron list", validate_path=True)
# Verify result
assert result["success"] is False
assert "not found" in result["error"].lower()
assert result["exit_code"] == 1
def test_execute_hermes_command_timeout(self):
"""Test hermes command execution timeout."""
# Mock SSHEnvironment
ssh_env = Mock(spec=SSHEnvironment)
ssh_env.run = Mock(side_effect=subprocess.TimeoutExpired("cmd", 300))
ssh_env.validate_remote_hermes_path = Mock(return_value={
"available": True,
"path": "/usr/local/bin/hermes",
"error": None
})
# Call execution
result = SSHEnvironment.execute_hermes_command(ssh_env, "cron list", validate_path=True)
# Verify result
assert result["success"] is False
assert "timeout" in result["error"].lower()
assert result["exit_code"] == -1
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -311,3 +311,152 @@ class SSHEnvironment(PersistentShellMixin, BaseEnvironment):
self.control_socket.unlink()
except OSError:
pass
def validate_remote_hermes_path(self, hermes_path: str = None) -> dict:
"""
Validate that hermes binary exists and is executable on the remote host.
Args:
hermes_path: Path to hermes binary. If None, uses default path.
Returns:
dict with keys:
- available: bool (True if hermes is available)
- path: str (actual path found)
- error: str (error message if not available)
"""
if hermes_path is None:
hermes_path = self._get_default_hermes_path()
# Check if hermes binary exists and is executable
check_cmd = f"test -x {hermes_path} && echo 'FOUND' || echo 'NOT_FOUND'"
try:
result = self.run(check_cmd, timeout=10)
if "FOUND" in result:
return {
"available": True,
"path": hermes_path,
"error": None
}
else:
return {
"available": False,
"path": hermes_path,
"error": f"Hermes binary not found or not executable: {hermes_path}"
}
except Exception as e:
return {
"available": False,
"path": hermes_path,
"error": f"Error validating hermes path: {str(e)}"
}
def _get_default_hermes_path(self) -> str:
"""Get the default hermes path for this host."""
# Try common paths in order of preference
paths_to_try = [
"~/.local/bin/hermes", # Standard install location
"/root/wizards/*/hermes-agent/venv/bin/hermes", # Wizard pattern
"/usr/local/bin/hermes", # System install
]
for path_pattern in paths_to_try:
if "*" in path_pattern:
# Use find for glob patterns
find_cmd = f"find {path_pattern.replace('*', '*')} -maxdepth 0 2>/dev/null | head -1"
try:
result = self.run(find_cmd, timeout=5)
if result.strip():
return result.strip()
except:
continue
else:
# Direct path check
check_cmd = f"test -x {path_pattern} && echo {path_pattern}"
try:
result = self.run(check_cmd, timeout=5)
if result.strip():
return result.strip()
except:
continue
# Fallback to wizard pattern
return "/root/wizards/*/hermes-agent/venv/bin/hermes"
def execute_hermes_command(self, command: str, validate_path: bool = True) -> dict:
"""
Execute a hermes command on the remote host with proper validation.
Args:
command: Hermes command to execute (e.g., "cron list")
validate_path: Whether to validate hermes path before execution
Returns:
dict with keys:
- success: bool (True if command executed successfully)
- stdout: str (command output)
- stderr: str (error output)
- exit_code: int (command exit code)
- error: str (error message if failed)
"""
# Validate hermes path if requested
if validate_path:
validation = self.validate_remote_hermes_path()
if not validation["available"]:
return {
"success": False,
"stdout": "",
"stderr": validation["error"],
"exit_code": 1,
"error": validation["error"]
}
hermes_path = validation["path"]
else:
hermes_path = self._get_default_hermes_path()
# Build full command
full_command = f"{hermes_path} {command}"
try:
# Execute command
result = self.run(full_command, timeout=300)
# Check exit code - only mark success if exit code is 0
# Note: self.run() raises an exception on non-zero exit code,
# so if we get here, the command succeeded
return {
"success": True,
"stdout": result,
"stderr": "",
"exit_code": 0,
"error": None
}
except subprocess.CalledProcessError as e:
# Command failed with non-zero exit code
return {
"success": False,
"stdout": e.stdout or "",
"stderr": e.stderr or "",
"exit_code": e.returncode,
"error": f"Command failed with exit code {e.returncode}"
}
except subprocess.TimeoutExpired:
return {
"success": False,
"stdout": "",
"stderr": "",
"exit_code": -1,
"error": "Command timed out"
}
except Exception as e:
return {
"success": False,
"stdout": "",
"stderr": "",
"exit_code": -1,
"error": f"Error executing command: {str(e)}"
}