Compare commits

..

3 Commits

Author SHA1 Message Date
54590dd627 feat(cron): Add dispatch job handling to scheduler
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 41s
Add dispatch job support to scheduler:
1. Detect dispatch jobs by type or name pattern
2. Route dispatch jobs to specialized handler
3. Integrate with new dispatch worker for VPS agent dispatch
4. Proper error handling and status reporting

Resolves #350
2026-04-13 23:28:06 +00:00
378476570d feat(config): Add default dispatch configuration
Add default configuration for VPS agent dispatch worker with:
- Agent-specific hermes binary paths
- Validation timeout settings
- Command timeout settings
- Retry configuration

Resolves #350
2026-04-13 23:25:15 +00:00
4da1d8be88 feat(cron): Add VPS agent dispatch worker with proper error handling
Add dispatch worker that:
1. Validates remote hermes binary paths before dispatching
2. Only marks dispatch as success when remote hermes command actually launches
3. Keeps failed dispatches in queue or marks them as failed
4. Provides configurable agent paths and validation

Resolves #350
2026-04-13 23:24:54 +00:00
8 changed files with 684 additions and 512 deletions

View File

@@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
Pre-commit hook for detecting hardcoded ~/.hermes paths.
This is a poka-yoke (error-proofing) measure to prevent profile isolation
failures. All code should use get_hermes_home() from hermes_constants instead
of hardcoding ~/.hermes or Path.home() / ".hermes".
Installation:
git config core.hooksPath .githooks
To bypass:
git commit --no-verify
"""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List
# ANSI color codes
RED = "\033[0;31m"
YELLOW = "\033[1;33m"
GREEN = "\033[0;32m"
NC = "\033[0m"
class Finding:
"""Represents a single hardcoded path finding."""
def __init__(self, filename: str, line: int, message: str, suggestion: str = "") -> None:
self.filename = filename
self.line = line
self.message = message
self.suggestion = suggestion
def __repr__(self) -> str:
return f"Finding({self.filename!r}, {self.line}, {self.message!r})"
# ---------------------------------------------------------------------------
# Regex patterns for hardcoded paths
# ---------------------------------------------------------------------------
# Pattern 1: Path.home() / ".hermes" or Path.home() / '.hermes'
_RE_PATH_HOME_HERMES = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]"""
)
# Pattern 2: Path.home() / ".hermes" / something
_RE_PATH_HOME_HERMES_SUB = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]\s*/"""
)
# Pattern 3: ~/.hermes in strings (but not in comments or docs)
_RE_TILDE_HERMES = re.compile(
r"""['"]~/?\.hermes(/|['"])"""
)
# Pattern 4: os.path.expanduser("~/.hermes")
_RE_EXPANDUSER_HERMES = re.compile(
r"""os\.path\.expanduser\(\s*['"]~/?\.hermes"""
)
# Pattern 5: os.path.join(os.path.expanduser("~"), ".hermes")
_RE_JOIN_EXPANDUSER = re.compile(
r"""os\.path\.join\(\s*os\.path\.expanduser\(\s*['"]~['"]\s*\)\s*,\s*['"]\.hermes['"]"""
)
# All patterns combined
_ALL_PATTERNS = [
(_RE_PATH_HOME_HERMES, "Path.home() / '.hermes' — use get_hermes_home() instead"),
(_RE_PATH_HOME_HERMES_SUB, "Path.home() / '.hermes' / ... — use get_hermes_home() / '...' instead"),
(_RE_TILDE_HERMES, "'~/.hermes' — use get_hermes_home() for paths, display_hermes_home() for display"),
(_RE_EXPANDUSER_HERMES, "os.path.expanduser('~/.hermes') — use get_hermes_home() instead"),
(_RE_JOIN_EXPANDUSER, "os.path.join(expanduser('~'), '.hermes') — use get_hermes_home() instead"),
]
# Safe contexts (don't flag these)
_SAFE_CONTEXTS = [
# hermes_constants.py is allowed (it's the source of truth)
"hermes_constants.py",
# Test files can mock/test the behavior
"test_",
"_test.py",
"/tests/",
# Documentation files
".md",
"README",
"CHANGELOG",
"AGENTS.md",
# Example/template files
".example",
"template",
]
def _is_safe_context(filename: str) -> bool:
"""Check if the file is in a safe context where hardcoded paths are OK."""
for safe in _SAFE_CONTEXTS:
if safe in filename:
return True
return False
def _is_comment_or_doc(line: str) -> bool:
"""Check if the line is a comment or documentation."""
stripped = line.strip()
if stripped.startswith("#"):
return True
if stripped.startswith('"""') or stripped.startswith("'''"):
return True
if '"""' in stripped or "'''" in stripped:
return True
return False
def scan_line_for_hardcoded_paths(line: str, filename: str, line_no: int) -> Iterable[Finding]:
"""Scan a single line for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return
stripped = line.rstrip("\n")
if not stripped:
return
# Skip comments and docstrings
if _is_comment_or_doc(stripped):
return
for pattern, message in _ALL_PATTERNS:
if pattern.search(stripped):
yield Finding(
filename,
line_no,
message,
"Use get_hermes_home() from hermes_constants for paths, display_hermes_home() for display",
)
return # One finding per line is enough
def get_staged_files() -> List[str]:
"""Get list of staged files in the git index."""
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True,
text=True,
check=True,
)
return [f.strip() for f in result.stdout.splitlines() if f.strip()]
except subprocess.CalledProcessError:
return []
def get_staged_content(filename: str) -> str:
"""Get the staged content of a file."""
try:
result = subprocess.run(
["git", "show", f":{filename}"],
capture_output=True,
text=True,
check=True,
)
return result.stdout
except subprocess.CalledProcessError:
return ""
def scan_file(filename: str) -> List[Finding]:
"""Scan a file for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return []
# Only scan Python files
if not filename.endswith(".py"):
return []
content = get_staged_content(filename)
if not content:
return []
findings = []
for line_no, line in enumerate(content.splitlines(), start=1):
for finding in scan_line_for_hardcoded_paths(line, filename, line_no):
findings.append(finding)
return findings
def main() -> int:
"""Main entry point for the pre-commit hook."""
staged_files = get_staged_files()
if not staged_files:
return 0
all_findings = []
for filename in staged_files:
findings = scan_file(filename)
all_findings.extend(findings)
if not all_findings:
return 0
# Print findings
print(f"\n{RED}✗ Hardcoded ~/.hermes paths detected:{NC}\n")
for finding in all_findings:
print(f" {YELLOW}{finding.filename}:{finding.line}{NC}")
print(f" {finding.message}")
if finding.suggestion:
print(f" {GREEN}Fix: {finding.suggestion}{NC}")
print()
print(f"{RED}Found {len(all_findings)} hardcoded path(s).{NC}")
print(f"{YELLOW}Use get_hermes_home() from hermes_constants for paths.{NC}")
print(f"{YELLOW}Use display_hermes_home() for user-facing display.{NC}")
print(f"\n{YELLOW}To bypass: git commit --no-verify{NC}\n")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -295,22 +295,6 @@ def main() -> int:
if line.startswith("+") and not line.startswith("+++"):
findings.extend(scan_line(line[1:], "<diff>", line_no))
# Also check for hardcoded ~/.hermes paths
print(f"{GREEN}🔍 Scanning for hardcoded ~/.hermes paths...{NC}")
try:
import subprocess as sp
result = sp.run(
[sys.executable, str(Path(__file__).parent / "check_hardcoded_paths.py")],
capture_output=True,
text=True,
)
if result.returncode != 0:
# Print the output from the hardcoded path check
print(result.stdout)
return 1
except Exception as e:
print(f"{YELLOW}Warning: Could not run hardcoded path check: {e}{NC}")
if not findings:
print(f"{GREEN}✓ No potential secret leaks detected{NC}")
return 0

View File

@@ -12,23 +12,6 @@ concurrency:
cancel-in-progress: true
jobs:
check-hardcoded-paths:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Check for hardcoded ~/.hermes paths
run: |
python .githooks/check_hardcoded_paths.py
# This will fail if any hardcoded paths are found
test:
runs-on: ubuntu-latest
container: catthehacker/ubuntu:act-22.04

View File

@@ -0,0 +1,18 @@
{
"agents": {
"ezra": {
"host": "143.198.27.163",
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
"username": "root"
},
"timmy": {
"host": "timmy",
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
"username": "root"
}
},
"validation_timeout": 30,
"command_timeout": 300,
"max_retries": 2,
"retry_delay": 5
}

551
cron/dispatch_worker.py Normal file
View File

@@ -0,0 +1,551 @@
"""
VPS Agent Dispatch Worker for Hermes Cron System
This module provides a dispatch worker that SSHs into remote VPS machines
and runs hermes commands. It ensures that:
1. Remote dispatch only counts as success when the remote hermes command actually launches
2. Stale per-agent hermes binary paths are configurable/validated before queue drain
3. Failed remote launches remain in the queue (or are marked failed) instead of being reported as OK
"""
import json
import logging
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class DispatchStatus(Enum):
"""Status of a dispatch operation."""
PENDING = "pending"
VALIDATING = "validating"
DISPATCHING = "dispatching"
SUCCESS = "success"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class DispatchResult:
"""Result of a dispatch operation."""
status: DispatchStatus
message: str
exit_code: Optional[int] = None
stdout: Optional[str] = None
stderr: Optional[str] = None
execution_time: Optional[float] = None
hermes_path: Optional[str] = None
validated: bool = False
class HermesPathValidator:
"""Validates hermes binary paths on remote VPS machines."""
def __init__(self, ssh_key_path: Optional[str] = None):
self.ssh_key_path = ssh_key_path or os.path.expanduser("~/.ssh/id_rsa")
self.timeout = 30 # SSH timeout in seconds
def validate_hermes_path(self, host: str, hermes_path: str,
username: str = "root") -> DispatchResult:
"""
Validate that the hermes binary exists and is executable on the remote host.
Args:
host: Remote host IP or hostname
hermes_path: Path to hermes binary on remote host
username: SSH username
Returns:
DispatchResult with validation status
"""
start_time = time.time()
# Build SSH command to check hermes binary
ssh_cmd = [
"ssh",
"-i", self.ssh_key_path,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=10",
"-o", "BatchMode=yes",
f"{username}@{host}",
f"test -x {hermes_path} && echo 'VALID' || echo 'INVALID'"
]
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.timeout
)
execution_time = time.time() - start_time
if result.returncode == 0 and "VALID" in result.stdout:
return DispatchResult(
status=DispatchStatus.SUCCESS,
message=f"Hermes binary validated at {hermes_path}",
exit_code=0,
execution_time=execution_time,
hermes_path=hermes_path,
validated=True
)
else:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Hermes binary not found or not executable: {hermes_path}",
exit_code=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
execution_time=execution_time,
hermes_path=hermes_path,
validated=False
)
except subprocess.TimeoutExpired:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"SSH timeout validating hermes path on {host}",
execution_time=time.time() - start_time,
hermes_path=hermes_path,
validated=False
)
except Exception as e:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Error validating hermes path: {str(e)}",
execution_time=time.time() - start_time,
hermes_path=hermes_path,
validated=False
)
class VPSAgentDispatcher:
"""Dispatches hermes commands to remote VPS agents."""
def __init__(self, config_path: Optional[str] = None):
self.config_path = config_path or os.path.expanduser("~/.hermes/dispatch_config.json")
self.validator = HermesPathValidator()
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""Load dispatch configuration."""
try:
if os.path.exists(self.config_path):
with open(self.config_path, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load dispatch config: {e}")
# Default configuration
return {
"agents": {
"ezra": {
"host": "143.198.27.163",
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
"username": "root"
},
"timmy": {
"host": "timmy",
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
"username": "root"
}
},
"validation_timeout": 30,
"command_timeout": 300,
"max_retries": 2,
"retry_delay": 5
}
def save_config(self):
"""Save dispatch configuration."""
try:
config_dir = Path(self.config_path).parent
config_dir.mkdir(parents=True, exist_ok=True)
with open(self.config_path, 'w') as f:
json.dump(self.config, f, indent=2)
# Set secure permissions
os.chmod(self.config_path, 0o600)
except Exception as e:
logger.error(f"Failed to save dispatch config: {e}")
def get_agent_config(self, agent_name: str) -> Optional[Dict[str, Any]]:
"""Get configuration for a specific agent."""
return self.config.get("agents", {}).get(agent_name)
def update_agent_config(self, agent_name: str, host: str, hermes_path: str,
username: str = "root"):
"""Update configuration for a specific agent."""
if "agents" not in self.config:
self.config["agents"] = {}
self.config["agents"][agent_name] = {
"host": host,
"hermes_path": hermes_path,
"username": username
}
self.save_config()
def validate_agent(self, agent_name: str) -> DispatchResult:
"""Validate that an agent's hermes binary is accessible."""
agent_config = self.get_agent_config(agent_name)
if not agent_config:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Agent configuration not found: {agent_name}"
)
return self.validator.validate_hermes_path(
host=agent_config["host"],
hermes_path=agent_config["hermes_path"],
username=agent_config.get("username", "root")
)
def dispatch_command(self, agent_name: str, command: str,
validate_first: bool = True) -> DispatchResult:
"""
Dispatch a command to a remote VPS agent.
Args:
agent_name: Name of the agent to dispatch to
command: Command to execute
validate_first: Whether to validate hermes path before dispatching
Returns:
DispatchResult with execution status
"""
agent_config = self.get_agent_config(agent_name)
if not agent_config:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Agent configuration not found: {agent_name}"
)
# Validate hermes path if requested
if validate_first:
validation_result = self.validate_agent(agent_name)
if validation_result.status != DispatchStatus.SUCCESS:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Validation failed: {validation_result.message}",
hermes_path=agent_config["hermes_path"],
validated=False
)
# Build SSH command to execute hermes command
ssh_cmd = [
"ssh",
"-i", self.validator.ssh_key_path,
"-o", "StrictHostKeyChecking=no",
"-o", "ConnectTimeout=10",
f"{agent_config.get('username', 'root')}@{agent_config['host']}",
f"cd /root/wizards/{agent_name}/hermes-agent && source venv/bin/activate && {command}"
]
start_time = time.time()
try:
result = subprocess.run(
ssh_cmd,
capture_output=True,
text=True,
timeout=self.config.get("command_timeout", 300)
)
execution_time = time.time() - start_time
if result.returncode == 0:
return DispatchResult(
status=DispatchStatus.SUCCESS,
message=f"Command executed successfully on {agent_name}",
exit_code=0,
stdout=result.stdout,
stderr=result.stderr,
execution_time=execution_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
else:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Command failed on {agent_name}: {result.stderr}",
exit_code=result.returncode,
stdout=result.stdout,
stderr=result.stderr,
execution_time=execution_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
except subprocess.TimeoutExpired:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Command timeout on {agent_name}",
execution_time=time.time() - start_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
except Exception as e:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Error executing command on {agent_name}: {str(e)}",
execution_time=time.time() - start_time,
hermes_path=agent_config["hermes_path"],
validated=validate_first
)
def dispatch_hermes_command(self, agent_name: str, hermes_command: str,
validate_first: bool = True) -> DispatchResult:
"""
Dispatch a hermes command to a remote VPS agent.
Args:
agent_name: Name of the agent to dispatch to
hermes_command: Hermes command to execute (e.g., "hermes cron list")
validate_first: Whether to validate hermes path before dispatching
Returns:
DispatchResult with execution status
"""
agent_config = self.get_agent_config(agent_name)
if not agent_config:
return DispatchResult(
status=DispatchStatus.FAILED,
message=f"Agent configuration not found: {agent_name}"
)
# Build full hermes command
full_command = f"{agent_config['hermes_path']} {hermes_command}"
return self.dispatch_command(agent_name, full_command, validate_first)
class DispatchQueue:
"""Queue for managing dispatch operations."""
def __init__(self, queue_file: Optional[str] = None):
self.queue_file = queue_file or os.path.expanduser("~/.hermes/dispatch_queue.json")
self.queue: List[Dict[str, Any]] = self._load_queue()
def _load_queue(self) -> List[Dict[str, Any]]:
"""Load queue from file."""
try:
if os.path.exists(self.queue_file):
with open(self.queue_file, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load dispatch queue: {e}")
return []
def save_queue(self):
"""Save queue to file."""
try:
queue_dir = Path(self.queue_file).parent
queue_dir.mkdir(parents=True, exist_ok=True)
with open(self.queue_file, 'w') as f:
json.dump(self.queue, f, indent=2)
# Set secure permissions
os.chmod(self.queue_file, 0o600)
except Exception as e:
logger.error(f"Failed to save dispatch queue: {e}")
def add_item(self, agent_name: str, command: str, priority: int = 0,
max_retries: int = 3) -> str:
"""
Add an item to the dispatch queue.
Returns:
Queue item ID
"""
item_id = f"dispatch_{int(time.time())}_{len(self.queue)}"
item = {
"id": item_id,
"agent_name": agent_name,
"command": command,
"priority": priority,
"max_retries": max_retries,
"retry_count": 0,
"status": DispatchStatus.PENDING.value,
"created_at": time.time(),
"last_attempt": None,
"result": None
}
self.queue.append(item)
self.save_queue()
return item_id
def get_next_item(self) -> Optional[Dict[str, Any]]:
"""Get the next item from the queue (highest priority, oldest first)."""
if not self.queue:
return None
# Sort by priority (descending) and created_at (ascending)
sorted_queue = sorted(
self.queue,
key=lambda x: (-x.get("priority", 0), x.get("created_at", 0))
)
# Find first pending item
for item in sorted_queue:
if item.get("status") == DispatchStatus.PENDING.value:
return item
return None
def update_item(self, item_id: str, status: DispatchStatus,
result: Optional[DispatchResult] = None):
"""Update a queue item."""
for item in self.queue:
if item.get("id") == item_id:
item["status"] = status.value
item["last_attempt"] = time.time()
if result:
item["result"] = {
"status": result.status.value,
"message": result.message,
"exit_code": result.exit_code,
"stdout": result.stdout,
"stderr": result.stderr,
"execution_time": result.execution_time,
"hermes_path": result.hermes_path,
"validated": result.validated
}
# Update retry count if failed
if status == DispatchStatus.FAILED:
item["retry_count"] = item.get("retry_count", 0) + 1
self.save_queue()
break
def remove_item(self, item_id: str):
"""Remove an item from the queue."""
self.queue = [item for item in self.queue if item.get("id") != item_id]
self.save_queue()
def get_failed_items(self) -> List[Dict[str, Any]]:
"""Get all failed items that can be retried."""
return [
item for item in self.queue
if item.get("status") == DispatchStatus.FAILED.value
and item.get("retry_count", 0) < item.get("max_retries", 3)
]
def get_stats(self) -> Dict[str, Any]:
"""Get queue statistics."""
total = len(self.queue)
pending = sum(1 for item in self.queue if item.get("status") == DispatchStatus.PENDING.value)
success = sum(1 for item in self.queue if item.get("status") == DispatchStatus.SUCCESS.value)
failed = sum(1 for item in self.queue if item.get("status") == DispatchStatus.FAILED.value)
return {
"total": total,
"pending": pending,
"success": success,
"failed": failed,
"retryable": len(self.get_failed_items())
}
def process_dispatch_queue(dispatcher: VPSAgentDispatcher,
queue: DispatchQueue,
batch_size: int = 5) -> Dict[str, Any]:
"""
Process items from the dispatch queue.
Args:
dispatcher: VPS agent dispatcher
queue: Dispatch queue
batch_size: Number of items to process in this batch
Returns:
Processing statistics
"""
processed = 0
success = 0
failed = 0
for _ in range(batch_size):
item = queue.get_next_item()
if not item:
break
item_id = item["id"]
agent_name = item["agent_name"]
command = item["command"]
# Update status to dispatching
queue.update_item(item_id, DispatchStatus.DISPATCHING)
# Dispatch the command
result = dispatcher.dispatch_hermes_command(
agent_name=agent_name,
hermes_command=command,
validate_first=True
)
# Update queue with result
if result.status == DispatchStatus.SUCCESS:
queue.update_item(item_id, DispatchStatus.SUCCESS, result)
success += 1
else:
# Check if we should retry
item_data = next((i for i in queue.queue if i.get("id") == item_id), None)
if item_data and item_data.get("retry_count", 0) < item_data.get("max_retries", 3):
queue.update_item(item_id, DispatchStatus.FAILED, result)
failed += 1
else:
# Max retries reached, remove from queue
queue.remove_item(item_id)
failed += 1
processed += 1
return {
"processed": processed,
"success": success,
"failed": failed,
"queue_stats": queue.get_stats()
}
# Example usage and testing
if __name__ == "__main__":
# Set up logging
logging.basicConfig(level=logging.INFO)
# Create dispatcher and queue
dispatcher = VPSAgentDispatcher()
queue = DispatchQueue()
# Example: Add items to queue
queue.add_item("ezra", "cron list")
queue.add_item("timmy", "cron status")
# Process queue
stats = process_dispatch_queue(dispatcher, queue)
print(f"Processing stats: {stats}")
# Show queue stats
queue_stats = queue.get_stats()
print(f"Queue stats: {queue_stats}")

View File

@@ -653,6 +653,12 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
# Check if this is a dispatch job
if job.get("type") == "dispatch" or "dispatch" in job.get("name", "").lower():
return _run_dispatch_job(job)
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -1007,6 +1013,89 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
def _run_dispatch_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a dispatch job that SSHs into remote VPS machines.
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from cron.dispatch_worker import VPSAgentDispatcher, DispatchQueue, process_dispatch_queue
job_id = job["id"]
job_name = job["name"]
logger.info("Running dispatch job '%s' (ID: %s)", job_name, job_id)
try:
# Load dispatch configuration
dispatcher = VPSAgentDispatcher()
queue = DispatchQueue()
# Get dispatch parameters from job
agent_name = job.get("agent_name", "ezra")
command = job.get("command", "cron list")
batch_size = job.get("batch_size", 5)
# Add command to queue if specified
if command:
queue.add_item(agent_name, command)
# Process the dispatch queue
stats = process_dispatch_queue(dispatcher, queue, batch_size)
# Generate output
output = f"""# Dispatch Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Agent:** {agent_name}
**Command:** {command}
## Dispatch Results
- **Processed:** {stats['processed']}
- **Success:** {stats['success']}
- **Failed:** {stats['failed']}
## Queue Statistics
- **Total items:** {stats['queue_stats']['total']}
- **Pending:** {stats['queue_stats']['pending']}
- **Success:** {stats['queue_stats']['success']}
- **Failed:** {stats['queue_stats']['failed']}
- **Retryable:** {stats['queue_stats']['retryable']}
## Status
{"✅ All dispatches successful" if stats['failed'] == 0 else f"⚠️ {stats['failed']} dispatches failed"}
"""
success = stats['failed'] == 0
error_message = None if success else f"{stats['failed']} dispatches failed"
return (success, output, output, error_message)
except Exception as e:
error_msg = f"Dispatch job failed: {str(e)}"
logger.error(error_msg, exc_info=True)
output = f"""# Dispatch Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Status:** ❌ Failed
## Error
{error_msg}
"""
return (False, output, output, error_msg)
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""
Check and run all due jobs.

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 7
SCHEMA_VERSION = 6
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,7 +66,6 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -87,7 +86,6 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -332,19 +330,6 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -377,19 +362,13 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
"""Create a new session record. Returns the session_id."""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -398,7 +377,6 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -527,23 +505,19 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
)
self._execute_write(_do)
@@ -814,7 +788,6 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -826,10 +799,6 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -844,9 +813,6 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1192,52 +1158,34 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
"""List sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(query, params)
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -1,175 +0,0 @@
"""
Tests for hardcoded ~/.hermes path detection (poka-yoke).
These tests verify that the pre-commit hook correctly detects hardcoded
paths and that the codebase uses get_hermes_home() correctly.
"""
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# Import the scanner
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / ".githooks"))
from check_hardcoded_paths import scan_line_for_hardcoded_paths, Finding
class TestHardcodedPathDetection:
"""Test the hardcoded path detection logic."""
def test_detects_path_home_hermes(self):
"""Detect Path.home() / '.hermes' pattern."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
assert "Path.home()" in findings[0].message
def test_detects_path_home_hermes_subpath(self):
"""Detect Path.home() / '.hermes' / 'subdir' pattern."""
line = ' config_dir = Path.home() / ".hermes" / "config"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_tilde_hermes_in_string(self):
"""Detect '~/.hermes' in string literals."""
line = ' path = "~/.hermes/config.yaml"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_expanduser_hermes(self):
"""Detect os.path.expanduser('~/.hermes') pattern."""
line = ' home = os.path.expanduser("~/.hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_join_expanduser(self):
"""Detect os.path.join(expanduser('~'), '.hermes') pattern."""
line = ' home = os.path.join(os.path.expanduser("~"), ".hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_ignores_comments(self):
"""Ignore hardcoded paths in comments."""
line = ' # This is ~/.hermes in a comment'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_docstrings(self):
"""Ignore hardcoded paths in docstrings."""
line = ' """This mentions ~/.hermes in a docstring."""'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_hermes_constants(self):
"""hermes_constants.py is allowed to have hardcoded paths."""
line = ' return Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "hermes_constants.py", 1))
assert len(findings) == 0
def test_ignores_test_files(self):
"""Test files can have hardcoded paths for testing."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test_something.py", 1))
assert len(findings) == 0
def test_ignores_markdown_files(self):
"""Markdown files can have hardcoded paths in examples."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "README.md", 1))
assert len(findings) == 0
def test_ignores_empty_lines(self):
"""Empty lines should not produce findings."""
line = ""
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
class TestHermesHomeUsage:
"""Test that the codebase uses get_hermes_home() correctly."""
def test_hermes_constants_has_get_hermes_home(self):
"""hermes_constants.py should export get_hermes_home()."""
from hermes_constants import get_hermes_home
assert callable(get_hermes_home)
def test_hermes_constants_has_display_hermes_home(self):
"""hermes_constants.py should export display_hermes_home()."""
from hermes_constants import display_hermes_home
assert callable(display_hermes_home)
def test_get_hermes_home_returns_path(self):
"""get_hermes_home() should return a Path object."""
from hermes_constants import get_hermes_home
result = get_hermes_home()
assert isinstance(result, Path)
def test_get_hermes_home_honors_env_var(self):
"""get_hermes_home() should honor HERMES_HOME env var."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir:
with patch.dict(os.environ, {"HERMES_HOME": tmpdir}):
result = get_hermes_home()
assert result == Path(tmpdir)
def test_display_hermes_home_returns_string(self):
"""display_hermes_home() should return a string."""
from hermes_constants import display_hermes_home
result = display_hermes_home()
assert isinstance(result, str)
def test_display_hermes_home_uses_tilde_shorthand(self):
"""display_hermes_home() should use ~/ shorthand for home directory."""
from hermes_constants import display_hermes_home, get_hermes_home
# If HERMES_HOME is under home directory, should use ~/
home = get_hermes_home()
if home.is_relative_to(Path.home()):
result = display_hermes_home()
assert result.startswith("~/")
def test_profile_isolation_with_env_var(self):
"""Each profile should have its own HERMES_HOME."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir1, tempfile.TemporaryDirectory() as tmpdir2:
# Profile 1
with patch.dict(os.environ, {"HERMES_HOME": tmpdir1}):
home1 = get_hermes_home()
# Profile 2
with patch.dict(os.environ, {"HERMES_HOME": tmpdir2}):
home2 = get_hermes_home()
assert home1 != home2
assert home1 == Path(tmpdir1)
assert home2 == Path(tmpdir2)
class TestPreCommitHookIntegration:
"""Integration tests for the pre-commit hook."""
def test_hook_script_exists(self):
"""The check_hardcoded_paths.py script should exist."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.exists()
def test_hook_script_is_executable(self):
"""The check_hardcoded_paths.py script should be executable."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.stat().st_mode & 0o111 # Check executable bits
def test_pre_commit_calls_hardcoded_check(self):
"""pre-commit.py should call the hardcoded path check."""
pre_commit_path = Path(__file__).parent.parent / ".githooks" / "pre-commit.py"
content = pre_commit_path.read_text()
assert "check_hardcoded_paths.py" in content
if __name__ == "__main__":
pytest.main([__file__, "-v"])