Compare commits

..

3 Commits

Author SHA1 Message Date
Alexander Whitestone
43cbd3191d fix(cron): SSH dispatch validation + failure detection (#350)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m5s
VPS agent dispatch reported OK while remote hermes binary paths were
broken (/root/wizards/.../venv/bin/hermes: No such file or directory).

Root causes:
1. No validation that remote hermes binary exists before dispatch
2. Scheduler failure detection missed common SSH error patterns

New cron/ssh_dispatch.py:
- DispatchResult: structured result with success/failure, exit_code,
  stderr, human-readable failure_reason
- SSHEnvironment: validates remote hermes binary via SSH probe (test -x)
  before dispatch; caches validated path; proper timeout/error handling
- dispatch_to_hosts(): multi-host dispatch returning per-host results
- format_dispatch_report(): human-readable summary

cron/scheduler.py _SCRIPT_FAILURE_PHRASES expanded:
- no such file or directory (exact bash error)
- command not found
- hermes binary not found / hermes not found
- ssh: connect to host
- connection timed out
- host key verification failed

These are detected by _detect_script_failure() so broken SSH dispatches
are flagged as failures instead of reported as OK.

Closes #350
2026-04-13 21:37:17 -04:00
954fd992eb Merge pull request 'perf: lazy session creation — defer DB write until first message (#314)' (#449) from whip/314-1776127532 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 55s
Forge CI / smoke-and-build (pull_request) Failing after 1m12s
perf: lazy session creation (#314)

Closes #314.
2026-04-14 01:08:13 +00:00
Metatron
f35f56e397 perf: lazy session creation — defer DB write until first message (closes #314)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 56s
Remove eager create_session() call from AIAgent.__init__(). Sessions
are now created lazily on first _flush_messages_to_session_db() call
via ensure_session() which uses INSERT OR IGNORE.

Impact: eliminates 32.4% of sessions (3,564 of 10,985) that were
created at agent init but never received any messages.

The existing ensure_session() fallback in _flush_messages_to_session_db()
already handles this pattern — it was originally designed for recovery
after transient SQLite lock failures. Now it's the primary creation path.

Compression-initiated sessions still use create_session() directly
(line ~5995) since they have messages to write immediately.
2026-04-13 20:52:06 -04:00
9 changed files with 286 additions and 1155 deletions

View File

@@ -1,171 +0,0 @@
"""Memory Backend Interface — pluggable cross-session user modeling.
Provides a common interface for memory backends that persist user
preferences and patterns across sessions. Two implementations:
1. LocalBackend (default): SQLite-based, zero cloud dependency
2. HonchoBackend (opt-in): Honcho AI-native memory, requires API key
Both are zero-overhead when disabled — the interface returns empty
results and no writes occur.
Usage:
from agent.memory import get_memory_backend
backend = get_memory_backend() # returns configured backend
backend.store_preference("user", "prefers_python", "True")
context = backend.query_context("user", "What does this user prefer?")
"""
import json
import logging
import os
import sqlite3
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
@dataclass
class MemoryEntry:
"""A single memory entry."""
key: str
value: str
user_id: str
created_at: float = 0
updated_at: float = 0
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
now = time.time()
if not self.created_at:
self.created_at = now
if not self.updated_at:
self.updated_at = now
class MemoryBackend(ABC):
"""Abstract interface for memory backends."""
@abstractmethod
def is_available(self) -> bool:
"""Check if this backend is configured and usable."""
@abstractmethod
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
"""Store a memory entry."""
@abstractmethod
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
"""Retrieve a single memory entry."""
@abstractmethod
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
"""Query memories relevant to a text query."""
@abstractmethod
def list_keys(self, user_id: str) -> List[str]:
"""List all keys for a user."""
@abstractmethod
def delete(self, user_id: str, key: str) -> bool:
"""Delete a memory entry."""
@property
@abstractmethod
def backend_name(self) -> str:
"""Human-readable backend name."""
@property
@abstractmethod
def is_cloud(self) -> bool:
"""Whether this backend requires cloud connectivity."""
class NullBackend(MemoryBackend):
"""No-op backend when memory is disabled. Zero overhead."""
def is_available(self) -> bool:
return True # always "available" as null
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
return True # no-op
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
return None
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
return []
def list_keys(self, user_id: str) -> List[str]:
return []
def delete(self, user_id: str, key: str) -> bool:
return True
@property
def backend_name(self) -> str:
return "null (disabled)"
@property
def is_cloud(self) -> bool:
return False
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_backend: Optional[MemoryBackend] = None
def get_memory_backend() -> MemoryBackend:
"""Get the configured memory backend.
Priority:
1. If HONCHO_API_KEY is set and honcho-ai is installed -> HonchoBackend
2. If memory_backend config is 'local' -> LocalBackend
3. Default -> NullBackend (zero overhead)
"""
global _backend
if _backend is not None:
return _backend
# Check config
backend_type = os.getenv("HERMES_MEMORY_BACKEND", "").lower().strip()
if backend_type == "honcho" or os.getenv("HONCHO_API_KEY"):
try:
from agent.memory.honcho_backend import HonchoBackend
backend = HonchoBackend()
if backend.is_available():
_backend = backend
logger.info("Memory backend: Honcho (cloud)")
return _backend
except ImportError:
logger.debug("Honcho not installed, falling back")
if backend_type == "local":
try:
from agent.memory.local_backend import LocalBackend
_backend = LocalBackend()
logger.info("Memory backend: Local (SQLite)")
return _backend
except Exception as e:
logger.warning("Local backend failed: %s", e)
# Default: null (zero overhead)
_backend = NullBackend()
return _backend
def reset_backend():
"""Reset the singleton (for testing)."""
global _backend
_backend = None

View File

@@ -1,263 +0,0 @@
"""Memory Backend Evaluation Framework.
Provides structured evaluation for comparing memory backends on:
1. Latency (store/retrieve/query operations)
2. Relevance (does query return useful results?)
3. Privacy (where is data stored?)
4. Reliability (availability, error handling)
5. Cost (API calls, cloud dependency)
Usage:
from agent.memory.evaluation import evaluate_backends
report = evaluate_backends()
"""
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class BackendEvaluation:
"""Evaluation results for a single backend."""
backend_name: str
is_cloud: bool
available: bool
# Latency (milliseconds)
store_latency_ms: float = 0
retrieve_latency_ms: float = 0
query_latency_ms: float = 0
# Functionality
store_success: bool = False
retrieve_success: bool = False
query_returns_results: bool = False
query_result_count: int = 0
# Privacy
data_location: str = "unknown"
requires_api_key: bool = False
# Overall
score: float = 0 # 0-100
recommendation: str = ""
notes: List[str] = field(default_factory=list)
def _measure_latency(func, *args, **kwargs) -> tuple:
"""Measure function latency in milliseconds."""
start = time.perf_counter()
try:
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start) * 1000
return elapsed, result, None
except Exception as e:
elapsed = (time.perf_counter() - start) * 1000
return elapsed, None, e
def evaluate_backend(backend, test_user: str = "eval_user") -> BackendEvaluation:
"""Evaluate a single memory backend."""
from agent.memory import MemoryBackend
eval_result = BackendEvaluation(
backend_name=backend.backend_name,
is_cloud=backend.is_cloud,
available=backend.is_available(),
)
if not eval_result.available:
eval_result.notes.append("Backend not available")
eval_result.score = 0
eval_result.recommendation = "NOT AVAILABLE"
return eval_result
# Privacy assessment
if backend.is_cloud:
eval_result.data_location = "cloud (external)"
eval_result.requires_api_key = True
else:
eval_result.data_location = "local (~/.hermes/)"
# Test store
latency, success, err = _measure_latency(
backend.store,
test_user,
"eval_test_key",
"eval_test_value",
{"source": "evaluation"},
)
eval_result.store_latency_ms = latency
eval_result.store_success = success is True
if err:
eval_result.notes.append(f"Store error: {err}")
# Test retrieve
latency, result, err = _measure_latency(
backend.retrieve,
test_user,
"eval_test_key",
)
eval_result.retrieve_latency_ms = latency
eval_result.retrieve_success = result is not None
if err:
eval_result.notes.append(f"Retrieve error: {err}")
# Test query
latency, results, err = _measure_latency(
backend.query,
test_user,
"eval_test",
5,
)
eval_result.query_latency_ms = latency
eval_result.query_returns_results = bool(results)
eval_result.query_result_count = len(results) if results else 0
if err:
eval_result.notes.append(f"Query error: {err}")
# Cleanup
try:
backend.delete(test_user, "eval_test_key")
except Exception:
pass
# Score calculation (0-100)
score = 0
# Availability (20 points)
score += 20
# Functionality (40 points)
if eval_result.store_success:
score += 15
if eval_result.retrieve_success:
score += 15
if eval_result.query_returns_results:
score += 10
# Latency (20 points) — lower is better
avg_latency = (
eval_result.store_latency_ms +
eval_result.retrieve_latency_ms +
eval_result.query_latency_ms
) / 3
if avg_latency < 10:
score += 20
elif avg_latency < 50:
score += 15
elif avg_latency < 200:
score += 10
else:
score += 5
# Privacy (20 points) — local is better for sovereignty
if not backend.is_cloud:
score += 20
else:
score += 5 # cloud has privacy trade-offs
eval_result.score = score
# Recommendation
if score >= 80:
eval_result.recommendation = "RECOMMENDED"
elif score >= 60:
eval_result.recommendation = "ACCEPTABLE"
elif score >= 40:
eval_result.recommendation = "MARGINAL"
else:
eval_result.recommendation = "NOT RECOMMENDED"
return eval_result
def evaluate_backends() -> Dict[str, Any]:
"""Evaluate all available memory backends.
Returns a comparison report.
"""
from agent.memory import NullBackend
from agent.memory.local_backend import LocalBackend
backends = []
# Always evaluate Null (baseline)
backends.append(NullBackend())
# Evaluate Local
try:
backends.append(LocalBackend())
except Exception as e:
logger.warning("Local backend init failed: %s", e)
# Try Honcho if configured
import os
if os.getenv("HONCHO_API_KEY"):
try:
from agent.memory.honcho_backend import HonchoBackend
backends.append(HonchoBackend())
except ImportError:
logger.debug("Honcho not installed, skipping evaluation")
evaluations = []
for backend in backends:
try:
evaluations.append(evaluate_backend(backend))
except Exception as e:
logger.warning("Evaluation failed for %s: %s", backend.backend_name, e)
# Build report
report = {
"timestamp": time.time(),
"backends_evaluated": len(evaluations),
"evaluations": [asdict(e) for e in evaluations],
"recommendation": _build_recommendation(evaluations),
}
return report
def _build_recommendation(evaluations: List[BackendEvaluation]) -> str:
"""Build overall recommendation from evaluations."""
if not evaluations:
return "No backends evaluated"
# Find best non-null backend
viable = [e for e in evaluations if e.backend_name != "null (disabled)" and e.available]
if not viable:
return "No viable backends found. Use NullBackend (default)."
best = max(viable, key=lambda e: e.score)
parts = [f"Best backend: {best.backend_name} (score: {best.score})"]
if best.is_cloud:
parts.append(
"WARNING: Cloud backend has privacy trade-offs. "
"Data leaves your machine. Consider LocalBackend for sovereignty."
)
# Compare local vs cloud if both available
local = [e for e in viable if not e.is_cloud]
cloud = [e for e in viable if e.is_cloud]
if local and cloud:
local_score = max(e.score for e in local)
cloud_score = max(e.score for e in cloud)
if local_score >= cloud_score:
parts.append(
f"Local backend (score {local_score}) matches or beats "
f"cloud (score {cloud_score}). RECOMMEND: stay local for sovereignty."
)
else:
parts.append(
f"Cloud backend (score {cloud_score}) outperforms "
f"local (score {local_score}) but adds cloud dependency."
)
return " ".join(parts)

View File

@@ -1,171 +0,0 @@
"""Honcho memory backend — opt-in cloud-based user modeling.
Requires:
- pip install honcho-ai
- HONCHO_API_KEY environment variable (from app.honcho.dev)
Provides dialectic user context queries via Honcho's AI-native memory.
Zero runtime overhead when not configured — get_memory_backend() falls
back to LocalBackend or NullBackend if this fails to initialize.
This is the evaluation wrapper. It adapts the Honcho SDK to our
MemoryBackend interface so we can A/B test against LocalBackend.
"""
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional
from agent.memory import MemoryBackend, MemoryEntry
logger = logging.getLogger(__name__)
class HonchoBackend(MemoryBackend):
"""Honcho AI-native memory backend.
Wraps the honcho-ai SDK to provide cross-session user modeling
with dialectic context queries.
"""
def __init__(self):
self._client = None
self._api_key = os.getenv("HONCHO_API_KEY", "")
self._app_id = os.getenv("HONCHO_APP_ID", "hermes-agent")
self._base_url = os.getenv("HONCHO_BASE_URL", "https://api.honcho.dev")
def _get_client(self):
"""Lazy-load Honcho client."""
if self._client is not None:
return self._client
if not self._api_key:
return None
try:
from honcho import Honcho
self._client = Honcho(
api_key=self._api_key,
app_id=self._app_id,
base_url=self._base_url,
)
return self._client
except ImportError:
logger.warning("honcho-ai not installed. Install with: pip install honcho-ai")
return None
except Exception as e:
logger.warning("Failed to initialize Honcho client: %s", e)
return None
def is_available(self) -> bool:
if not self._api_key:
return False
client = self._get_client()
if client is None:
return False
# Try a simple API call to verify connectivity
try:
# Honcho uses sessions — verify we can list them
client.get_sessions(limit=1)
return True
except Exception as e:
logger.debug("Honcho not available: %s", e)
return False
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
client = self._get_client()
if client is None:
return False
try:
# Honcho stores messages in sessions
# We create a synthetic message to store the preference
session_id = f"hermes-prefs-{user_id}"
message_content = json.dumps({
"type": "preference",
"key": key,
"value": value,
"metadata": metadata or {},
"timestamp": time.time(),
})
client.add_message(
session_id=session_id,
role="system",
content=message_content,
)
return True
except Exception as e:
logger.warning("Honcho store failed: %s", e)
return False
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
# Honcho doesn't have direct key-value retrieval
# We query for the key and return the latest match
results = self.query(user_id, key, limit=1)
for entry in results:
if entry.key == key:
return entry
return None
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
client = self._get_client()
if client is None:
return []
try:
session_id = f"hermes-prefs-{user_id}"
# Use Honcho's dialectic query
result = client.chat(
session_id=session_id,
message=f"Find preferences related to: {query_text}",
)
# Parse the response into memory entries
entries = []
if isinstance(result, dict):
content = result.get("content", "")
try:
data = json.loads(content)
if isinstance(data, list):
for item in data[:limit]:
entries.append(MemoryEntry(
key=item.get("key", ""),
value=item.get("value", ""),
user_id=user_id,
metadata=item.get("metadata", {}),
))
elif isinstance(data, dict) and data.get("key"):
entries.append(MemoryEntry(
key=data.get("key", ""),
value=data.get("value", ""),
user_id=user_id,
metadata=data.get("metadata", {}),
))
except json.JSONDecodeError:
pass
return entries
except Exception as e:
logger.warning("Honcho query failed: %s", e)
return []
def list_keys(self, user_id: str) -> List[str]:
# Query all and extract keys
results = self.query(user_id, "", limit=100)
return list(dict.fromkeys(e.key for e in results if e.key))
def delete(self, user_id: str, key: str) -> bool:
# Honcho doesn't support deletion of individual entries
# This is a limitation of the cloud backend
logger.info("Honcho does not support individual entry deletion")
return False
@property
def backend_name(self) -> str:
return "honcho (cloud)"
@property
def is_cloud(self) -> bool:
return True

View File

@@ -1,156 +0,0 @@
"""Local SQLite memory backend.
Zero cloud dependency. Stores user preferences and patterns in a
local SQLite database at ~/.hermes/memory.db.
Provides basic key-value storage with simple text search.
No external dependencies beyond Python stdlib.
"""
import json
import logging
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
from agent.memory import MemoryBackend, MemoryEntry
logger = logging.getLogger(__name__)
class LocalBackend(MemoryBackend):
"""SQLite-backed local memory storage."""
def __init__(self, db_path: Path = None):
self._db_path = db_path or (get_hermes_home() / "memory.db")
self._init_db()
def _init_db(self):
"""Initialize the database schema."""
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
user_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
metadata TEXT,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
PRIMARY KEY (user_id, key)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_memories_user
ON memories(user_id)
""")
conn.commit()
def is_available(self) -> bool:
try:
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute("SELECT 1")
return True
except Exception:
return False
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
try:
now = time.time()
meta_json = json.dumps(metadata) if metadata else None
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute("""
INSERT INTO memories (user_id, key, value, metadata, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id, key) DO UPDATE SET
value = excluded.value,
metadata = excluded.metadata,
updated_at = excluded.updated_at
""", (user_id, key, value, meta_json, now, now))
conn.commit()
return True
except Exception as e:
logger.warning("Failed to store memory: %s", e)
return False
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
try:
with sqlite3.connect(str(self._db_path)) as conn:
row = conn.execute(
"SELECT key, value, user_id, created_at, updated_at, metadata "
"FROM memories WHERE user_id = ? AND key = ?",
(user_id, key),
).fetchone()
if not row:
return None
return MemoryEntry(
key=row[0],
value=row[1],
user_id=row[2],
created_at=row[3],
updated_at=row[4],
metadata=json.loads(row[5]) if row[5] else {},
)
except Exception as e:
logger.warning("Failed to retrieve memory: %s", e)
return None
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
"""Simple LIKE-based search on keys and values."""
try:
pattern = f"%{query_text}%"
with sqlite3.connect(str(self._db_path)) as conn:
rows = conn.execute("""
SELECT key, value, user_id, created_at, updated_at, metadata
FROM memories
WHERE user_id = ? AND (key LIKE ? OR value LIKE ?)
ORDER BY updated_at DESC
LIMIT ?
""", (user_id, pattern, pattern, limit)).fetchall()
return [
MemoryEntry(
key=r[0],
value=r[1],
user_id=r[2],
created_at=r[3],
updated_at=r[4],
metadata=json.loads(r[5]) if r[5] else {},
)
for r in rows
]
except Exception as e:
logger.warning("Failed to query memories: %s", e)
return []
def list_keys(self, user_id: str) -> List[str]:
try:
with sqlite3.connect(str(self._db_path)) as conn:
rows = conn.execute(
"SELECT key FROM memories WHERE user_id = ? ORDER BY updated_at DESC",
(user_id,),
).fetchall()
return [r[0] for r in rows]
except Exception:
return []
def delete(self, user_id: str, key: str) -> bool:
try:
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute(
"DELETE FROM memories WHERE user_id = ? AND key = ?",
(user_id, key),
)
conn.commit()
return True
except Exception:
return False
@property
def backend_name(self) -> str:
return "local (SQLite)"
@property
def is_cloud(self) -> bool:
return False

View File

@@ -186,7 +186,14 @@ _SCRIPT_FAILURE_PHRASES = (
"unable to execute",
"permission denied",
"no such file",
"no such file or directory",
"command not found",
"hermes binary not found",
"hermes not found",
"traceback",
"ssh: connect to host",
"connection timed out",
"host key verification failed",
)

275
cron/ssh_dispatch.py Normal file
View File

@@ -0,0 +1,275 @@
"""SSH dispatch utilities for VPS agent operations.
Provides validated SSH execution with proper failure detection.
Used by cron jobs that dispatch work to remote VPS agents.
Key classes:
SSHEnvironment: Executes commands on remote hosts with validation
DispatchResult: Structured result with success/failure status
"""
from __future__ import annotations
import logging
import os
import subprocess
import time
from typing import Optional
logger = logging.getLogger(__name__)
_SSH_TIMEOUT = int(os.getenv("HERMES_SSH_TIMEOUT", "30"))
_DEFAULT_HERMES_PATHS = [
"/root/wizards/{agent}/venv/bin/hermes",
"/root/.local/bin/hermes",
"/usr/local/bin/hermes",
"~/.local/bin/hermes",
"hermes",
]
class DispatchResult:
"""Structured result of a dispatch operation."""
__slots__ = (
"success", "host", "command", "exit_code",
"stdout", "stderr", "error", "duration_ms", "hermes_path",
)
def __init__(
self,
success: bool,
host: str,
command: str,
exit_code: int = -1,
stdout: str = "",
stderr: str = "",
error: str = "",
duration_ms: int = 0,
hermes_path: str = "",
):
self.success = success
self.host = host
self.command = command
self.exit_code = exit_code
self.stdout = stdout
self.stderr = stderr
self.error = error
self.duration_ms = duration_ms
self.hermes_path = hermes_path
def to_dict(self) -> dict:
return {
"success": self.success,
"host": self.host,
"exit_code": self.exit_code,
"error": self.error,
"duration_ms": self.duration_ms,
"hermes_path": self.hermes_path,
"stderr_tail": self.stderr[-200:] if self.stderr else "",
}
@property
def failure_reason(self) -> str:
if self.success:
return ""
if self.error:
return self.error
if "No such file" in self.stderr or "command not found" in self.stderr:
return f"Hermes binary not found on {self.host}"
if self.exit_code != 0:
return f"Remote command exited {self.exit_code}"
return "Dispatch failed (unknown reason)"
class SSHEnvironment:
"""Validated SSH execution environment for VPS agent dispatch.
Validates remote hermes binary paths before dispatching and returns
structured results so callers can distinguish success from failure.
"""
def __init__(
self,
host: str,
agent: str = "",
ssh_key: str = "",
ssh_port: int = 22,
timeout: int = _SSH_TIMEOUT,
hermes_path: str = "",
):
self.host = host
self.agent = agent
self.ssh_key = ssh_key
self.ssh_port = ssh_port
self.timeout = timeout
self.hermes_path = hermes_path
self._validated_path: str = ""
def _ssh_base_cmd(self) -> list[str]:
cmd = ["ssh", "-o", "StrictHostKeyChecking=accept-new"]
cmd.extend(["-o", "ConnectTimeout=10"])
cmd.extend(["-o", "BatchMode=yes"])
if self.ssh_key:
cmd.extend(["-i", self.ssh_key])
if self.ssh_port != 22:
cmd.extend(["-p", str(self.ssh_port)])
cmd.append(self.host)
return cmd
def _resolve_hermes_paths(self) -> list[str]:
if self.hermes_path:
return [self.hermes_path]
paths = []
for tmpl in _DEFAULT_HERMES_PATHS:
path = tmpl.format(agent=self.agent) if "{agent}" in tmpl else tmpl
paths.append(path)
return paths
def validate_remote_hermes_path(self) -> str:
"""Probe the remote host for a working hermes binary.
Returns the validated path on success, raises RuntimeError on failure.
Caches the result so validation is only done once per instance.
"""
if self._validated_path:
return self._validated_path
candidates = self._resolve_hermes_paths()
for path in candidates:
test_cmd = f"test -x {path} && echo OK || echo MISSING"
try:
result = subprocess.run(
self._ssh_base_cmd() + [test_cmd],
capture_output=True, text=True, timeout=self.timeout,
)
if result.returncode == 0 and "OK" in (result.stdout or ""):
logger.info("SSH %s: hermes validated at %s", self.host, path)
self._validated_path = path
return path
except subprocess.TimeoutExpired:
logger.warning("SSH %s: timeout probing %s", self.host, path)
continue
except Exception as exc:
logger.debug("SSH %s: probe %s failed: %s", self.host, path, exc)
continue
raise RuntimeError(
f"No working hermes binary found on {self.host}. "
f"Checked: {', '.join(candidates)}."
)
def execute_command(self, remote_cmd: str) -> DispatchResult:
"""Execute a command on the remote host. Returns DispatchResult."""
t0 = time.monotonic()
full_cmd = self._ssh_base_cmd() + [remote_cmd]
try:
result = subprocess.run(
full_cmd, capture_output=True, text=True, timeout=self.timeout,
)
elapsed = int((time.monotonic() - t0) * 1000)
stderr = (result.stderr or "").strip()
stdout = (result.stdout or "").strip()
if result.returncode != 0:
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
exit_code=result.returncode, stdout=stdout, stderr=stderr,
error=stderr.split("\n")[0] if stderr else f"exit code {result.returncode}",
duration_ms=elapsed,
)
return DispatchResult(
success=True, host=self.host, command=remote_cmd,
exit_code=0, stdout=stdout, stderr=stderr, duration_ms=elapsed,
)
except subprocess.TimeoutExpired:
elapsed = int((time.monotonic() - t0) * 1000)
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
error=f"SSH timed out after {self.timeout}s", duration_ms=elapsed,
)
except Exception as exc:
elapsed = int((time.monotonic() - t0) * 1000)
return DispatchResult(
success=False, host=self.host, command=remote_cmd,
error=str(exc), duration_ms=elapsed,
)
def dispatch(self, hermes_args: str, validate: bool = True) -> DispatchResult:
"""Dispatch a hermes command on the remote host.
Args:
hermes_args: Arguments to pass to hermes (e.g. "cron tick").
validate: If True, validate the hermes binary exists first.
Returns DispatchResult. Only success=True if command actually ran.
"""
if validate:
try:
hermes_path = self.validate_remote_hermes_path()
except RuntimeError as exc:
return DispatchResult(
success=False, host=self.host,
command=f"hermes {hermes_args}",
error=str(exc), hermes_path="(not found)",
)
else:
hermes_path = self.hermes_path or "hermes"
remote_cmd = f"{hermes_path} {hermes_args}"
result = self.execute_command(remote_cmd)
result.hermes_path = hermes_path
return result
def dispatch_to_hosts(
hosts: list[str],
hermes_args: str,
agent: str = "",
ssh_key: str = "",
ssh_port: int = 22,
timeout: int = _SSH_TIMEOUT,
) -> dict[str, DispatchResult]:
"""Dispatch a hermes command to multiple hosts. Returns host -> DispatchResult."""
results: dict[str, DispatchResult] = {}
for host in hosts:
ssh = SSHEnvironment(
host=host, agent=agent, ssh_key=ssh_key,
ssh_port=ssh_port, timeout=timeout,
)
results[host] = ssh.dispatch(hermes_args)
logger.info(
"Dispatch %s: %s", host,
"OK" if results[host].success else results[host].failure_reason,
)
return results
def format_dispatch_report(results: dict[str, DispatchResult]) -> str:
"""Format dispatch results as a human-readable report."""
lines = []
ok = [r for r in results.values() if r.success]
failed = [r for r in results.values() if not r.success]
lines.append(f"Dispatch report: {len(ok)} OK, {len(failed)} failed")
lines.append("")
for host, result in results.items():
status = "OK" if result.success else "FAILED"
line = f" {host}: {status}"
if not result.success:
line += f" -- {result.failure_reason}"
if result.duration_ms:
line += f" ({result.duration_ms}ms)"
lines.append(line)
if failed:
lines.append("")
lines.append("Failed dispatches:")
for host, result in results.items():
if not result.success:
lines.append(f" {host}: {result.failure_reason}")
if result.stderr:
lines.append(f" stderr: {result.stderr[-150:]}")
return "\n".join(lines)

View File

@@ -1001,30 +1001,10 @@ class AIAgent:
self._session_db = session_db
self._parent_session_id = parent_session_id
self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
if self._session_db:
try:
self._session_db.create_session(
session_id=self.session_id,
source=self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
model=self.model,
model_config={
"max_iterations": self.max_iterations,
"reasoning_config": reasoning_config,
"max_tokens": max_tokens,
},
user_id=None,
parent_session_id=self._parent_session_id,
)
except Exception as e:
# Transient SQLite lock contention (e.g. CLI and gateway writing
# concurrently) must NOT permanently disable session_search for
# this agent. Keep _session_db alive — subsequent message
# flushes and session_search calls will still work once the
# lock clears. The session row may be missing from the index
# for this run, but that is recoverable (flushes upsert rows).
logger.warning(
"Session DB create_session failed (session_search still available): %s", e
)
# Lazy session creation: defer until first message flush (#314).
# _flush_messages_to_session_db() calls ensure_session() which uses
# INSERT OR IGNORE — creating the row only when messages arrive.
# This eliminates 32% of sessions that are created but never used.
# In-memory todo list for task planning (one per agent/session)
from tools.todo_tool import TodoStore

View File

@@ -1,205 +0,0 @@
"""Tests for memory backend system (#322)."""
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from agent.memory import (
MemoryEntry,
NullBackend,
get_memory_backend,
reset_backend,
)
from agent.memory.local_backend import LocalBackend
@pytest.fixture()
def isolated_local_backend(tmp_path, monkeypatch):
"""Create a LocalBackend with temp DB."""
db_path = tmp_path / "test_memory.db"
return LocalBackend(db_path=db_path)
@pytest.fixture()
def reset_memory():
"""Reset the memory backend singleton."""
reset_backend()
yield
reset_backend()
# ---------------------------------------------------------------------------
# MemoryEntry
# ---------------------------------------------------------------------------
class TestMemoryEntry:
def test_creation(self):
entry = MemoryEntry(key="pref", value="python", user_id="u1")
assert entry.key == "pref"
assert entry.value == "python"
assert entry.created_at > 0
def test_defaults(self):
entry = MemoryEntry(key="k", value="v", user_id="u1")
assert entry.metadata == {}
assert entry.updated_at == entry.created_at
# ---------------------------------------------------------------------------
# NullBackend
# ---------------------------------------------------------------------------
class TestNullBackend:
def test_always_available(self):
backend = NullBackend()
assert backend.is_available() is True
def test_store_noop(self):
backend = NullBackend()
assert backend.store("u1", "k", "v") is True
def test_retrieve_returns_none(self):
backend = NullBackend()
assert backend.retrieve("u1", "k") is None
def test_query_returns_empty(self):
backend = NullBackend()
assert backend.query("u1", "test") == []
def test_not_cloud(self):
backend = NullBackend()
assert backend.is_cloud is False
# ---------------------------------------------------------------------------
# LocalBackend
# ---------------------------------------------------------------------------
class TestLocalBackend:
def test_available(self, isolated_local_backend):
assert isolated_local_backend.is_available() is True
def test_store_and_retrieve(self, isolated_local_backend):
assert isolated_local_backend.store("u1", "lang", "python")
entry = isolated_local_backend.retrieve("u1", "lang")
assert entry is not None
assert entry.value == "python"
assert entry.key == "lang"
def test_store_with_metadata(self, isolated_local_backend):
assert isolated_local_backend.store("u1", "k", "v", {"source": "test"})
entry = isolated_local_backend.retrieve("u1", "k")
assert entry.metadata == {"source": "test"}
def test_update_existing(self, isolated_local_backend):
isolated_local_backend.store("u1", "k", "v1")
isolated_local_backend.store("u1", "k", "v2")
entry = isolated_local_backend.retrieve("u1", "k")
assert entry.value == "v2"
def test_query(self, isolated_local_backend):
isolated_local_backend.store("u1", "pref_python", "True")
isolated_local_backend.store("u1", "pref_editor", "vim")
isolated_local_backend.store("u1", "theme", "dark")
results = isolated_local_backend.query("u1", "pref")
assert len(results) == 2
keys = {r.key for r in results}
assert "pref_python" in keys
assert "pref_editor" in keys
def test_list_keys(self, isolated_local_backend):
isolated_local_backend.store("u1", "a", "1")
isolated_local_backend.store("u1", "b", "2")
keys = isolated_local_backend.list_keys("u1")
assert set(keys) == {"a", "b"}
def test_delete(self, isolated_local_backend):
isolated_local_backend.store("u1", "k", "v")
assert isolated_local_backend.delete("u1", "k")
assert isolated_local_backend.retrieve("u1", "k") is None
def test_retrieve_nonexistent(self, isolated_local_backend):
assert isolated_local_backend.retrieve("u1", "nope") is None
def test_not_cloud(self, isolated_local_backend):
assert isolated_local_backend.is_cloud is False
def test_separate_users(self, isolated_local_backend):
isolated_local_backend.store("u1", "k", "user1_value")
isolated_local_backend.store("u2", "k", "user2_value")
assert isolated_local_backend.retrieve("u1", "k").value == "user1_value"
assert isolated_local_backend.retrieve("u2", "k").value == "user2_value"
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
class TestSingleton:
def test_default_is_null(self, reset_memory, monkeypatch):
monkeypatch.delenv("HERMES_MEMORY_BACKEND", raising=False)
monkeypatch.delenv("HONCHO_API_KEY", raising=False)
backend = get_memory_backend()
assert isinstance(backend, NullBackend)
def test_local_when_configured(self, reset_memory, monkeypatch):
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
backend = get_memory_backend()
assert isinstance(backend, LocalBackend)
def test_caches_instance(self, reset_memory, monkeypatch):
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
b1 = get_memory_backend()
b2 = get_memory_backend()
assert b1 is b2
# ---------------------------------------------------------------------------
# HonchoBackend (mocked)
# ---------------------------------------------------------------------------
class TestHonchoBackend:
def test_not_available_without_key(self, monkeypatch):
monkeypatch.delenv("HONCHO_API_KEY", raising=False)
from agent.memory.honcho_backend import HonchoBackend
backend = HonchoBackend()
assert backend.is_available() is False
def test_is_cloud(self):
from agent.memory.honcho_backend import HonchoBackend
backend = HonchoBackend()
assert backend.is_cloud is True
# ---------------------------------------------------------------------------
# Evaluation framework
# ---------------------------------------------------------------------------
class TestEvaluation:
def test_evaluate_null_backend(self):
from agent.memory.evaluation import evaluate_backend
result = evaluate_backend(NullBackend())
assert result.backend_name == "null (disabled)"
assert result.available is True
assert result.score > 0
assert result.is_cloud is False
def test_evaluate_local_backend(self, isolated_local_backend):
from agent.memory.evaluation import evaluate_backend
result = evaluate_backend(isolated_local_backend)
assert result.backend_name == "local (SQLite)"
assert result.available is True
assert result.store_success is True
assert result.retrieve_success is True
assert result.score >= 80 # local should score well
def test_evaluate_backends_returns_report(self, reset_memory, monkeypatch):
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
from agent.memory.evaluation import evaluate_backends
report = evaluate_backends()
assert "backends_evaluated" in report
assert report["backends_evaluated"] >= 2 # null + local
assert "recommendation" in report

View File

@@ -1,165 +0,0 @@
"""Memory Backend Tool — manage cross-session memory backends.
Provides store/retrieve/query/evaluate/list actions for the
pluggable memory backend system.
"""
import json
import logging
from typing import Optional
from tools.registry import registry
logger = logging.getLogger(__name__)
def memory_backend(
action: str,
user_id: str = "default",
key: str = None,
value: str = None,
query_text: str = None,
metadata: dict = None,
) -> str:
"""Manage cross-session memory backends.
Actions:
store — store a user preference/pattern
retrieve — retrieve a specific memory by key
query — search memories by text
list — list all keys for a user
delete — delete a memory entry
info — show current backend info
evaluate — run evaluation framework comparing backends
"""
from agent.memory import get_memory_backend
backend = get_memory_backend()
if action == "info":
return json.dumps({
"success": True,
"backend": backend.backend_name,
"is_cloud": backend.is_cloud,
"available": backend.is_available(),
})
if action == "store":
if not key or value is None:
return json.dumps({"success": False, "error": "key and value are required for 'store'."})
success = backend.store(user_id, key, value, metadata)
return json.dumps({"success": success, "key": key})
if action == "retrieve":
if not key:
return json.dumps({"success": False, "error": "key is required for 'retrieve'."})
entry = backend.retrieve(user_id, key)
if entry is None:
return json.dumps({"success": False, "error": f"No memory found for key '{key}'."})
return json.dumps({
"success": True,
"key": entry.key,
"value": entry.value,
"metadata": entry.metadata,
"updated_at": entry.updated_at,
})
if action == "query":
if not query_text:
return json.dumps({"success": False, "error": "query_text is required for 'query'."})
results = backend.query(user_id, query_text)
return json.dumps({
"success": True,
"results": [
{"key": e.key, "value": e.value, "metadata": e.metadata}
for e in results
],
"count": len(results),
})
if action == "list":
keys = backend.list_keys(user_id)
return json.dumps({"success": True, "keys": keys, "count": len(keys)})
if action == "delete":
if not key:
return json.dumps({"success": False, "error": "key is required for 'delete'."})
success = backend.delete(user_id, key)
return json.dumps({"success": success})
if action == "evaluate":
from agent.memory.evaluation import evaluate_backends
report = evaluate_backends()
return json.dumps({
"success": True,
**report,
})
return json.dumps({
"success": False,
"error": f"Unknown action '{action}'. Use: store, retrieve, query, list, delete, info, evaluate",
})
MEMORY_BACKEND_SCHEMA = {
"name": "memory_backend",
"description": (
"Manage cross-session memory backends for user preference persistence. "
"Pluggable architecture supports local SQLite (default, zero cloud dependency) "
"and optional Honcho cloud backend (requires HONCHO_API_KEY).\n\n"
"Actions:\n"
" store — store a user preference/pattern\n"
" retrieve — retrieve a specific memory by key\n"
" query — search memories by text\n"
" list — list all keys for a user\n"
" delete — delete a memory entry\n"
" info — show current backend info\n"
" evaluate — run evaluation framework comparing backends"
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["store", "retrieve", "query", "list", "delete", "info", "evaluate"],
"description": "The action to perform.",
},
"user_id": {
"type": "string",
"description": "User identifier for memory operations (default: 'default').",
},
"key": {
"type": "string",
"description": "Memory key for store/retrieve/delete.",
},
"value": {
"type": "string",
"description": "Value to store.",
},
"query_text": {
"type": "string",
"description": "Search text for query action.",
},
"metadata": {
"type": "object",
"description": "Optional metadata dict for store.",
},
},
"required": ["action"],
},
}
registry.register(
name="memory_backend",
toolset="skills",
schema=MEMORY_BACKEND_SCHEMA,
handler=lambda args, **kw: memory_backend(
action=args.get("action", ""),
user_id=args.get("user_id", "default"),
key=args.get("key"),
value=args.get("value"),
query_text=args.get("query_text"),
metadata=args.get("metadata"),
),
emoji="🧠",
)