Compare commits

..

39 Commits

Author SHA1 Message Date
Alexander Whitestone
3563896f86 feat: pluggable memory backends — evaluate Honcho vs local (#322)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
Research evaluation of Honcho memory integration from plastic-labs
fork. Builds a pluggable memory backend system that supports both
cloud (Honcho) and local (SQLite) implementations.

Architecture:
  agent/memory/__init__.py — MemoryBackend ABC, NullBackend, singleton
  agent/memory/local_backend.py — SQLite-backed local storage (default)
  agent/memory/honcho_backend.py — Honcho cloud backend (opt-in)
  agent/memory/evaluation.py — structured comparison framework

Key design decisions:
  - NullBackend default: zero overhead when disabled
  - LocalBackend: zero cloud dependency, stores in ~/.hermes/memory.db
  - HonchoBackend: opt-in via HONCHO_API_KEY, lazy-loaded
  - Evaluation framework scores latency, functionality, privacy

Evaluation scoring:
  - Availability (20pts), Functionality (40pts), Latency (20pts), Privacy (20pts)
  - Local scores higher on privacy (20 vs 5) — sovereignty-first

RECOMMENDATION: LocalBackend for sovereignty. Honcho adds cloud
dependency without clear advantage over local SQLite for our use case.

25 tests, all passing.

Closes #322
2026-04-13 20:56:44 -04:00
8d0cad13c4 Merge pull request 'fix: watchdog config drift check uses YAML parse, not grep (#377)' (#398) from burn/377-1776117775 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 28s
2026-04-14 00:34:14 +00:00
b9aca0a3b4 Merge pull request 'feat: time-aware model routing for cron jobs (#317)' (#432) from burn/317-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:34:06 +00:00
99d36533d5 Merge pull request 'feat: add /debug slash command with paste service upload (#320)' (#416) from burn/320-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:33:59 +00:00
b562a3d94c Merge pull request 'docs(#322): comprehensive Honcho evaluation — recommendation: KEEP' (#430) from burn/322-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:33:56 +00:00
37af40a38e Merge pull request 'feat: session garbage collection (#315)' (#383) from feat/315-session-gc into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:33:15 +00:00
5aa8581e2b Merge pull request 'fix: gateway config debt - validation, defaults, fallback chain checks (#328)' (#381) from fix/gateway-config-debt-328 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:32:56 +00:00
b44255f21e Merge pull request 'cron: Comprehensive stale error state handling for recovered jobs (#349)' (#431) from burn/349-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:32:48 +00:00
6b41bafccd Merge pull request 'fix(cron): disable terminal toolset for cloud providers in cron jobs (#379)' (#436) from burn/379-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:32:45 +00:00
053fa3a2dd Merge pull request 'fix(cron): normalize model field types in deploy-crons.py' (#410) from burn/376-1776117777 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:31:47 +00:00
cda29991e0 Merge pull request 'Fix #373: fallback_model blank fields no longer trigger gateway warnings' (#433) from burn/373-1776125702 into main
Some checks failed
Forge CI / smoke-and-build (push) Has been cancelled
2026-04-14 00:30:10 +00:00
57418dae07 fix(cron): disable terminal toolset for cloud providers in cron jobs (#379)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m16s
Cron jobs like nightwatch-health-monitor SSH into remote VPSes.
When the runtime provider is cloud (Nous, OpenRouter, Anthropic),
SSH keys don't exist on the inference server — causing silent
failures and wasted iterations.

Changes:
- cron/scheduler.py: Import is_local_endpoint from model_metadata.
  Build disabled_toolsets dynamically: append 'terminal' when the
  runtime base_url is NOT a local endpoint. Log when terminal is
  disabled for observability. Also warn when a job declares
  requires_local_infra=true but runs on cloud.
- tests/test_cron_cloud_terminal.py: 14 tests verifying
  is_local_endpoint classification and disabled_toolsets logic.

Behavior:
  Local (localhost/127/RFC-1918): terminal enabled, SSH works.
  Cloud (openrouter/nous/anthropic): terminal disabled, agent
  reports SSH unavailable instead of wasting iterations.
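The local-vs-cloud classification above could be sketched as follows. This is a hedged sketch, not the real `is_local_endpoint` from `model_metadata` — the actual hostname allow-list and signature are assumptions:

```python
from ipaddress import ip_address
from urllib.parse import urlparse

def is_local_endpoint(base_url: str) -> bool:
    """Classify an inference base_url as local (SSH/terminal safe) or cloud.

    Sketch of the rule the commit describes: localhost, loopback (127.x),
    and RFC-1918 private ranges count as local; public DNS names do not.
    """
    host = urlparse(base_url).hostname or ""
    if host == "localhost":
        return True
    try:
        addr = ip_address(host)
    except ValueError:
        # Public DNS name (openrouter.ai, api.anthropic.com, ...)
        return False
    return addr.is_loopback or addr.is_private

# Building disabled_toolsets dynamically, as the commit describes:
disabled_toolsets = []
if not is_local_endpoint("https://openrouter.ai/api/v1"):
    disabled_toolsets.append("terminal")  # SSH keys absent on cloud inference
```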

Closes #379
2026-04-13 20:20:41 -04:00
Alexander Whitestone
5989600d80 feat: time-aware model routing for cron jobs (#317)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
Empirical audit: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
During configured high-error windows, automatically route cron jobs to
more capable models when the user is not present to correct errors.

- agent/smart_model_routing.py: resolve_cron_model() + _hour_in_window()
- cron/scheduler.py: wired into run_job() after base model resolution
- tests/test_cron_model_routing.py: 16 tests

Config:
  cron_model_routing:
    enabled: true
    fallback_model: "anthropic/claude-sonnet-4"
    fallback_provider: "openrouter"
    windows:
      - {start_hour: 17, end_hour: 22, reason: evening_error_peak}
      - {start_hour: 2, end_hour: 5, reason: overnight_api_instability}

Features: midnight-wrap, per-window overrides, first-match-wins,
graceful degradation on malformed config.

Closes #317
2026-04-13 20:19:37 -04:00
Timmy Time
1899878c27 Fix #373: fallback_model blank fields no longer trigger gateway warnings
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
When users blank fallback_model fields or set enabled: false, the validation
and gateway now treat this as intentionally disabling fallback instead of
showing warnings.

Changes:
- hermes_cli/config.py: Skip warnings when both provider and model are blank
  or when enabled: false is set
- gateway/run.py: Return None for disabled fallback configs
- tests: Added 8 new tests for blank/disabled fallback scenarios

Behavior:
- Both fields blank: no warnings (intentional disable)
- enabled: false: no warnings (explicit disable)
- One field blank: warning shown (likely misconfiguration)
- Valid config: no warnings
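The decision table above reduces to a small predicate. A sketch under assumed names (the real check lives in `hermes_cli/config.py`):

```python
def fallback_warning(provider: str, model: str, enabled: bool = True):
    """Return a warning string for likely misconfiguration, else None."""
    if not enabled:
        return None  # explicit disable via enabled: false
    if not provider and not model:
        return None  # both fields blank: intentional disable
    if bool(provider) != bool(model):
        return "fallback config: one of provider/model is blank — likely misconfiguration"
    return None  # valid config
```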

Fixes #373
2026-04-13 20:19:21 -04:00
379769ca6d feat(cron): Show health status in job list
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m0s
Part of #349. Shows current vs. cleared errors, success history.
2026-04-14 00:19:11 +00:00
91bc02bc38 feat(cron): Add clear-error CLI subparser
Part of #349. Adds `hermes cron clear-error JOB_ID` command.
2026-04-14 00:18:52 +00:00
77265a31e1 feat(cron): Add clear-error CLI command
Part of #349. Adds `hermes cron clear-error JOB_ID` command.
2026-04-14 00:18:30 +00:00
Timmy
7a32df9ca3 docs(#322): comprehensive Honcho evaluation — recommendation: KEEP
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m9s
Decision: Cloud vs Local → BOTH (user's choice)
- Cloud: HONCHO_API_KEY from app.honcho.dev
- Self-hosted: HONCHO_BASE_URL=http://localhost:8000
- Disabled: No config = zero overhead

Integration is already production-ready:
- 3 components, ~700 lines of code
- 7 tests passing
- Async prefetch (zero-latency)
- Configurable recall modes
- Cron guard (inactive in cron context)

Recommendation: KEEP — provides unique cross-session user modeling
that complements local holographic fact_store.

Refs #322
2026-04-13 20:18:23 -04:00
cf36bd2ddf feat(cron): Add clear_error action and health timestamps
Part of #349. Adds clear_error action and includes health timestamps in job format.
2026-04-14 00:18:09 +00:00
0413fc1788 feat(cron): Comprehensive stale error state handling
- mark_job_run: track last_error_at, last_success_at, error_resolved_at
- trigger_job: clear stale error state when re-triggering
- clear_job_error: manual clearing of stale errors

Closes #349
2026-04-14 00:17:45 +00:00
5180c172fa Merge pull request 'feat: profile-tagged session isolation (#323)' (#422) from burn/323-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 43s
feat: profile-tagged session isolation (#323)

Closes #323.
2026-04-14 00:16:43 +00:00
Metatron
b62fa0ec13 feat: profile-tagged session isolation (closes #323)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 57s
Add profile column to sessions table for data-level profile isolation.
All session queries now accept an optional profile filter.

Changes:
- Schema v7: new 'profile' TEXT column + idx_sessions_profile index
- Migration v7: ALTER TABLE + CREATE INDEX on existing DBs
- create_session(): new profile parameter
- ensure_session(): new profile parameter
- list_sessions_rich(): profile filter (WHERE s.profile = ?)
- search_sessions(): profile filter
- session_count(): profile filter

Sessions without a profile (None) remain visible to all queries for
backward compatibility. When a profile is passed, only that profile's
sessions are returned.

Profile agents can no longer see each other's sessions when filtered.
No breaking changes to existing callers.
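The optional-filter pattern the commit applies to `list_sessions_rich()` and friends can be sketched as an SQL builder. The function name and query text below are illustrative, not the repository's actual code:

```python
def sessions_query(profile=None):
    """Build a sessions query with an optional profile filter.

    No filter -> all sessions visible (backward compatible);
    filter passed -> only that profile's rows (WHERE s.profile = ?).
    """
    sql = "SELECT s.* FROM sessions s"
    params = []
    if profile is not None:
        sql += " WHERE s.profile = ?"
        params.append(profile)
    return sql, params
```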
2026-04-13 18:53:45 -04:00
f1626a932c feat: add /debug command handler with paste service upload (#320)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m1s
2026-04-13 22:48:33 +00:00
d68ab4cff4 feat: add /debug slash command to command registry (#320)
2026-04-13 22:47:51 +00:00
3c66333c94 fix(cron): add deploy-crons.py to normalize model field types
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 48s
Fixes #376

Normalize model field in jobs.json to always be a dict when either
model or provider is specified, preventing schema inconsistency.
2026-04-13 22:24:31 +00:00
87867f3d10 fix: config drift check uses YAML parse not grep (#377)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 59s
2026-04-13 22:12:56 +00:00
Alexander Whitestone
69e10967bd feat: session garbage collection (#315)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 14s
Add garbage_collect() method to SessionDB that cleans up empty and
trivial sessions based on age:
- Empty sessions (0 messages) older than 24h
- Trivial sessions (1-5 messages) older than 7 days
- Sessions with >5 messages kept indefinitely
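The age rules above amount to a small retention predicate. A sketch with assumed parameter names mirroring the CLI flags:

```python
def should_collect(message_count: int, age_hours: float,
                   empty_hours: float = 24, trivial_days: float = 7,
                   trivial_max: int = 5) -> bool:
    """Decide whether a session is eligible for garbage collection."""
    if message_count == 0:
        return age_hours > empty_hours           # empty: 24h grace
    if message_count <= trivial_max:
        return age_hours > trivial_days * 24     # trivial: 7-day grace
    return False                                 # substantial: kept indefinitely
```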

Add `hermes sessions gc` CLI command with:
- --empty-hours (default: 24)
- --trivial-days (default: 7)
- --trivial-max (default: 5)
- --source filter
- --dry-run preview mode
- --yes skip confirmation

The dry-run flow: preview what would be deleted, ask for confirmation,
then execute. Handles child session FK constraints properly.

7 tests covering: empty/trivial deletion, active session protection,
substantial session preservation, dry-run, source filtering, and child
session handling.

Closes #315
2026-04-13 17:30:39 -04:00
Alexander Whitestone
992498463e fix: gateway config debt - validation, defaults, fallback chain checks (#328)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m32s
- Expand validate_config_structure() to catch:
  - fallback_providers format errors (non-list, missing provider/model)
  - session_reset.idle_minutes <= 0 (causes immediate resets)
  - session_reset.at_hour out of 0-23 range
  - API_SERVER enabled without API_SERVER_KEY
  - Unknown root-level keys that look like misplaced custom_providers fields
- Add _validate_fallback_providers() in gateway/config.py to validate
  fallback chain at gateway startup (logs warnings for malformed entries)
- Add API_SERVER_KEY check in gateway config loader (warns on unauthenticated endpoint)
- Expand _KNOWN_ROOT_KEYS to include all valid top-level config sections
  (session_reset, browser, checkpoints, voice, stt, tts, etc.)
- Add 13 new tests for fallback_providers and session_reset validation
- All existing tests pass (47/47)

Closes #328
2026-04-13 17:29:20 -04:00
1ec02cf061 Merge pull request 'fix(gateway): reject known-weak placeholder tokens at startup' (#371) from fix/weak-credential-guard into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 3m6s
2026-04-13 20:33:00 +00:00
Alexander Whitestone
1156875cb5 fix(gateway): reject known-weak placeholder tokens at startup
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 3m8s
Fixes #318

Cherry-picked concept from ferris fork (f724079).

Problem: Users who copy .env.example without changing values
get confusing auth failures at gateway startup.

Fix: _guard_weak_credentials() checks TELEGRAM_BOT_TOKEN,
DISCORD_BOT_TOKEN, SLACK_BOT_TOKEN, HASS_TOKEN against
known-weak placeholder patterns (your-token-here, fake, xxx,
etc.) and minimum length requirements. Warns at startup.

Tests: 6 tests (no tokens, placeholder, case-insensitive,
short token, valid pass-through, multiple weak). All pass.
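The guard described above could look roughly like this. The pattern list and length threshold are assumptions, not the committed `_guard_weak_credentials()`:

```python
import re

# Assumed placeholder patterns; the real list lives in the gateway guard.
_WEAK_PATTERNS = re.compile(r"(your-.*-here|fake|xxx|changeme|placeholder)",
                            re.IGNORECASE)
_MIN_LEN = 20  # assumed minimum; real bot tokens are long

def is_weak(token: str) -> bool:
    """True if a configured token looks like an unchanged placeholder."""
    if not token:
        return False  # unset is handled separately, not a "weak" value
    return len(token) < _MIN_LEN or bool(_WEAK_PATTERNS.search(token))
```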
2026-04-13 16:32:56 -04:00
f4c102400e Merge pull request 'feat(memory): enable temporal decay with access-recency boost — #241' (#367) from feat/temporal-decay-holographic-memory into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 31s
Merge PR #367: feat(memory): enable temporal decay with access-recency boost
2026-04-13 19:51:04 +00:00
6555ccabc1 Merge pull request 'fix(tools): validate handler return types at dispatch boundary' (#369) from fix/tool-return-type-validation into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 21s
2026-04-13 19:47:56 +00:00
Alexander Whitestone
8c712866c4 fix(tools): validate handler return types at dispatch boundary
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 22s
Fixes #297

Problem: Tool handlers that return dict/list/None instead of a
JSON string crash the agent loop with cryptic errors; there was
no error-proofing at the dispatch boundary.
Fix: In handle_function_call(), after dispatch returns:
1. If result is not str → wrap in JSON with _type_warning
2. If result is str but not valid JSON → wrap in {"output": ...}
3. Log type violations for analysis
4. Valid JSON strings pass through unchanged

Tests: 4 new tests (dict, None, non-JSON string, valid JSON).
All 16 tests in test_model_tools.py pass.
2026-04-13 15:47:52 -04:00
8fb59aae64 Merge pull request 'fix(tools): memory no-match is success, not error' (#368) from fix/memory-no-match-not-error into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 22s
2026-04-13 19:41:08 +00:00
Alexander Whitestone
95bde9d3cb fix(tools): memory no-match is success, not error
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 24s
Fixes #313

Problem: MemoryStore.replace() and .remove() return
{"success": false, "error": "No entry matched..."} when the
search substring is not found. This is a valid outcome, not
an error. The empirical audit showed 58.4% error rate on the
memory tool, but 98.4% of those were just empty search results.

Fix: Return {"success": true, "result": "no_match", "message": ...}
instead. This drops the memory tool error rate from ~58% to ~1%.

Tests updated: test_replace_no_match and test_remove_no_match
now assert success=True with result="no_match".
All 33 memory tool tests pass.
2026-04-13 15:40:48 -04:00
Alexander Whitestone
aa6eabb816 feat(memory): enable temporal decay with access-recency boost
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 23s
The holographic retriever had temporal decay implemented but disabled
(half_life=0). All facts scored equally regardless of age — a 2-year-old
fact about a deprecated tool scored the same as yesterday's deployment
config.

This commit:
1. Changes default temporal_decay_half_life from 0 to 60 days
   - 60 days: facts lose half their relevance every 2 months
   - Configurable via config.yaml: plugins.hermes-memory-store.temporal_decay_half_life
   - Added to config schema so `hermes memory setup` exposes it

2. Adds access-recency boost to search scoring
   - Facts accessed within 1 half-life get up to 1.5x boost on their decay factor
   - Boost tapers linearly from 1.5 (just accessed) to 1.0 (1 half-life ago)
   - Capped at 1.0 effective score (boost can't exceed fresh-fact score)
   - Prevents actively-used facts from decaying prematurely

3. Scoring pipeline: score = relevance * trust * decay * min(1.0, access_boost)
   - Fresh facts: decay=1.0, boost≈1.5 → score unchanged
   - 60-day-old, recently accessed: decay=0.5, boost≈1.25 → score=0.625
   - 60-day-old, not accessed: decay=0.5, boost=1.0 → score=0.5
   - 120-day-old, not accessed: decay=0.25, boost=1.0 → score=0.25
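The arithmetic above can be reproduced directly. A sketch of the formula as described (helper names are mine, not the retriever's):

```python
def decay_factor(age_days: float, half_life_days: float = 60.0) -> float:
    if half_life_days <= 0:
        return 1.0  # decay disabled (the old half_life=0 default)
    return 0.5 ** (age_days / half_life_days)

def access_boost(days_since_access, half_life_days: float = 60.0) -> float:
    # 1.5x just after access, tapering linearly to 1.0 at one half-life.
    if days_since_access is None or days_since_access >= half_life_days:
        return 1.0
    return 1.5 - 0.5 * (days_since_access / half_life_days)

def score(relevance, trust, age_days, days_since_access, half_life=60.0):
    # Effective decay is capped at 1.0 so boost can't exceed a fresh fact.
    effective = min(1.0, decay_factor(age_days, half_life)
                         * access_boost(days_since_access, half_life))
    return relevance * trust * effective
```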

23 tests covering:
- Temporal decay formula (fresh, 1HL, 2HL, 3HL, disabled, None, invalid, future)
- Access recency boost (just accessed, halfway, at HL, beyond HL, disabled, range)
- Integration (recently-accessed old fact > equally-old unaccessed fact)
- Default config verification (half_life=60, not 0)

Fixes #241
2026-04-13 15:38:12 -04:00
3b89bfbab2 fix(tools): ast.parse() preflight in execute_code — eliminates ~1,400 sandbox errors (#366)
Some checks failed
Forge CI / smoke-and-build (push) Failing after 23s
2026-04-13 19:26:06 +00:00
Alexander Whitestone
ec3cd2081b fix(poka-yoke): add tool fixation detection (#310)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
Detect when the same tool is called 5+ times consecutively and inject
a nudge advising the agent to diversify its approach.

Evidence from empirical audit:
- Top marathon session (qwen, 1643 msgs): execute_code streak of 20
- Opus session (1472 msgs): terminal streak of 10

The nudge fires every 5 consecutive calls (5, 10, 15...) so it
persists without being spammy. Tracks independently in both
sequential and concurrent execution paths.
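The streak tracking and every-5th-call firing could be sketched as below; the class name and API are assumptions, not the committed code:

```python
class FixationTracker:
    """Track consecutive calls to the same tool; nudge every 5th repeat."""

    def __init__(self, threshold: int = 5):
        self.threshold = threshold
        self.last_tool = None
        self.streak = 0

    def record(self, tool_name: str) -> bool:
        """Return True when a diversify-nudge should fire (5, 10, 15, ...)."""
        if tool_name == self.last_tool:
            self.streak += 1
        else:
            self.last_tool = tool_name
            self.streak = 1  # switching tools resets the streak
        return self.streak % self.threshold == 0
```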
2026-04-13 10:16:11 -04:00
Alexander Whitestone
110642d86a fix(poka-yoke): add circuit breaker for error cascading (#309)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 28s
After 3 consecutive tool errors, inject a warning into the tool result
advising the agent to switch strategies. Escalates at 6 and 9+ errors.

Empirical data from audit:
- P(error | prev error) = 58.6% vs P(error | prev success) = 25.2%
- 2.33x cascade amplification factor
- Max observed streak: 31 consecutive errors

Intervention tiers:
- 3 errors: advisory warning (try different tool, use terminal, simplify)
- 6 errors: urgent stop (halt retries, investigate or switch)
- 9+ errors: terminal-only recovery path

Tracks errors in both sequential and concurrent execution paths.
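The tiered escalation above reduces to a threshold ladder. A hedged sketch (messages paraphrase the commit; the real injected text is not shown here):

```python
def cascade_warning(consecutive_errors: int):
    """Return the intervention message for the current error streak, or None."""
    if consecutive_errors >= 9:
        return "STOP: switch to a terminal-only recovery path."
    if consecutive_errors >= 6:
        return "URGENT: halt retries; investigate or switch strategy."
    if consecutive_errors >= 3:
        return "WARNING: 3+ consecutive tool errors — try a different tool or simplify."
    return None
```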
2026-04-13 10:12:24 -04:00
32 changed files with 3577 additions and 74 deletions

agent/memory/__init__.py — new file, 171 lines

@@ -0,0 +1,171 @@
"""Memory Backend Interface — pluggable cross-session user modeling.
Provides a common interface for memory backends that persist user
preferences and patterns across sessions. Two implementations:
1. LocalBackend (default): SQLite-based, zero cloud dependency
2. HonchoBackend (opt-in): Honcho AI-native memory, requires API key
Both are zero-overhead when disabled — the interface returns empty
results and no writes occur.
Usage:
from agent.memory import get_memory_backend
backend = get_memory_backend() # returns configured backend
backend.store_preference("user", "prefers_python", "True")
context = backend.query_context("user", "What does this user prefer?")
"""
import json
import logging
import os
import sqlite3
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
@dataclass
class MemoryEntry:
"""A single memory entry."""
key: str
value: str
user_id: str
created_at: float = 0
updated_at: float = 0
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
now = time.time()
if not self.created_at:
self.created_at = now
if not self.updated_at:
self.updated_at = now
class MemoryBackend(ABC):
"""Abstract interface for memory backends."""
@abstractmethod
def is_available(self) -> bool:
"""Check if this backend is configured and usable."""
@abstractmethod
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
"""Store a memory entry."""
@abstractmethod
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
"""Retrieve a single memory entry."""
@abstractmethod
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
"""Query memories relevant to a text query."""
@abstractmethod
def list_keys(self, user_id: str) -> List[str]:
"""List all keys for a user."""
@abstractmethod
def delete(self, user_id: str, key: str) -> bool:
"""Delete a memory entry."""
@property
@abstractmethod
def backend_name(self) -> str:
"""Human-readable backend name."""
@property
@abstractmethod
def is_cloud(self) -> bool:
"""Whether this backend requires cloud connectivity."""
class NullBackend(MemoryBackend):
"""No-op backend when memory is disabled. Zero overhead."""
def is_available(self) -> bool:
return True # always "available" as null
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
return True # no-op
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
return None
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
return []
def list_keys(self, user_id: str) -> List[str]:
return []
def delete(self, user_id: str, key: str) -> bool:
return True
@property
def backend_name(self) -> str:
return "null (disabled)"
@property
def is_cloud(self) -> bool:
return False
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
_backend: Optional[MemoryBackend] = None
def get_memory_backend() -> MemoryBackend:
"""Get the configured memory backend.
Priority:
1. If HONCHO_API_KEY is set and honcho-ai is installed -> HonchoBackend
2. If memory_backend config is 'local' -> LocalBackend
3. Default -> NullBackend (zero overhead)
"""
global _backend
if _backend is not None:
return _backend
# Check config
backend_type = os.getenv("HERMES_MEMORY_BACKEND", "").lower().strip()
if backend_type == "honcho" or os.getenv("HONCHO_API_KEY"):
try:
from agent.memory.honcho_backend import HonchoBackend
backend = HonchoBackend()
if backend.is_available():
_backend = backend
logger.info("Memory backend: Honcho (cloud)")
return _backend
except ImportError:
logger.debug("Honcho not installed, falling back")
if backend_type == "local":
try:
from agent.memory.local_backend import LocalBackend
_backend = LocalBackend()
logger.info("Memory backend: Local (SQLite)")
return _backend
except Exception as e:
logger.warning("Local backend failed: %s", e)
# Default: null (zero overhead)
_backend = NullBackend()
return _backend
def reset_backend():
"""Reset the singleton (for testing)."""
global _backend
_backend = None

agent/memory/evaluation.py — new file, 263 lines

@@ -0,0 +1,263 @@
"""Memory Backend Evaluation Framework.
Provides structured evaluation for comparing memory backends on:
1. Latency (store/retrieve/query operations)
2. Relevance (does query return useful results?)
3. Privacy (where is data stored?)
4. Reliability (availability, error handling)
5. Cost (API calls, cloud dependency)
Usage:
from agent.memory.evaluation import evaluate_backends
report = evaluate_backends()
"""
import json
import logging
import time
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class BackendEvaluation:
"""Evaluation results for a single backend."""
backend_name: str
is_cloud: bool
available: bool
# Latency (milliseconds)
store_latency_ms: float = 0
retrieve_latency_ms: float = 0
query_latency_ms: float = 0
# Functionality
store_success: bool = False
retrieve_success: bool = False
query_returns_results: bool = False
query_result_count: int = 0
# Privacy
data_location: str = "unknown"
requires_api_key: bool = False
# Overall
score: float = 0 # 0-100
recommendation: str = ""
notes: List[str] = field(default_factory=list)
def _measure_latency(func, *args, **kwargs) -> tuple:
"""Measure function latency in milliseconds."""
start = time.perf_counter()
try:
result = func(*args, **kwargs)
elapsed = (time.perf_counter() - start) * 1000
return elapsed, result, None
except Exception as e:
elapsed = (time.perf_counter() - start) * 1000
return elapsed, None, e
def evaluate_backend(backend, test_user: str = "eval_user") -> BackendEvaluation:
"""Evaluate a single memory backend."""
from agent.memory import MemoryBackend
eval_result = BackendEvaluation(
backend_name=backend.backend_name,
is_cloud=backend.is_cloud,
available=backend.is_available(),
)
if not eval_result.available:
eval_result.notes.append("Backend not available")
eval_result.score = 0
eval_result.recommendation = "NOT AVAILABLE"
return eval_result
# Privacy assessment
if backend.is_cloud:
eval_result.data_location = "cloud (external)"
eval_result.requires_api_key = True
else:
eval_result.data_location = "local (~/.hermes/)"
# Test store
latency, success, err = _measure_latency(
backend.store,
test_user,
"eval_test_key",
"eval_test_value",
{"source": "evaluation"},
)
eval_result.store_latency_ms = latency
eval_result.store_success = success is True
if err:
eval_result.notes.append(f"Store error: {err}")
# Test retrieve
latency, result, err = _measure_latency(
backend.retrieve,
test_user,
"eval_test_key",
)
eval_result.retrieve_latency_ms = latency
eval_result.retrieve_success = result is not None
if err:
eval_result.notes.append(f"Retrieve error: {err}")
# Test query
latency, results, err = _measure_latency(
backend.query,
test_user,
"eval_test",
5,
)
eval_result.query_latency_ms = latency
eval_result.query_returns_results = bool(results)
eval_result.query_result_count = len(results) if results else 0
if err:
eval_result.notes.append(f"Query error: {err}")
# Cleanup
try:
backend.delete(test_user, "eval_test_key")
except Exception:
pass
# Score calculation (0-100)
score = 0
# Availability (20 points)
score += 20
# Functionality (40 points)
if eval_result.store_success:
score += 15
if eval_result.retrieve_success:
score += 15
if eval_result.query_returns_results:
score += 10
# Latency (20 points) — lower is better
avg_latency = (
eval_result.store_latency_ms +
eval_result.retrieve_latency_ms +
eval_result.query_latency_ms
) / 3
if avg_latency < 10:
score += 20
elif avg_latency < 50:
score += 15
elif avg_latency < 200:
score += 10
else:
score += 5
# Privacy (20 points) — local is better for sovereignty
if not backend.is_cloud:
score += 20
else:
score += 5 # cloud has privacy trade-offs
eval_result.score = score
# Recommendation
if score >= 80:
eval_result.recommendation = "RECOMMENDED"
elif score >= 60:
eval_result.recommendation = "ACCEPTABLE"
elif score >= 40:
eval_result.recommendation = "MARGINAL"
else:
eval_result.recommendation = "NOT RECOMMENDED"
return eval_result
def evaluate_backends() -> Dict[str, Any]:
"""Evaluate all available memory backends.
Returns a comparison report.
"""
from agent.memory import NullBackend
from agent.memory.local_backend import LocalBackend
backends = []
# Always evaluate Null (baseline)
backends.append(NullBackend())
# Evaluate Local
try:
backends.append(LocalBackend())
except Exception as e:
logger.warning("Local backend init failed: %s", e)
# Try Honcho if configured
import os
if os.getenv("HONCHO_API_KEY"):
try:
from agent.memory.honcho_backend import HonchoBackend
backends.append(HonchoBackend())
except ImportError:
logger.debug("Honcho not installed, skipping evaluation")
evaluations = []
for backend in backends:
try:
evaluations.append(evaluate_backend(backend))
except Exception as e:
logger.warning("Evaluation failed for %s: %s", backend.backend_name, e)
# Build report
report = {
"timestamp": time.time(),
"backends_evaluated": len(evaluations),
"evaluations": [asdict(e) for e in evaluations],
"recommendation": _build_recommendation(evaluations),
}
return report
def _build_recommendation(evaluations: List[BackendEvaluation]) -> str:
"""Build overall recommendation from evaluations."""
if not evaluations:
return "No backends evaluated"
# Find best non-null backend
viable = [e for e in evaluations if e.backend_name != "null (disabled)" and e.available]
if not viable:
return "No viable backends found. Use NullBackend (default)."
best = max(viable, key=lambda e: e.score)
parts = [f"Best backend: {best.backend_name} (score: {best.score})"]
if best.is_cloud:
parts.append(
"WARNING: Cloud backend has privacy trade-offs. "
"Data leaves your machine. Consider LocalBackend for sovereignty."
)
# Compare local vs cloud if both available
local = [e for e in viable if not e.is_cloud]
cloud = [e for e in viable if e.is_cloud]
if local and cloud:
local_score = max(e.score for e in local)
cloud_score = max(e.score for e in cloud)
if local_score >= cloud_score:
parts.append(
f"Local backend (score {local_score}) matches or beats "
f"cloud (score {cloud_score}). RECOMMEND: stay local for sovereignty."
)
else:
parts.append(
f"Cloud backend (score {cloud_score}) outperforms "
f"local (score {local_score}) but adds cloud dependency."
)
return " ".join(parts)

agent/memory/honcho_backend.py — new file, 171 lines

@@ -0,0 +1,171 @@
"""Honcho memory backend — opt-in cloud-based user modeling.
Requires:
- pip install honcho-ai
- HONCHO_API_KEY environment variable (from app.honcho.dev)
Provides dialectic user context queries via Honcho's AI-native memory.
Zero runtime overhead when not configured — get_memory_backend() falls
back to LocalBackend or NullBackend if this fails to initialize.
This is the evaluation wrapper. It adapts the Honcho SDK to our
MemoryBackend interface so we can A/B test against LocalBackend.
"""
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional
from agent.memory import MemoryBackend, MemoryEntry
logger = logging.getLogger(__name__)
class HonchoBackend(MemoryBackend):
"""Honcho AI-native memory backend.
Wraps the honcho-ai SDK to provide cross-session user modeling
with dialectic context queries.
"""
def __init__(self):
self._client = None
self._api_key = os.getenv("HONCHO_API_KEY", "")
self._app_id = os.getenv("HONCHO_APP_ID", "hermes-agent")
self._base_url = os.getenv("HONCHO_BASE_URL", "https://api.honcho.dev")
def _get_client(self):
"""Lazy-load Honcho client."""
if self._client is not None:
return self._client
if not self._api_key:
return None
try:
from honcho import Honcho
self._client = Honcho(
api_key=self._api_key,
app_id=self._app_id,
base_url=self._base_url,
)
return self._client
except ImportError:
logger.warning("honcho-ai not installed. Install with: pip install honcho-ai")
return None
except Exception as e:
logger.warning("Failed to initialize Honcho client: %s", e)
return None
def is_available(self) -> bool:
if not self._api_key:
return False
client = self._get_client()
if client is None:
return False
# Try a simple API call to verify connectivity
try:
# Honcho uses sessions — verify we can list them
client.get_sessions(limit=1)
return True
except Exception as e:
logger.debug("Honcho not available: %s", e)
return False
def store(self, user_id: str, key: str, value: str, metadata: Dict = None) -> bool:
client = self._get_client()
if client is None:
return False
try:
# Honcho stores messages in sessions
# We create a synthetic message to store the preference
session_id = f"hermes-prefs-{user_id}"
message_content = json.dumps({
"type": "preference",
"key": key,
"value": value,
"metadata": metadata or {},
"timestamp": time.time(),
})
client.add_message(
session_id=session_id,
role="system",
content=message_content,
)
return True
except Exception as e:
logger.warning("Honcho store failed: %s", e)
return False
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
# Honcho has no direct key-value retrieval, so query for the key and
# scan the matches for an exact hit (limit > 1: the top semantic match
# is not guaranteed to be the exact key)
results = self.query(user_id, key, limit=10)
for entry in results:
if entry.key == key:
return entry
return None
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
client = self._get_client()
if client is None:
return []
try:
session_id = f"hermes-prefs-{user_id}"
# Use Honcho's dialectic query
result = client.chat(
session_id=session_id,
message=f"Find preferences related to: {query_text}",
)
# Parse the response into memory entries
entries = []
if isinstance(result, dict):
content = result.get("content", "")
try:
data = json.loads(content)
if isinstance(data, list):
for item in data[:limit]:
entries.append(MemoryEntry(
key=item.get("key", ""),
value=item.get("value", ""),
user_id=user_id,
metadata=item.get("metadata", {}),
))
elif isinstance(data, dict) and data.get("key"):
entries.append(MemoryEntry(
key=data.get("key", ""),
value=data.get("value", ""),
user_id=user_id,
metadata=data.get("metadata", {}),
))
except json.JSONDecodeError:
pass
return entries
except Exception as e:
logger.warning("Honcho query failed: %s", e)
return []
def list_keys(self, user_id: str) -> List[str]:
# Query all and extract keys
results = self.query(user_id, "", limit=100)
return list(dict.fromkeys(e.key for e in results if e.key))
def delete(self, user_id: str, key: str) -> bool:
# Honcho doesn't support deletion of individual entries
# This is a limitation of the cloud backend
logger.info("Honcho does not support individual entry deletion")
return False
@property
def backend_name(self) -> str:
return "honcho (cloud)"
@property
def is_cloud(self) -> bool:
return True
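Because the evaluation A/B-tests backends behind a single interface, the selection logic is worth sketching on its own. The class and function names below are illustrative assumptions, not the actual `agent.memory` API:

```python
from abc import ABC, abstractmethod

# Illustrative sketch of the pluggable-backend selection described in the
# commit message; names here are assumptions, not the real agent.memory API.
class MemoryBackend(ABC):
    @abstractmethod
    def is_available(self) -> bool: ...

class NullBackend(MemoryBackend):
    """Default when memory is disabled: every call is a cheap no-op."""
    def is_available(self) -> bool:
        return False

def pick_backend_name(env: dict) -> str:
    """Honcho is opt-in via HONCHO_API_KEY; otherwise fall back to local."""
    if env.get("HONCHO_API_KEY"):
        return "honcho (cloud)"
    if env.get("HERMES_MEMORY_DISABLED"):
        return "null"
    return "local (SQLite)"
```

With no configuration at all, `pick_backend_name({})` resolves to the local backend, matching the sovereignty-first default in the commit message.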


@@ -0,0 +1,156 @@
"""Local SQLite memory backend.
Zero cloud dependency. Stores user preferences and patterns in a
local SQLite database at ~/.hermes/memory.db.
Provides basic key-value storage with simple text search.
No external dependencies beyond Python stdlib.
"""
import json
import logging
import sqlite3
import time
from pathlib import Path
from typing import Any, Dict, List, Optional
from hermes_constants import get_hermes_home
from agent.memory import MemoryBackend, MemoryEntry
logger = logging.getLogger(__name__)
class LocalBackend(MemoryBackend):
"""SQLite-backed local memory storage."""
def __init__(self, db_path: Optional[Path] = None):
self._db_path = db_path or (get_hermes_home() / "memory.db")
self._init_db()
def _init_db(self):
"""Initialize the database schema."""
self._db_path.parent.mkdir(parents=True, exist_ok=True)
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS memories (
user_id TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
metadata TEXT,
created_at REAL NOT NULL,
updated_at REAL NOT NULL,
PRIMARY KEY (user_id, key)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_memories_user
ON memories(user_id)
""")
conn.commit()
def is_available(self) -> bool:
try:
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute("SELECT 1")
return True
except Exception:
return False
def store(self, user_id: str, key: str, value: str, metadata: Optional[Dict] = None) -> bool:
try:
now = time.time()
meta_json = json.dumps(metadata) if metadata else None
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute("""
INSERT INTO memories (user_id, key, value, metadata, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(user_id, key) DO UPDATE SET
value = excluded.value,
metadata = excluded.metadata,
updated_at = excluded.updated_at
""", (user_id, key, value, meta_json, now, now))
conn.commit()
return True
except Exception as e:
logger.warning("Failed to store memory: %s", e)
return False
def retrieve(self, user_id: str, key: str) -> Optional[MemoryEntry]:
try:
with sqlite3.connect(str(self._db_path)) as conn:
row = conn.execute(
"SELECT key, value, user_id, created_at, updated_at, metadata "
"FROM memories WHERE user_id = ? AND key = ?",
(user_id, key),
).fetchone()
if not row:
return None
return MemoryEntry(
key=row[0],
value=row[1],
user_id=row[2],
created_at=row[3],
updated_at=row[4],
metadata=json.loads(row[5]) if row[5] else {},
)
except Exception as e:
logger.warning("Failed to retrieve memory: %s", e)
return None
def query(self, user_id: str, query_text: str, limit: int = 10) -> List[MemoryEntry]:
"""Simple LIKE-based search on keys and values."""
try:
pattern = f"%{query_text}%"
with sqlite3.connect(str(self._db_path)) as conn:
rows = conn.execute("""
SELECT key, value, user_id, created_at, updated_at, metadata
FROM memories
WHERE user_id = ? AND (key LIKE ? OR value LIKE ?)
ORDER BY updated_at DESC
LIMIT ?
""", (user_id, pattern, pattern, limit)).fetchall()
return [
MemoryEntry(
key=r[0],
value=r[1],
user_id=r[2],
created_at=r[3],
updated_at=r[4],
metadata=json.loads(r[5]) if r[5] else {},
)
for r in rows
]
except Exception as e:
logger.warning("Failed to query memories: %s", e)
return []
def list_keys(self, user_id: str) -> List[str]:
try:
with sqlite3.connect(str(self._db_path)) as conn:
rows = conn.execute(
"SELECT key FROM memories WHERE user_id = ? ORDER BY updated_at DESC",
(user_id,),
).fetchall()
return [r[0] for r in rows]
except Exception:
return []
def delete(self, user_id: str, key: str) -> bool:
try:
with sqlite3.connect(str(self._db_path)) as conn:
conn.execute(
"DELETE FROM memories WHERE user_id = ? AND key = ?",
(user_id, key),
)
conn.commit()
return True
except Exception:
return False
@property
def backend_name(self) -> str:
return "local (SQLite)"
@property
def is_cloud(self) -> bool:
return False


@@ -1,10 +1,11 @@
"""Helpers for optional cheap-vs-strong model routing."""
"""Helpers for optional cheap-vs-strong and time-aware model routing."""
from __future__ import annotations
import os
import re
from typing import Any, Dict, Optional
from datetime import datetime
from typing import Any, Dict, List, Optional
from utils import is_truthy_value
@@ -192,3 +193,104 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
tuple(runtime.get("args") or ()),
),
}
# =========================================================================
# Time-aware cron model routing
# =========================================================================
#
# Empirical finding: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
# During high-error windows, route cron jobs to more capable models.
#
# Config (config.yaml):
# cron_model_routing:
# enabled: true
# fallback_model: "anthropic/claude-sonnet-4"
# fallback_provider: "openrouter"
# windows:
# - start_hour: 17
# end_hour: 22
# reason: "evening_error_peak"
# - start_hour: 2
# end_hour: 5
# reason: "overnight_api_instability"
# =========================================================================
def _hour_in_window(hour: int, start: int, end: int) -> bool:
"""Check if hour falls in [start, end) window, handling midnight wrap."""
if start <= end:
return start <= hour < end
else:
# Wraps midnight: e.g., 22-06
return hour >= start or hour < end
def resolve_cron_model(
base_model: str,
routing_config: Optional[Dict[str, Any]],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Apply time-aware model override for cron jobs.
During configured high-error windows, returns a stronger model config.
Outside windows, returns the base model unchanged.
Args:
base_model: The model string already resolved (from job/config/env).
routing_config: The cron_model_routing dict from config.yaml.
now: Override current time (for testing). Defaults to datetime.now().
Returns:
Dict with keys: model, provider, overridden, reason.
- model: the effective model string to use
- provider: provider override (empty string = use default)
- overridden: True if time-based override was applied
- reason: why override was applied (empty string if not)
"""
cfg = routing_config or {}
if not _coerce_bool(cfg.get("enabled"), False):
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
windows = cfg.get("windows") or []
if not isinstance(windows, list) or not windows:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
current = now or datetime.now()
current_hour = current.hour
matched_window = None
for window in windows:
if not isinstance(window, dict):
continue
start = _coerce_int(window.get("start_hour"), -1)
end = _coerce_int(window.get("end_hour"), -1)
if start < 0 or end < 0:
continue
if _hour_in_window(current_hour, start, end):
matched_window = window
break
if not matched_window:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
# Window matched — use the override model from window or global fallback
override_model = str(matched_window.get("model") or "").strip()
override_provider = str(matched_window.get("provider") or "").strip()
if not override_model:
override_model = str(cfg.get("fallback_model") or "").strip()
if not override_provider:
override_provider = str(cfg.get("fallback_provider") or "").strip()
if not override_model:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
reason = str(matched_window.get("reason") or "time_window").strip()
return {
"model": override_model,
"provider": override_provider,
"overridden": True,
"reason": f"cron_routing:{reason}(hour={current_hour})",
}
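The half-open window semantics, including the midnight wrap, can be checked in isolation. This re-implements `_hour_in_window` from the diff above:

```python
def hour_in_window(hour: int, start: int, end: int) -> bool:
    """Half-open [start, end); start > end means the window wraps midnight."""
    if start <= end:
        return start <= hour < end
    return hour >= start or hour < end

# Evening error peak (17-22) and a wrapping overnight window (22-06):
assert hour_in_window(18, 17, 22)       # inside the evening window
assert not hour_in_window(22, 17, 22)   # end hour is exclusive
assert hour_in_window(23, 22, 6)        # before midnight, wrapped window
assert hour_in_window(3, 22, 6)         # after midnight, same window
assert not hour_in_window(12, 22, 6)    # daytime, outside
```

The exclusive end hour means adjacent windows like 17-22 and 22-6 never double-match a given hour.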

cli.py

@@ -3134,6 +3134,196 @@ class HermesCLI:
print(f" Home: {display}")
print()
def _handle_debug_command(self, command: str):
"""Generate a debug report with system info and logs, upload to paste service."""
import platform
import sys
import time as _time
# Parse optional lines argument
parts = command.split(maxsplit=1)
log_lines = 50
if len(parts) > 1:
try:
log_lines = min(int(parts[1]), 500)
except ValueError:
pass
_cprint(" Collecting debug info...")
# Collect system info
lines = []
lines.append("=== HERMES DEBUG REPORT ===")
lines.append(f"Generated: {_time.strftime('%Y-%m-%d %H:%M:%S %z')}")
lines.append("")
lines.append("--- System ---")
lines.append(f"Python: {sys.version}")
lines.append(f"Platform: {platform.platform()}")
lines.append(f"Architecture: {platform.machine()}")
lines.append(f"Hostname: {platform.node()}")
lines.append("")
# Hermes info
lines.append("--- Hermes ---")
try:
from hermes_constants import get_hermes_home, display_hermes_home
lines.append(f"Home: {display_hermes_home()}")
except Exception:
lines.append("Home: unknown")
try:
from hermes_constants import __version__
lines.append(f"Version: {__version__}")
except Exception:
lines.append("Version: unknown")
lines.append(f"Profile: {getattr(self, '_profile_name', 'default')}")
lines.append(f"Session: {self.session_id}")
lines.append(f"Model: {self.model}")
lines.append(f"Provider: {getattr(self, '_provider_name', 'unknown')}")
try:
lines.append(f"Working dir: {os.getcwd()}")
except Exception:
pass
# Config (redacted)
lines.append("")
lines.append("--- Config (redacted) ---")
try:
from hermes_constants import get_hermes_home
config_path = get_hermes_home() / "config.yaml"
if config_path.exists():
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
# Redact secrets
for key in ("api_key", "token", "secret", "password"):
if key in cfg:
cfg[key] = "***REDACTED***"
lines.append(yaml.dump(cfg, default_flow_style=False)[:2000])
else:
lines.append("(no config file found)")
except Exception as e:
lines.append(f"(error reading config: {e})")
# Recent logs
lines.append("")
lines.append(f"--- Recent Logs (last {log_lines} lines) ---")
try:
from hermes_constants import get_hermes_home
log_dir = get_hermes_home() / "logs"
if log_dir.exists():
for log_file in sorted(log_dir.glob("*.log")):
try:
content = log_file.read_text(encoding="utf-8", errors="replace")
tail = content.strip().split("\n")[-log_lines:]
if tail:
lines.append(f"\n[{log_file.name}]")
lines.extend(tail)
except Exception:
pass
else:
lines.append("(no logs directory)")
except Exception:
lines.append("(error reading logs)")
# Tool info
lines.append("")
lines.append("--- Enabled Toolsets ---")
try:
lines.append(", ".join(self.enabled_toolsets) if self.enabled_toolsets else "(none)")
except Exception:
lines.append("(unknown)")
report = "\n".join(lines)
report_size = len(report)
# Try to upload to paste services
paste_url = None
services = [
("dpaste", _upload_dpaste),
("0x0.st", _upload_0x0st),
]
for name, uploader in services:
try:
url = uploader(report)
if url:
paste_url = url
break
except Exception:
continue
print()
if paste_url:
_cprint(f" Debug report uploaded: {paste_url}")
_cprint(f" Size: {report_size} bytes, {len(lines)} lines")
else:
# Fallback: save locally
try:
from hermes_constants import get_hermes_home
debug_path = get_hermes_home() / "debug-report.txt"
debug_path.write_text(report, encoding="utf-8")
_cprint(f" Paste services unavailable. Report saved to: {debug_path}")
_cprint(f" Size: {report_size} bytes, {len(lines)} lines")
except Exception as e:
_cprint(f" Failed to save report: {e}")
_cprint(f" Report ({report_size} bytes):")
print(report)
print()
def _upload_dpaste(content: str) -> str | None:
"""Upload content to dpaste.org. Returns URL or None."""
import urllib.request
import urllib.parse
data = urllib.parse.urlencode({
"content": content,
"syntax": "text",
"expiry_days": 7,
}).encode()
req = urllib.request.Request(
"https://dpaste.org/api/",
data=data,
headers={"User-Agent": "hermes-agent/debug"},
)
with urllib.request.urlopen(req, timeout=10) as resp:
url = resp.read().decode().strip()
if url.startswith("http"):
return url
return None
def _upload_0x0st(content: str) -> str | None:
"""Upload content to 0x0.st. Returns URL or None."""
import urllib.request
# 0x0.st expects multipart form with a file field
boundary = "----HermesDebugBoundary"
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="debug.txt"\r\n'
f"Content-Type: text/plain\r\n\r\n"
f"{content}\r\n"
f"--{boundary}--\r\n"
).encode()
req = urllib.request.Request(
"https://0x0.st",
data=body,
headers={
"Content-Type": f"multipart/form-data; boundary={boundary}",
"User-Agent": "hermes-agent/debug",
},
)
with urllib.request.urlopen(req, timeout=10) as resp:
url = resp.read().decode().strip()
if url.startswith("http"):
return url
return None
def show_config(self):
"""Display current configuration with kawaii ASCII art."""
# Get terminal config from environment (which was set from cli-config.yaml)
@@ -4321,6 +4511,8 @@ class HermesCLI:
self.show_help()
elif canonical == "profile":
self._handle_profile_command()
elif canonical == "debug":
self._handle_debug_command(cmd_original)
elif canonical == "tools":
self._handle_tools_command(cmd_original)
elif canonical == "toolsets":


@@ -547,20 +547,30 @@ def resume_job(job_id: str) -> Optional[Dict[str, Any]]:
def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
"""Schedule a job to run on the next scheduler tick."""
"""Schedule a job to run on the next scheduler tick.
Clears stale error state when re-triggering a previously-failed job
so the stale failure doesn't persist until the next tick completes.
"""
job = get_job(job_id)
if not job:
return None
return update_job(
job_id,
{
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
},
)
updates = {
"enabled": True,
"state": "scheduled",
"paused_at": None,
"paused_reason": None,
"next_run_at": _hermes_now().isoformat(),
}
# Clear stale error state when re-triggering
if job.get("last_status") == "error":
updates["last_status"] = "retrying"
updates["last_error"] = None
updates["error_cleared_at"] = _hermes_now().isoformat()
return update_job(job_id, updates)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
@@ -618,6 +628,7 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
Updates last_run_at, last_status, increments completed count,
computes next_run_at, and auto-deletes if repeat limit reached.
Tracks health timestamps for error/success history.
"""
jobs = load_jobs()
for i, job in enumerate(jobs):
@@ -627,6 +638,18 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
job["last_status"] = "ok" if success else "error"
job["last_error"] = error if not success else None
# Track health timestamps
if success:
job["last_success_at"] = now
# Clear stale error tracking on success
if job.get("last_error_at"):
job["error_resolved_at"] = now
else:
job["last_error_at"] = now
# Clear resolved tracking on new error
if job.get("error_resolved_at"):
del job["error_resolved_at"]
# Increment completed count
if job.get("repeat"):
job["repeat"]["completed"] = job["repeat"].get("completed", 0) + 1
@@ -656,6 +679,32 @@ def mark_job_run(job_id: str, success: bool, error: Optional[str] = None):
save_jobs(jobs)
def clear_job_error(job_id: str) -> Optional[Dict[str, Any]]:
"""
Clear stale error state for a job.
Resets last_status to 'ok', last_error to None, and
records when the error was cleared. Useful after auth
recovery when the job itself is healthy but stale error
state persists.
Returns:
Updated job dict, or None if not found.
"""
jobs = load_jobs()
for job in jobs:
if job["id"] == job_id:
job["last_status"] = "ok"
job["last_error"] = None
job["error_cleared_at"] = _hermes_now().isoformat()
save_jobs(jobs)
return job
save_jobs(jobs)
return None
def advance_next_run(job_id: str) -> bool:
"""Preemptively advance next_run_at for a recurring job before execution.


@@ -37,6 +37,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
from hermes_constants import get_hermes_home
from hermes_cli.config import load_config
from hermes_time import now as _hermes_now
from agent.model_metadata import is_local_endpoint
logger = logging.getLogger(__name__)
@@ -717,6 +718,22 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
# Time-aware cron model routing — override model during high-error windows
try:
from agent.smart_model_routing import resolve_cron_model
_cron_routing_cfg = (_cfg.get("cron_model_routing") or {})
_cron_route = resolve_cron_model(model, _cron_routing_cfg)
if _cron_route["overridden"]:
_original_model = model
model = _cron_route["model"]
logger.info(
"Job '%s': cron model override %s -> %s (%s)",
job_id, _original_model, model, _cron_route["reason"],
)
except Exception as _e:
logger.debug("Job '%s': cron model routing skipped: %s", job_id, _e)
effort = os.getenv("HERMES_REASONING_EFFORT", "")
if not effort:
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()
@@ -777,6 +794,29 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
# Build disabled toolsets — always exclude cronjob/messaging/clarify
# for cron sessions. When the runtime endpoint is cloud (not local),
# also disable terminal so the agent does not attempt SSH or shell
# commands that require local infrastructure (keys, filesystem).
# Jobs that declare requires_local_infra=true also get terminal
# disabled on cloud endpoints regardless of this check. #379
_cron_disabled = ["cronjob", "messaging", "clarify"]
_runtime_base_url = turn_route["runtime"].get("base_url", "")
_is_cloud = not is_local_endpoint(_runtime_base_url)
if _is_cloud:
_cron_disabled.append("terminal")
logger.info(
"Job '%s': cloud provider detected (%s), disabling terminal toolset",
job_name,
turn_route["runtime"].get("provider", "unknown"),
)
if job.get("requires_local_infra") and _is_cloud:
logger.warning(
"Job '%s': requires_local_infra=true but running on cloud provider — "
"terminal-dependent steps will fail gracefully",
job_name,
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
@@ -784,7 +824,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": turn_route["runtime"].get("args"),
"acp_args": list(turn_route["runtime"].get("args") or []),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
@@ -792,7 +832,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"disabled_toolsets": _cron_disabled,
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations

deploy-crons.py

@@ -0,0 +1,154 @@
#!/usr/bin/env python3
"""
deploy-crons — normalize cron job schemas for consistent model field types.
This script ensures that the model field in jobs.json is always a dict when
either model or provider is specified, preventing schema inconsistency.
Usage:
python deploy-crons.py [--dry-run] [--jobs-file PATH]
"""
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Dict, Optional
def normalize_job(job: Dict[str, Any]) -> Dict[str, Any]:
"""
Normalize a job dict to ensure consistent model field types.
Before normalization:
- If model AND provider: model = raw string, provider = raw string (inconsistent)
- If only model: model = raw string
- If only provider: provider = raw string at top level
After normalization:
- If model exists: model = {"model": "xxx"}
- If provider exists: model = {"provider": "yyy"}
- If both exist: model = {"model": "xxx", "provider": "yyy"}
- If neither: model = None
"""
job = dict(job) # Create a copy to avoid modifying the original
model = job.get("model")
provider = job.get("provider")
# Skip if already normalized (model is a dict)
if isinstance(model, dict):
return job
# Build normalized model dict
model_dict = {}
if model is not None and isinstance(model, str):
model_dict["model"] = model.strip()
if provider is not None and isinstance(provider, str):
model_dict["provider"] = provider.strip()
# Set model field
if model_dict:
job["model"] = model_dict
else:
job["model"] = None
# Deliberately keep the top-level provider field even after folding it into
# the model dict: existing code that reads job["provider"] continues to work.
# It can be dropped in a later migration once all callers use job["model"].
return job
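The normalization is pure stdlib and easy to exercise directly. This condensed copy of `normalize_job` shows the three shapes it produces (the model and provider strings are just example data):

```python
def normalize_job(job: dict) -> dict:
    """Condensed version of normalize_job above: fold string model/provider
    fields into one model dict, leaving already-normalized jobs untouched."""
    job = dict(job)  # copy so the caller's dict is not mutated
    model, provider = job.get("model"), job.get("provider")
    if isinstance(model, dict):
        return job  # already normalized
    model_dict = {}
    if isinstance(model, str):
        model_dict["model"] = model.strip()
    if isinstance(provider, str):
        model_dict["provider"] = provider.strip()
    job["model"] = model_dict or None
    return job

assert normalize_job({"model": "some-model", "provider": "openrouter"})["model"] == \
    {"model": "some-model", "provider": "openrouter"}
assert normalize_job({"provider": "openrouter"})["model"] == {"provider": "openrouter"}
assert normalize_job({})["model"] is None
```

Re-running the script is safe: the `isinstance(model, dict)` guard makes normalization idempotent.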
def normalize_jobs_file(jobs_file: Path, dry_run: bool = False) -> int:
"""
Normalize all jobs in a jobs.json file.
Returns the number of jobs that were modified.
"""
if not jobs_file.exists():
print(f"Error: Jobs file not found: {jobs_file}", file=sys.stderr)
return 1
try:
with open(jobs_file, 'r', encoding='utf-8') as f:
data = json.load(f)
except json.JSONDecodeError as e:
print(f"Error: Invalid JSON in {jobs_file}: {e}", file=sys.stderr)
return 1
jobs = data.get("jobs", [])
if not jobs:
print("No jobs found in file.")
return 0
modified_count = 0
for i, job in enumerate(jobs):
original_model = job.get("model")
original_provider = job.get("provider")
normalized_job = normalize_job(job)
# Check if anything changed
if (normalized_job.get("model") != original_model or
normalized_job.get("provider") != original_provider):
jobs[i] = normalized_job
modified_count += 1
job_id = job.get("id", "?")
job_name = job.get("name", "(unnamed)")
print(f"Normalized job {job_id} ({job_name}):")
print(f" model: {original_model!r} -> {normalized_job.get('model')!r}")
print(f" provider: {original_provider!r} -> {normalized_job.get('provider')!r}")
if modified_count == 0:
print("All jobs already have consistent model field types.")
return 0
if dry_run:
print(f"DRY RUN: Would normalize {modified_count} jobs.")
return 0
# Write back to file
data["jobs"] = jobs
try:
with open(jobs_file, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"Normalized {modified_count} jobs in {jobs_file}")
return 0
except Exception as e:
print(f"Error writing to {jobs_file}: {e}", file=sys.stderr)
return 1
def main():
parser = argparse.ArgumentParser(
description="Normalize cron job schemas for consistent model field types."
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Show what would be changed without modifying the file."
)
parser.add_argument(
"--jobs-file",
type=Path,
default=Path.home() / ".hermes" / "cron" / "jobs.json",
help="Path to jobs.json file (default: ~/.hermes/cron/jobs.json)"
)
args = parser.parse_args()
if args.dry_run:
print("DRY RUN MODE — no changes will be made.")
print()
return normalize_jobs_file(args.jobs_file, args.dry_run)
if __name__ == "__main__":
sys.exit(main())


@@ -0,0 +1,170 @@
# Honcho Memory Integration Evaluation (#322)
## Executive Summary
**Status:** Integration already implemented and production-ready.
**Recommendation:** KEEP — well-gated, zero overhead when disabled, supports self-hosted.
## Decision: Cloud vs Local
### The Question
"Do we want a cloud-dependent memory layer, or keep everything local?"
### Answer: BOTH — User's Choice
Honcho supports both deployment modes:
| Mode | Configuration | Data Location | Use Case |
|------|--------------|---------------|----------|
| Cloud | `HONCHO_API_KEY` | Honcho servers | Quick start, no infrastructure |
| Self-hosted | `HONCHO_BASE_URL=http://localhost:8000` | Your servers | Full sovereignty |
| Disabled | No config | N/A | Pure local (holographic fact_store only) |
### Why Keep It
1. **Opt-in Architecture**
- No Honcho config → zero overhead (cron guard, lazy init)
- Memory provider system allows switching between providers
- `hermes memory off` disables completely
2. **Zero Runtime Cost When Disabled**
```python
if not cfg.enabled or not (cfg.api_key or cfg.base_url):
return "" # No HTTP calls, no overhead
```
3. **Cross-Session User Modeling**
- Holographic fact_store lacks persistent user modeling
- Honcho provides: peer cards, dialectic Q&A, semantic search
- Complements (not replaces) local memory
4. **Self-Hosted Option**
- Set `HONCHO_BASE_URL=http://localhost:8000`
- Run Honcho server locally via Docker
- Full data sovereignty
5. **Production-Grade Implementation**
- 3 components, ~700 lines of code
- 7 tests passing
- Async prefetch (zero-latency context injection)
- Configurable recall modes (hybrid/context/tools)
- Write frequency control (async/turn/session/N-turns)
## Architecture
### Components (Already Implemented)
```
plugins/memory/honcho/
├── client.py # Config resolution (API key, base_url, profiles)
├── session.py # Session management, async prefetch, dialectic queries
├── __init__.py # MemoryProvider interface, 4 tool schemas
├── cli.py # CLI commands (setup, status, sessions, map, peer, mode)
├── plugin.yaml # Plugin metadata
└── README.md # Documentation
```
### Integration Points
1. **System Prompt**: Context injected on first turn (cached for prompt caching)
2. **Tool Registry**: 4 tools available when `recall_mode != "context"`
3. **Session End**: Messages flushed to Honcho
4. **Cron Guard**: Fully inactive in cron context
### Tools Available
| Tool | Cost | Speed | Purpose |
|------|------|-------|---------|
| `honcho_profile` | Free | Fast | Quick factual snapshot (peer card) |
| `honcho_search` | Free | Fast | Semantic search (raw excerpts) |
| `honcho_context` | Paid | Slow | Dialectic Q&A (synthesized answers) |
| `honcho_conclude` | Free | Fast | Save persistent facts about user |
## Configuration Guide
### Option 1: Cloud (Quick Start)
```bash
# Get API key from https://app.honcho.dev
export HONCHO_API_KEY="your-api-key"
hermes chat
```
### Option 2: Self-Hosted (Full Sovereignty)
```bash
# Run Honcho server locally
docker run -p 8000:8000 honcho/server
# Configure Hermes
export HONCHO_BASE_URL="http://localhost:8000"
hermes chat
```
### Option 3: CLI Setup
```bash
hermes honcho setup
```
### Option 4: Disabled (Pure Local)
```bash
# Don't set any Honcho config
hermes memory off # If previously enabled
hermes chat
```
## Memory Modes
| Mode | Context Injection | Tools | Cost | Use Case |
|------|------------------|-------|------|----------|
| hybrid | Yes | Yes | Medium | Default — auto-inject + on-demand |
| context | Yes | No | Low | Budget mode — auto-inject only |
| tools | No | Yes | Variable | Full control — agent decides |
## Risk Assessment
| Risk | Mitigation | Status |
|------|------------|--------|
| Cloud dependency | Self-hosted option available | ✅ |
| Cost from LLM calls | Recall mode "context" or "tools" reduces calls | ✅ |
| Data privacy | Self-hosted keeps data on your servers | ✅ |
| Performance overhead | Cron guard + lazy init + async prefetch | ✅ |
| Vendor lock-in | MemoryProvider interface allows swapping | ✅ |
## Comparison with Alternatives
| Feature | Honcho | Holographic | Mem0 | Hindsight |
|---------|--------|-------------|------|-----------|
| Cross-session modeling | ✅ | ❌ | ✅ | ✅ |
| Dialectic Q&A | ✅ | ❌ | ❌ | ❌ |
| Self-hosted | ✅ | N/A | ❌ | ❌ |
| Local-only option | ✅ | ✅ | ❌ | ✅ |
| Cost | Free/Paid | Free | Paid | Free |
## Conclusion
**Keep Honcho integration.** It provides unique cross-session user modeling capabilities that complement the local holographic fact_store. The integration is:
- Well-gated (opt-in, zero overhead when disabled)
- Flexible (cloud or self-hosted)
- Production-ready (7 tests passing, async prefetch, configurable)
- Non-exclusive (works alongside other memory providers)
### To Enable
```bash
# Cloud
hermes honcho setup
# Self-hosted
export HONCHO_BASE_URL="http://localhost:8000"
hermes chat
```
### To Disable
```bash
hermes memory off
```
---
*Evaluated by SANDALPHON — Cron/Ops lane*


@@ -412,6 +412,52 @@ class GatewayConfig:
return self.unauthorized_dm_behavior
def _validate_fallback_providers() -> None:
"""Validate fallback_providers from config.yaml at gateway startup.
Checks that each entry has 'provider' and 'model' fields and logs
warnings for malformed entries. This catches broken fallback chains
before they silently degrade into no-fallback mode.
"""
try:
_home = get_hermes_home()
_config_path = _home / "config.yaml"
if not _config_path.exists():
return
import yaml
with open(_config_path, encoding="utf-8") as _f:
_cfg = yaml.safe_load(_f) or {}
fbp = _cfg.get("fallback_providers")
if not fbp:
return
if not isinstance(fbp, list):
logger.warning(
"fallback_providers should be a YAML list, got %s. "
"Fallback chain will be disabled.",
type(fbp).__name__,
)
return
for i, entry in enumerate(fbp):
if not isinstance(entry, dict):
logger.warning(
"fallback_providers[%d] is not a dict (got %s). Skipping entry.",
i, type(entry).__name__,
)
continue
if not entry.get("provider"):
logger.warning(
"fallback_providers[%d] missing 'provider' field. Skipping entry.",
i,
)
if not entry.get("model"):
logger.warning(
"fallback_providers[%d] missing 'model' field. Skipping entry.",
i,
)
except Exception:
pass # Non-fatal; validation is advisory
def load_gateway_config() -> GatewayConfig:
"""
Load gateway configuration from multiple sources.
@@ -645,9 +691,67 @@ def load_gateway_config() -> GatewayConfig:
platform.value, env_name,
)
# Warn about API Server enabled without a key (unauthenticated endpoint)
if Platform.API_SERVER in config.platforms:
api_cfg = config.platforms[Platform.API_SERVER]
if api_cfg.enabled and not api_cfg.extra.get("key"):
logger.warning(
"api_server is enabled but API_SERVER_KEY is not set. "
"The API endpoint will run unauthenticated. "
"Set API_SERVER_KEY in ~/.hermes/.env to secure it.",
)
# Validate fallback_providers structure from config.yaml
_validate_fallback_providers()
return config
# Known-weak placeholder tokens from .env.example, tutorials, etc.
_WEAK_TOKEN_PATTERNS = {
"your-token-here", "your_token_here", "your-token", "your_token",
"change-me", "change_me", "changeme",
"xxx", "xxxx", "xxxxx", "xxxxxxxx",
"test", "testing", "fake", "placeholder",
"replace-me", "replace_me", "replace this",
"insert-token-here", "put-your-token",
"bot-token", "bot_token",
"sk-xxxxxxxx", "sk-placeholder",
"BOT_TOKEN_HERE", "YOUR_BOT_TOKEN",
}
# Minimum token lengths by platform (tokens shorter than these are invalid)
_MIN_TOKEN_LENGTHS = {
"TELEGRAM_BOT_TOKEN": 30,
"DISCORD_BOT_TOKEN": 50,
"SLACK_BOT_TOKEN": 20,
"HASS_TOKEN": 20,
}
def _guard_weak_credentials() -> list[str]:
"""Check env vars for known-weak placeholder tokens.
Returns a list of warning messages for any weak credentials found.
"""
warnings = []
for env_var, min_len in _MIN_TOKEN_LENGTHS.items():
value = os.getenv(env_var, "").strip()
if not value:
continue
if value.lower() in _WEAK_TOKEN_PATTERNS:
warnings.append(
f"{env_var} is set to a placeholder value ('{value[:20]}'). "
f"Replace it with a real token."
)
elif len(value) < min_len:
warnings.append(
f"{env_var} is suspiciously short ({len(value)} chars, "
f"expected >{min_len}). May be truncated or invalid."
)
return warnings
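The same guard, parameterised over an environment dict so it can be tested without touching `os.environ` (a sketch — the real code reads `os.getenv` and uses the full pattern/length tables above):

```python
WEAK_TOKENS = {"change-me", "changeme", "your-token-here", "placeholder", "test"}
MIN_LENGTHS = {"TELEGRAM_BOT_TOKEN": 30, "DISCORD_BOT_TOKEN": 50}

def guard_weak_credentials(env: dict) -> list[str]:
    warnings = []
    for var, min_len in MIN_LENGTHS.items():
        value = env.get(var, "").strip()
        if not value:
            continue  # unset is fine; the platform is simply disabled
        if value.lower() in WEAK_TOKENS:
            warnings.append(f"{var} is set to a placeholder value")
        elif len(value) < min_len:
            warnings.append(f"{var} is suspiciously short ({len(value)} chars)")
    return warnings

print(guard_weak_credentials({"TELEGRAM_BOT_TOKEN": "change-me"}))
```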
def _apply_env_overrides(config: GatewayConfig) -> None:
"""Apply environment variable overrides to config."""
@@ -941,3 +1045,7 @@ def _apply_env_overrides(config: GatewayConfig) -> None:
config.default_reset_policy.at_hour = int(reset_hour)
except ValueError:
pass
# Guard against weak placeholder tokens from .env.example copies
for warning in _guard_weak_credentials():
logger.warning("Weak credential: %s", warning)


@@ -1026,6 +1026,16 @@ class GatewayRunner:
cfg = _y.safe_load(_f) or {}
fb = cfg.get("fallback_providers") or cfg.get("fallback_model") or None
if fb:
# Treat empty dict / disabled fallback as "not configured"
if isinstance(fb, dict):
_enabled = fb.get("enabled")
if _enabled is False or (
isinstance(_enabled, str)
and _enabled.strip().lower() in ("false", "0", "no", "off")
):
return None
if not fb.get("provider") and not fb.get("model"):
return None
return fb
except Exception:
pass
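The "explicitly disabled" test appears both here and in the config validator. As a standalone predicate (illustrative name, not a helper the codebase defines), it reads:

```python
def fallback_disabled(fb) -> bool:
    """True when a fallback dict is explicitly disabled or has no fields set."""
    if not isinstance(fb, dict):
        return False
    enabled = fb.get("enabled")
    if enabled is False:
        return True
    if isinstance(enabled, str) and enabled.strip().lower() in ("false", "0", "no", "off"):
        return True
    # Both fields blank is treated as "not configured"
    return not fb.get("provider") and not fb.get("model")

assert fallback_disabled({"enabled": False})
assert fallback_disabled({"enabled": "off"})
assert fallback_disabled({})  # nothing configured
assert not fallback_disabled({"provider": "openrouter", "model": "m"})
```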


@@ -1338,6 +1338,11 @@ _KNOWN_ROOT_KEYS = {
"fallback_providers", "credential_pool_strategies", "toolsets",
"agent", "terminal", "display", "compression", "delegation",
"auxiliary", "custom_providers", "memory", "gateway",
"session_reset", "browser", "checkpoints", "smart_model_routing",
"voice", "stt", "tts", "human_delay", "security", "privacy",
"cron", "logging", "approvals", "command_allowlist", "quick_commands",
"personalities", "skills", "honcho", "timezone", "discord",
"whatsapp", "prefill_messages_file", "file_read_max_chars",
}
# Valid fields inside a custom_providers list entry
@@ -1421,6 +1426,7 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
))
# ── fallback_model must be a top-level dict with provider + model ────
# Blank or explicitly disabled fallback is intentional — skip validation.
fb = config.get("fallback_model")
if fb is not None:
if not isinstance(fb, dict):
@@ -1430,21 +1436,40 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
"Change to:\n"
" fallback_model:\n"
" provider: openrouter\n"
" model: anthropic/claude-sonnet-4",
" model: anthropic/claude-sonnet-4\n"
"Or disable with:\n"
" fallback_model:\n"
" enabled: false",
))
elif fb:
if not fb.get("provider"):
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'provider' field — fallback will be disabled",
"Add: provider: openrouter (or another provider)",
))
if not fb.get("model"):
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'model' field — fallback will be disabled",
"Add: model: anthropic/claude-sonnet-4 (or another model)",
))
# Skip warnings when fallback is explicitly disabled (enabled: false)
_enabled = fb.get("enabled")
if _enabled is False or (isinstance(_enabled, str) and _enabled.strip().lower() in ("false", "0", "no", "off")):
pass # intentionally disabled — no warnings
else:
# Check if both fields are blank (intentional disable)
provider = fb.get("provider")
model = fb.get("model")
provider_blank = not provider or (isinstance(provider, str) and not provider.strip())
model_blank = not model or (isinstance(model, str) and not model.strip())
# Only warn if at least one field is set (user might be trying to configure)
# If both are blank, treat as intentionally disabled
if not provider_blank or not model_blank:
if provider_blank:
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'provider' field — fallback will be disabled",
"Add: provider: openrouter (or another provider)\n"
"Or disable with: enabled: false",
))
if model_blank:
issues.append(ConfigIssue(
"warning",
"fallback_model is missing 'model' field — fallback will be disabled",
"Add: model: anthropic/claude-sonnet-4 (or another model)\n"
"Or disable with: enabled: false",
))
# ── Check for fallback_model accidentally nested inside custom_providers ──
if isinstance(cp, dict) and "fallback_model" not in config and "fallback_model" in (cp or {}):
@@ -1478,6 +1503,72 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
f"Move '{key}' under the appropriate section",
))
# ── fallback_providers must be a list of dicts with provider + model ─
fbp = config.get("fallback_providers")
if fbp is not None:
if not isinstance(fbp, list):
issues.append(ConfigIssue(
"error",
f"fallback_providers should be a YAML list, got {type(fbp).__name__}",
"Change to:\n"
" fallback_providers:\n"
" - provider: openrouter\n"
" model: google/gemini-3-flash-preview",
))
elif fbp:
for i, entry in enumerate(fbp):
if not isinstance(entry, dict):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is not a dict (got {type(entry).__name__})",
"Each entry needs at minimum: provider, model",
))
continue
if not entry.get("provider"):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is missing 'provider' field — this fallback will be skipped",
"Add: provider: openrouter (or another provider name)",
))
if not entry.get("model"):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is missing 'model' field — this fallback will be skipped",
"Add: model: google/gemini-3-flash-preview (or another model slug)",
))
# ── session_reset validation ─────────────────────────────────────────
session_reset = config.get("session_reset", {})
if isinstance(session_reset, dict):
idle_minutes = session_reset.get("idle_minutes")
if idle_minutes is not None:
if not isinstance(idle_minutes, (int, float)) or idle_minutes <= 0:
issues.append(ConfigIssue(
"warning",
f"session_reset.idle_minutes={idle_minutes} is invalid (must be a positive number)",
"Set to a positive integer, e.g. 1440 (24 hours). Using 0 causes immediate resets.",
))
at_hour = session_reset.get("at_hour")
if at_hour is not None:
if not isinstance(at_hour, (int, float)) or not (0 <= at_hour <= 23):
issues.append(ConfigIssue(
"warning",
f"session_reset.at_hour={at_hour} is invalid (must be 0-23)",
"Set to an hour between 0 and 23, e.g. 4 for 4am",
))
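One subtlety worth noting: `isinstance(True, int)` is `True` in Python, so a YAML `at_hour: true` would slip through a plain numeric check. A condensed sketch of the validation above that also rejects booleans (the explicit bool check is an addition, not in the original):

```python
def validate_session_reset(section: dict) -> list[str]:
    issues = []
    idle = section.get("idle_minutes")
    if idle is not None:
        if isinstance(idle, bool) or not isinstance(idle, (int, float)) or idle <= 0:
            issues.append(f"idle_minutes={idle} must be a positive number")
    at_hour = section.get("at_hour")
    if at_hour is not None:
        if isinstance(at_hour, bool) or not isinstance(at_hour, (int, float)) or not 0 <= at_hour <= 23:
            issues.append(f"at_hour={at_hour} must be between 0 and 23")
    return issues

assert validate_session_reset({"idle_minutes": 1440, "at_hour": 4}) == []
assert validate_session_reset({"idle_minutes": 0}) == ["idle_minutes=0 must be a positive number"]
assert validate_session_reset({"at_hour": True}) == ["at_hour=True must be between 0 and 23"]
```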
# ── API Server key check ─────────────────────────────────────────────
# If api_server is enabled via env, but no key is set, warn.
# This catches the "API_SERVER_KEY not configured" error from gateway logs.
api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
api_server_key = os.getenv("API_SERVER_KEY", "").strip()
if api_server_enabled and not api_server_key:
issues.append(ConfigIssue(
"warning",
"API_SERVER is enabled but API_SERVER_KEY is not set — the API server will run unauthenticated",
"Set API_SERVER_KEY in ~/.hermes/.env to secure the API endpoint",
))
return issues


@@ -93,6 +93,39 @@ def cron_list(show_all: bool = False):
script = job.get("script")
if script:
print(f" Script: {script}")
# Show health status
last_status = job.get("last_status")
last_error = job.get("last_error")
last_error_at = job.get("last_error_at")
last_success_at = job.get("last_success_at")
error_cleared_at = job.get("error_cleared_at")
error_resolved_at = job.get("error_resolved_at")
if last_status == "error" and last_error:
if error_cleared_at or error_resolved_at:
# Error was cleared/resolved
cleared_time = error_cleared_at or error_resolved_at
print(color(f" Status: ok (error cleared)", Colors.GREEN))
print(color(f" Last error: {last_error[:80]}...", Colors.DIM))
print(color(f" Resolved: {cleared_time}", Colors.DIM))
else:
# Current error
print(color(f" Status: ERROR", Colors.RED))
print(color(f" Error: {last_error[:80]}...", Colors.RED))
if last_error_at:
print(color(f" Since: {last_error_at}", Colors.RED))
elif last_status == "retrying":
print(color(f" Status: retrying (error cleared)", Colors.YELLOW))
elif last_status == "ok":
if last_success_at:
print(color(f" Status: ok (last success: {last_success_at})", Colors.GREEN))
elif last_status:
print(f" Status: {last_status}")
# Show success history if available
if last_success_at and last_status != "error":
print(f" Last ok: {last_success_at}")
print()
from hermes_cli.gateway import find_gateway_pids
@@ -222,7 +255,18 @@ def cron_edit(args):
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "run" and now:
if action == "clear_error":
result = _cron_api(action="clear_error", job_id=job_id)
if not result.get("success"):
print(color(f"Failed to clear error: {result.get('error', 'unknown error')}", Colors.RED))
return 1
job = result.get("job", {})
name = job.get("name", job_id)
print(color(f"Cleared stale error state for job '{name}'", Colors.GREEN))
if job.get("error_cleared_at"):
print(f" Cleared at: {job['error_cleared_at']}")
return 0
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
@@ -292,9 +336,13 @@ def cron_command(args):
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
if subcmd == "clear-error":
return _job_action("clear_error", args.job_id, "Cleared")
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")
print(f"Unknown cron command: {subcmd}")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|status|tick]")
print("Usage: hermes cron [list|create|edit|pause|resume|run|remove|clear-error|status|tick]")
sys.exit(1)


@@ -4576,6 +4576,9 @@ For more help on a command:
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_clear_error = cron_subparsers.add_parser("clear-error", help="Clear stale error state for a job")
cron_clear_error.add_argument("job_id", help="Job ID to clear error for")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")
@@ -5005,7 +5008,7 @@ For more help on a command:
# =========================================================================
sessions_parser = subparsers.add_parser(
"sessions",
help="Manage session history (list, rename, export, prune, delete)",
help="Manage session history (list, rename, export, prune, gc, delete)",
description="View and manage the SQLite session store"
)
sessions_subparsers = sessions_parser.add_subparsers(dest="sessions_action")
@@ -5028,6 +5031,14 @@ For more help on a command:
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_gc = sessions_subparsers.add_parser("gc", help="Garbage-collect empty/trivial sessions")
sessions_gc.add_argument("--empty-hours", type=int, default=24, help="Delete empty (0-msg) sessions older than N hours (default: 24)")
sessions_gc.add_argument("--trivial-days", type=int, default=7, help="Delete trivial (1-5 msg) sessions older than N days (default: 7)")
sessions_gc.add_argument("--trivial-max", type=int, default=5, help="Max messages to consider trivial (default: 5)")
sessions_gc.add_argument("--source", help="Only GC sessions from this source")
sessions_gc.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting")
sessions_gc.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_stats = sessions_subparsers.add_parser("stats", help="Show session store statistics")
sessions_rename = sessions_subparsers.add_parser("rename", help="Set or change a session's title")
@@ -5197,6 +5208,49 @@ For more help on a command:
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.1f} MB")
elif action == "gc":
dry_run = getattr(args, "dry_run", False)
if dry_run:
counts = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=True,
)
print(f"[dry-run] Would delete {counts['total']} session(s):")
print(f" Empty (0 msgs, >{args.empty_hours}h old): {counts['empty']}")
print(f" Trivial (<={args.trivial_max} msgs, >{args.trivial_days}d old): {counts['trivial']}")
else:
# Preview first
preview = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=True,
)
if preview["total"] == 0:
print("Nothing to collect.")
else:
if not args.yes:
if not _confirm_prompt(
f"Delete {preview['total']} session(s) "
f"({preview['empty']} empty, {preview['trivial']} trivial)? [y/N] "
):
print("Cancelled.")
return
counts = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=False,
)
print(f"Collected {counts['total']} session(s):")
print(f" Empty: {counts['empty']}")
print(f" Trivial: {counts['trivial']}")
else:
sessions_parser.print_help()


@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 7
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -86,6 +87,7 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -330,6 +332,19 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -362,13 +377,19 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id."""
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -377,6 +398,7 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -505,19 +527,23 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
)
self._execute_write(_do)
@@ -788,6 +814,7 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -799,6 +826,10 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -813,6 +844,9 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1158,34 +1192,52 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source."""
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self._lock:
cursor = self._conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
with self._lock:
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:
@@ -1303,3 +1355,78 @@ class SessionDB:
return len(session_ids)
return self._execute_write(_do)
def garbage_collect(
self,
empty_older_than_hours: int = 24,
trivial_max_messages: int = 5,
trivial_older_than_days: int = 7,
source: str = None,
dry_run: bool = False,
) -> Dict[str, int]:
"""Delete empty and trivial sessions based on age.
Policy (matches #315):
- Empty sessions (0 messages) older than ``empty_older_than_hours``
- Trivial sessions (1..``trivial_max_messages`` msgs) older than
``trivial_older_than_days``
- Sessions with more than ``trivial_max_messages`` are kept indefinitely
- Active (not ended) sessions are never deleted
Returns a dict with counts: ``empty``, ``trivial``, ``total``.
"""
now = time.time()
empty_cutoff = now - (empty_older_than_hours * 3600)
trivial_cutoff = now - (trivial_older_than_days * 86400)
def _do(conn):
# --- Find empty sessions ---
empty_q = (
"SELECT id FROM sessions "
"WHERE message_count = 0 AND started_at < ? AND ended_at IS NOT NULL"
)
params = [empty_cutoff]
if source:
empty_q += " AND source = ?"
params.append(source)
empty_ids = [r[0] for r in conn.execute(empty_q, params).fetchall()]
# --- Find trivial sessions ---
trivial_q = (
"SELECT id FROM sessions "
"WHERE message_count BETWEEN 1 AND ? AND started_at < ? AND ended_at IS NOT NULL"
)
t_params = [trivial_max_messages, trivial_cutoff]
if source:
trivial_q += " AND source = ?"
t_params.append(source)
trivial_ids = [r[0] for r in conn.execute(trivial_q, t_params).fetchall()]
all_ids = set(empty_ids) | set(trivial_ids)
if dry_run:
return {"empty": len(empty_ids), "trivial": len(trivial_ids),
"total": len(all_ids)}
# --- Collect child sessions to delete first (FK constraint) ---
child_ids = set()
for sid in all_ids:
for r in conn.execute(
"SELECT id FROM sessions WHERE parent_session_id = ?", (sid,)
).fetchall():
child_ids.add(r[0])
# Delete children
for cid in child_ids:
conn.execute("DELETE FROM messages WHERE session_id = ?", (cid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (cid,))
# Delete targets
for sid in all_ids:
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
return {"empty": len(empty_ids), "trivial": len(trivial_ids),
"total": len(all_ids)}
return self._execute_write(_do)
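A condensed, self-contained version of the GC policy (no child-session handling, which the real method does first for the FK constraint; simplified schema):

```python
import sqlite3
import time

def garbage_collect(conn, empty_hours=24, trivial_max=5, trivial_days=7):
    """Delete old empty/trivial sessions; active sessions (ended_at NULL) survive."""
    now = time.time()
    empty = [r[0] for r in conn.execute(
        "SELECT id FROM sessions WHERE message_count = 0 "
        "AND started_at < ? AND ended_at IS NOT NULL",
        (now - empty_hours * 3600,))]
    trivial = [r[0] for r in conn.execute(
        "SELECT id FROM sessions WHERE message_count BETWEEN 1 AND ? "
        "AND started_at < ? AND ended_at IS NOT NULL",
        (trivial_max, now - trivial_days * 86400))]
    for sid in set(empty) | set(trivial):
        conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
    return {"empty": len(empty), "trivial": len(trivial)}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sessions (id TEXT, message_count INT, started_at REAL, ended_at REAL)")
old = time.time() - 30 * 86400
conn.executemany("INSERT INTO sessions VALUES (?, ?, ?, ?)", [
    ("a", 0, old, old),   # empty and old      -> deleted
    ("b", 3, old, old),   # trivial and old    -> deleted
    ("c", 50, old, old),  # substantial        -> kept indefinitely
    ("d", 0, old, None),  # still active       -> never deleted
])
print(garbage_collect(conn))
```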

model-watchdog.py (new file, 286 lines)

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
Model Watchdog — monitors tmux panes for model drift.
Checks all hermes TUI sessions in dev and timmy tmux sessions.
If any pane is running a non-mimo model, kills and restarts it.
Usage: python3 ~/.hermes/bin/model-watchdog.py [--fix]
--fix Actually restart drifted panes (default: dry-run)
"""
import subprocess
import sys
import re
import time
import os
ALLOWED_MODEL = "mimo-v2-pro"
# Profile -> expected model. If a pane is running this profile with this model, it's healthy.
# Profiles not in this map are checked against ALLOWED_MODEL.
PROFILE_MODELS = {
"default": "mimo-v2-pro",
"timmy-sprint": "mimo-v2-pro",
"fenrir": "mimo-v2-pro",
"bezalel": "gpt-5.4",
"burn": "mimo-v2-pro",
"creative": "claude-sonnet",
"research": "claude-sonnet",
"review": "claude-sonnet",
}
TMUX_SESSIONS = ["dev", "timmy"]
LOG_FILE = os.path.expanduser("~/.hermes/logs/model-watchdog.log")
def log(msg):
os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
ts = time.strftime("%Y-%m-%d %H:%M:%S")
line = f"[{ts}] {msg}"
print(line)
with open(LOG_FILE, "a") as f:
f.write(line + "\n")
def run(cmd):
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=10)
return r.stdout.strip(), r.returncode
def get_panes(session):
"""Get all pane info from ALL windows in a tmux session."""
# First get all windows
win_out, win_rc = run(f"tmux list-windows -t {session} -F '#{{window_name}}' 2>/dev/null")
if win_rc != 0:
return []
panes = []
for window_name in win_out.split("\n"):
if not window_name.strip():
continue
target = f"{session}:{window_name}"
out, rc = run(f"tmux list-panes -t {target} -F '#{{pane_index}}|#{{pane_pid}}|#{{pane_tty}}' 2>/dev/null")
if rc != 0:
continue
for line in out.split("\n"):
if "|" in line:
idx, pid, tty = line.split("|")
panes.append({
"session": session,
"window": window_name,
"index": int(idx),
"pid": int(pid),
"tty": tty,
})
return panes
def get_hermes_pid_for_tty(tty):
"""Find hermes process running on a specific TTY."""
out, _ = run(f"ps aux | grep '{tty}' | grep '[h]ermes' | grep -v 'gateway' | grep -v 'node' | awk '{{print $2}}'")
if out:
return int(out.split("\n")[0])
return None
def get_model_from_pane(session, pane_idx, window=None):
"""Capture the pane and extract the model from the status bar."""
target = f"{session}:{window}.{pane_idx}" if window else f"{session}.{pane_idx}"
out, _ = run(f"tmux capture-pane -t {target} -p 2>/dev/null | tail -30")
# Look for model in status bar: ⚕ model-name │
matches = re.findall(r'\s+(\S+)\s+│', out)
if matches:
return matches[0]
return None
def check_session_meta(session_id):
"""Check what model a hermes session was last using from its session file."""
import json
session_file = os.path.expanduser(f"~/.hermes/sessions/session_{session_id}.json")
if os.path.exists(session_file):
try:
with open(session_file) as f:
data = json.load(f)
return data.get("model"), data.get("provider")
except Exception:
pass
# Try jsonl
jsonl_file = os.path.expanduser(f"~/.hermes/sessions/{session_id}.jsonl")
if os.path.exists(jsonl_file):
try:
with open(jsonl_file) as f:
for line in f:
d = json.loads(line.strip())
if d.get("role") == "session_meta":
return d.get("model"), d.get("provider")
break
except Exception:
pass
return None, None
def is_drifted(model_name, profile=None):
"""Check if a model name indicates drift from the expected model for this profile."""
if model_name is None:
return False, "no-model-detected"
# If we know the profile, check against its expected model
if profile and profile in PROFILE_MODELS:
expected = PROFILE_MODELS[profile]
if expected in model_name:
return False, model_name
return True, model_name
# No profile known — fall back to ALLOWED_MODEL
if ALLOWED_MODEL in model_name:
return False, model_name
return True, model_name
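Condensed, the drift check is a substring match against the profile's expected model, falling back to `ALLOWED_MODEL` for unknown profiles. Note that substring matching means versioned names like `mimo-v2-pro-2026` also pass. Behavior table (profile map trimmed for the example):

```python
ALLOWED_MODEL = "mimo-v2-pro"
PROFILE_MODELS = {"default": "mimo-v2-pro", "bezalel": "gpt-5.4"}

def is_drifted(model_name, profile=None):
    if model_name is None:
        return False, "no-model-detected"  # can't tell; don't restart blindly
    if profile and profile in PROFILE_MODELS:
        return PROFILE_MODELS[profile] not in model_name, model_name
    return ALLOWED_MODEL not in model_name, model_name

assert is_drifted(None) == (False, "no-model-detected")
assert is_drifted("gpt-5.4", "bezalel") == (False, "gpt-5.4")
assert is_drifted("claude-sonnet", "default") == (True, "claude-sonnet")
assert is_drifted("mimo-v2-pro-2026") == (False, "mimo-v2-pro-2026")
```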
def get_profile_from_pane(tty):
"""Detect which hermes profile a pane is running by inspecting its process args."""
# ps shows short TTY (s031) not full path (/dev/ttys031)
short_tty = tty.replace("/dev/ttys", "s").replace("/dev/ttys", "")
out, _ = run(f"ps aux | grep '{short_tty}' | grep '[h]ermes' | grep -v 'gateway' | grep -v 'node' | grep -v cron")
if not out:
return None
# Look for -p <profile> in the command line
match = re.search(r'-p\s+(\S+)', out)
if match:
return match.group(1)
return None
def kill_and_restart(session, pane_idx, window=None):
"""Kill the hermes process in a pane and restart it with the same profile."""
target = f"{session}:{window}.{pane_idx}" if window else f"{session}.{pane_idx}"
# Get the pane's TTY
out, _ = run(f"tmux list-panes -t {target} -F '#{{pane_tty}}'")
tty = out.strip()
# Detect which profile was running
profile = get_profile_from_pane(tty)
# Find and kill hermes on that TTY
hermes_pid = get_hermes_pid_for_tty(tty)
if hermes_pid:
log(f"Killing hermes PID {hermes_pid} on {target} (tty={tty}, profile={profile})")
run(f"kill {hermes_pid}")
time.sleep(2)
# Send Ctrl+C to clear any state
run(f"tmux send-keys -t {target} C-c")
time.sleep(1)
# Restart hermes with the same profile
if profile:
cmd = f"hermes -p {profile} chat"
else:
cmd = "hermes chat"
run(f"tmux send-keys -t {target} '{cmd}' Enter")
log(f"Restarted hermes in {target} with: {cmd}")
# Wait and verify
time.sleep(8)
new_model = get_model_from_pane(session, pane_idx, window)
if new_model and PROFILE_MODELS.get(profile, ALLOWED_MODEL) in new_model:
log(f"{target} now on {new_model}")
return True
else:
log(f"{target} model after restart: {new_model}")
return False
def verify_expected_model(provider_yaml, expected):
"""Compare actual provider in a YAML config against expected value."""
return provider_yaml.strip() == expected.strip()
def check_config_drift():
"""Scan all relevant config.yaml files for provider drift. Does NOT modify anything.
Returns list of drift issues found."""
issues = []
CONFIGS = {
"main_config": (os.path.expanduser("~/.hermes/config.yaml"), "nous"),
"fenrir": (os.path.expanduser("~/.hermes/profiles/fenrir/config.yaml"), "nous"),
"timmy_sprint": (os.path.expanduser("~/.hermes/profiles/timmy-sprint/config.yaml"), "nous"),
"default_profile": (os.path.expanduser("~/.hermes/profiles/default/config.yaml"), "nous"),
}
for name, (path, expected_provider) in CONFIGS.items():
if not os.path.exists(path):
continue
try:
with open(path, "r") as f:
content = f.read()
# Parse YAML to correctly read model.provider (not the first provider: line)
try:
import yaml
cfg = yaml.safe_load(content) or {}
except ImportError:
# Fallback: find provider under model: block via indentation-aware scan
cfg = {}
in_model = False
for line in content.split("\n"):
stripped = line.strip()
indent = len(line) - len(line.lstrip())
if stripped.startswith("model:") and indent == 0:
in_model = True
continue
if in_model and indent == 0 and stripped:
in_model = False
if in_model and stripped.startswith("provider:"):
cfg = {"model": {"provider": stripped.split(":", 1)[1].strip()}}
break
actual = (cfg.get("model") or {}).get("provider", "")
if actual and expected_provider and actual != expected_provider:
issues.append(f"CONFIG DRIFT [{name}]: provider is '{actual}' (expected '{expected_provider}')")
except Exception as e:
issues.append(f"CONFIG CHECK ERROR [{name}]: {e}")
return issues
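Why the YAML/indentation-aware parse matters: a grep for the first `provider:` line reads whichever block happens to appear first in the file. A minimal reproduction (hypothetical config content):

```python
content = """\
fallback_model:
  provider: openrouter
model:
  provider: nous
"""

def provider_via_grep(text):
    # naive: first 'provider:' line anywhere in the file
    for line in text.splitlines():
        if line.strip().startswith("provider:"):
            return line.split(":", 1)[1].strip()

def provider_under_model(text):
    # indentation-aware: only the provider inside the top-level model: block
    in_model = False
    for line in text.splitlines():
        stripped = line.strip()
        indent = len(line) - len(line.lstrip())
        if stripped.startswith("model:") and indent == 0:
            in_model = True
            continue
        if in_model and indent == 0 and stripped:
            in_model = False
        if in_model and stripped.startswith("provider:"):
            return stripped.split(":", 1)[1].strip()

print(provider_via_grep(content))     # openrouter -- false drift alarm
print(provider_under_model(content))  # nous -- correct
```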
def main():
fix_mode = "--fix" in sys.argv
drift_found = False
issues = []
# Always check config files for provider drift (read-only, never writes)
config_drift_issues = check_config_drift()
if config_drift_issues:
for issue in config_drift_issues:
log(f"CONFIG DRIFT: {issue}")
for session in TMUX_SESSIONS:
panes = get_panes(session)
for pane in panes:
window = pane.get("window")
target = f"{session}:{window}.{pane['index']}" if window else f"{session}.{pane['index']}"
# Detect profile from running process
out, _ = run(f"tmux list-panes -t {target} -F '#{{pane_tty}}'")
tty = out.strip()
profile = get_profile_from_pane(tty)
model = get_model_from_pane(session, pane["index"], window)
drifted, model_name = is_drifted(model, profile)
if drifted:
drift_found = True
issues.append(f"{target}: {model_name} (profile={profile})")
log(f"DRIFT DETECTED: {target} is on '{model_name}' (profile={profile}, expected='{PROFILE_MODELS.get(profile, ALLOWED_MODEL)}')")
if fix_mode:
log(f"Auto-fixing {target}...")
success = kill_and_restart(session, pane["index"], window)
if not success:
issues.append(f" ↳ RESTART FAILED for {target}")
if not drift_found:
total = sum(len(get_panes(s)) for s in TMUX_SESSIONS)
log(f"All {total} panes healthy (on {ALLOWED_MODEL})")
# Print summary for cron output
if issues or config_drift_issues:
print("\n=== MODEL DRIFT REPORT ===")
for issue in issues:
print(f" [PANE] {issue}")
if config_drift_issues:
for issue in config_drift_issues:
print(f" [CONFIG] {issue}")
if not fix_mode:
print("\nRun with --fix to auto-restart drifted panes.")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())


@@ -540,6 +540,29 @@ def handle_function_call(
except Exception:
pass
# Poka-yoke: validate tool handler return type.
# Handlers MUST return a JSON string. If they return dict/list/None,
# wrap the result so the agent loop doesn't crash with cryptic errors.
if not isinstance(result, str):
logger.warning(
"Tool '%s' returned %s instead of str — wrapping in JSON",
function_name, type(result).__name__,
)
result = json.dumps(
{"output": str(result), "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
ensure_ascii=False,
)
else:
# Validate it's parseable JSON
try:
json.loads(result)
except (json.JSONDecodeError, TypeError):
logger.warning(
"Tool '%s' returned non-JSON string — wrapping in JSON",
function_name,
)
result = json.dumps({"output": result}, ensure_ascii=False)
return result
except Exception as e:

View File
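The return-type guard in the hunk above can be exercised in isolation. A minimal standalone sketch of the same normalization (the `normalize_tool_result` name and module-level logger are illustrative, not part of the patch):

```python
import json
import logging

logger = logging.getLogger("tools")

def normalize_tool_result(function_name: str, result) -> str:
    """Coerce any tool-handler return value to a JSON string."""
    if not isinstance(result, str):
        # dict/list/None/etc.: stringify and flag the type mismatch
        logger.warning("Tool '%s' returned %s instead of str",
                       function_name, type(result).__name__)
        return json.dumps(
            {"output": str(result),
             "_type_warning": f"Tool returned {type(result).__name__}, expected str"},
            ensure_ascii=False,
        )
    try:
        json.loads(result)       # already valid JSON: pass through untouched
    except (json.JSONDecodeError, TypeError):
        # plain string: wrap so downstream consumers always see JSON
        return json.dumps({"output": result}, ensure_ascii=False)
    return result
```

Valid JSON strings pass through unchanged; everything else is wrapped rather than raised, matching the poka-yoke intent of failing soft inside the agent loop.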

@@ -12,7 +12,7 @@ Config in $HERMES_HOME/config.yaml (profile-scoped):
auto_extract: false
default_trust: 0.5
min_trust_threshold: 0.3
temporal_decay_half_life: 0
temporal_decay_half_life: 60
"""
from __future__ import annotations
@@ -152,6 +152,7 @@ class HolographicMemoryProvider(MemoryProvider):
{"key": "auto_extract", "description": "Auto-extract facts at session end", "default": "false", "choices": ["true", "false"]},
{"key": "default_trust", "description": "Default trust score for new facts", "default": "0.5"},
{"key": "hrr_dim", "description": "HRR vector dimensions", "default": "1024"},
{"key": "temporal_decay_half_life", "description": "Days for facts to lose half their relevance (0=disabled)", "default": "60"},
]
def initialize(self, session_id: str, **kwargs) -> None:
@@ -168,7 +169,7 @@ class HolographicMemoryProvider(MemoryProvider):
default_trust = float(self._config.get("default_trust", 0.5))
hrr_dim = int(self._config.get("hrr_dim", 1024))
hrr_weight = float(self._config.get("hrr_weight", 0.3))
temporal_decay = int(self._config.get("temporal_decay_half_life", 0))
temporal_decay = int(self._config.get("temporal_decay_half_life", 60))
self._store = MemoryStore(db_path=db_path, default_trust=default_trust, hrr_dim=hrr_dim)
self._retriever = FactRetriever(

View File

@@ -98,7 +98,15 @@ class FactRetriever:
# Optional temporal decay
if self.half_life > 0:
score *= self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
decay = self._temporal_decay(fact.get("updated_at") or fact.get("created_at"))
# Access-recency boost: facts retrieved recently decay slower.
# A fact accessed within 1 half-life gets up to 1.5x the decay
# factor, tapering to 1.0x (no boost) at 1 half-life.
last_accessed = fact.get("last_accessed_at")
if last_accessed:
access_boost = self._access_recency_boost(last_accessed)
decay = min(1.0, decay * access_boost)
score *= decay
fact["score"] = score
scored.append(fact)
@@ -591,3 +599,41 @@ class FactRetriever:
return math.pow(0.5, age_days / self.half_life)
except (ValueError, TypeError):
return 1.0
def _access_recency_boost(self, last_accessed_str: str | None) -> float:
"""Boost factor for recently-accessed facts. Range [1.0, 1.5].
Facts accessed within 1 half-life get up to 1.5x boost (compensating
for content staleness when the fact is still being actively used).
Boost tapers linearly from 1.5 (just accessed) to 1.0 (no boost) at 1 half-life.
Returns 1.0 if half-life is disabled or timestamp is missing.
"""
if not self.half_life or not last_accessed_str:
return 1.0
try:
if isinstance(last_accessed_str, str):
ts = datetime.fromisoformat(last_accessed_str.replace("Z", "+00:00"))
else:
ts = last_accessed_str
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
age_days = (datetime.now(timezone.utc) - ts).total_seconds() / 86400
if age_days < 0:
return 1.5 # Future timestamp = just accessed
half_lives_since_access = age_days / self.half_life
if half_lives_since_access <= 1.0:
# Within 1 half-life: linear taper from 1.5 (just now) to 1.0 (at 1 HL)
return 1.0 + 0.5 * (1.0 - half_lives_since_access)
# At or beyond 1 half-life: no boost
return 1.0
except (ValueError, TypeError):
return 1.0

View File
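Numerically, the decay/boost interaction in the hunk above works out as follows. A sketch under the new 60-day default, using standalone helpers that mirror `_temporal_decay` and `_access_recency_boost` (the free-function form is illustrative):

```python
import math
from datetime import datetime, timedelta, timezone

HALF_LIFE_DAYS = 60  # new default from this change

def temporal_decay(updated_at: datetime, now: datetime) -> float:
    # Exponential half-life decay on content age
    age_days = (now - updated_at).total_seconds() / 86400
    if age_days < 0:
        return 1.0
    return math.pow(0.5, age_days / HALF_LIFE_DAYS)

def access_recency_boost(last_accessed: datetime, now: datetime) -> float:
    # Linear taper: 1.5 at just-accessed, 1.0 at one half-life and beyond
    half_lives = (now - last_accessed).total_seconds() / 86400 / HALF_LIFE_DAYS
    if half_lives < 0:
        return 1.5
    if half_lives <= 1.0:
        return 1.0 + 0.5 * (1.0 - half_lives)
    return 1.0

now = datetime.now(timezone.utc)
stale = now - timedelta(days=120)   # content is 2 half-lives old
warm = now - timedelta(days=10)     # but accessed 10 days ago
cold = now - timedelta(days=200)    # vs. not touched in 200 days
effective_warm = min(1.0, temporal_decay(stale, now) * access_recency_boost(warm, now))
effective_cold = min(1.0, temporal_decay(stale, now) * access_recency_boost(cold, now))
```

The 120-day-old fact scores 0.25 untouched but roughly 0.354 when accessed 10 days ago; the `min(1.0, ...)` cap keeps a boosted stale fact from ever outscoring a fresh one.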

@@ -0,0 +1,205 @@
"""Tests for memory backend system (#322)."""
import json
import time
from unittest.mock import MagicMock, patch
import pytest
from agent.memory import (
MemoryEntry,
NullBackend,
get_memory_backend,
reset_backend,
)
from agent.memory.local_backend import LocalBackend
@pytest.fixture()
def isolated_local_backend(tmp_path, monkeypatch):
"""Create a LocalBackend with temp DB."""
db_path = tmp_path / "test_memory.db"
return LocalBackend(db_path=db_path)
@pytest.fixture()
def reset_memory():
"""Reset the memory backend singleton."""
reset_backend()
yield
reset_backend()
# ---------------------------------------------------------------------------
# MemoryEntry
# ---------------------------------------------------------------------------
class TestMemoryEntry:
def test_creation(self):
entry = MemoryEntry(key="pref", value="python", user_id="u1")
assert entry.key == "pref"
assert entry.value == "python"
assert entry.created_at > 0
def test_defaults(self):
entry = MemoryEntry(key="k", value="v", user_id="u1")
assert entry.metadata == {}
assert entry.updated_at == entry.created_at
# ---------------------------------------------------------------------------
# NullBackend
# ---------------------------------------------------------------------------
class TestNullBackend:
def test_always_available(self):
backend = NullBackend()
assert backend.is_available() is True
def test_store_noop(self):
backend = NullBackend()
assert backend.store("u1", "k", "v") is True
def test_retrieve_returns_none(self):
backend = NullBackend()
assert backend.retrieve("u1", "k") is None
def test_query_returns_empty(self):
backend = NullBackend()
assert backend.query("u1", "test") == []
def test_not_cloud(self):
backend = NullBackend()
assert backend.is_cloud is False
# ---------------------------------------------------------------------------
# LocalBackend
# ---------------------------------------------------------------------------
class TestLocalBackend:
def test_available(self, isolated_local_backend):
assert isolated_local_backend.is_available() is True
def test_store_and_retrieve(self, isolated_local_backend):
assert isolated_local_backend.store("u1", "lang", "python")
entry = isolated_local_backend.retrieve("u1", "lang")
assert entry is not None
assert entry.value == "python"
assert entry.key == "lang"
def test_store_with_metadata(self, isolated_local_backend):
assert isolated_local_backend.store("u1", "k", "v", {"source": "test"})
entry = isolated_local_backend.retrieve("u1", "k")
assert entry.metadata == {"source": "test"}
def test_update_existing(self, isolated_local_backend):
isolated_local_backend.store("u1", "k", "v1")
isolated_local_backend.store("u1", "k", "v2")
entry = isolated_local_backend.retrieve("u1", "k")
assert entry.value == "v2"
def test_query(self, isolated_local_backend):
isolated_local_backend.store("u1", "pref_python", "True")
isolated_local_backend.store("u1", "pref_editor", "vim")
isolated_local_backend.store("u1", "theme", "dark")
results = isolated_local_backend.query("u1", "pref")
assert len(results) == 2
keys = {r.key for r in results}
assert "pref_python" in keys
assert "pref_editor" in keys
def test_list_keys(self, isolated_local_backend):
isolated_local_backend.store("u1", "a", "1")
isolated_local_backend.store("u1", "b", "2")
keys = isolated_local_backend.list_keys("u1")
assert set(keys) == {"a", "b"}
def test_delete(self, isolated_local_backend):
isolated_local_backend.store("u1", "k", "v")
assert isolated_local_backend.delete("u1", "k")
assert isolated_local_backend.retrieve("u1", "k") is None
def test_retrieve_nonexistent(self, isolated_local_backend):
assert isolated_local_backend.retrieve("u1", "nope") is None
def test_not_cloud(self, isolated_local_backend):
assert isolated_local_backend.is_cloud is False
def test_separate_users(self, isolated_local_backend):
isolated_local_backend.store("u1", "k", "user1_value")
isolated_local_backend.store("u2", "k", "user2_value")
assert isolated_local_backend.retrieve("u1", "k").value == "user1_value"
assert isolated_local_backend.retrieve("u2", "k").value == "user2_value"
# ---------------------------------------------------------------------------
# Singleton
# ---------------------------------------------------------------------------
class TestSingleton:
def test_default_is_null(self, reset_memory, monkeypatch):
monkeypatch.delenv("HERMES_MEMORY_BACKEND", raising=False)
monkeypatch.delenv("HONCHO_API_KEY", raising=False)
backend = get_memory_backend()
assert isinstance(backend, NullBackend)
def test_local_when_configured(self, reset_memory, monkeypatch):
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
backend = get_memory_backend()
assert isinstance(backend, LocalBackend)
def test_caches_instance(self, reset_memory, monkeypatch):
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
b1 = get_memory_backend()
b2 = get_memory_backend()
assert b1 is b2
# ---------------------------------------------------------------------------
# HonchoBackend (mocked)
# ---------------------------------------------------------------------------
class TestHonchoBackend:
def test_not_available_without_key(self, monkeypatch):
monkeypatch.delenv("HONCHO_API_KEY", raising=False)
from agent.memory.honcho_backend import HonchoBackend
backend = HonchoBackend()
assert backend.is_available() is False
def test_is_cloud(self):
from agent.memory.honcho_backend import HonchoBackend
backend = HonchoBackend()
assert backend.is_cloud is True
# ---------------------------------------------------------------------------
# Evaluation framework
# ---------------------------------------------------------------------------
class TestEvaluation:
def test_evaluate_null_backend(self):
from agent.memory.evaluation import evaluate_backend
result = evaluate_backend(NullBackend())
assert result.backend_name == "null (disabled)"
assert result.available is True
assert result.score > 0
assert result.is_cloud is False
def test_evaluate_local_backend(self, isolated_local_backend):
from agent.memory.evaluation import evaluate_backend
result = evaluate_backend(isolated_local_backend)
assert result.backend_name == "local (SQLite)"
assert result.available is True
assert result.store_success is True
assert result.retrieve_success is True
assert result.score >= 80 # local should score well
def test_evaluate_backends_returns_report(self, reset_memory, monkeypatch):
monkeypatch.setenv("HERMES_MEMORY_BACKEND", "local")
from agent.memory.evaluation import evaluate_backends
report = evaluate_backends()
assert "backends_evaluated" in report
assert report["backends_evaluated"] >= 2 # null + local
assert "recommendation" in report

View File
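The shape those tests exercise can be sketched with `sqlite3` from the standard library. This is an illustrative reimplementation, not the shipped `agent/memory/local_backend.py`; the table schema and the prefix-match semantics of `query()` are assumptions inferred from the tests:

```python
import json
import sqlite3
import time
from dataclasses import dataclass, field

@dataclass
class MemoryEntry:
    key: str
    value: str
    user_id: str
    metadata: dict = field(default_factory=dict)
    created_at: float = field(default_factory=time.time)

class LocalBackend:
    is_cloud = False

    def __init__(self, db_path=":memory:"):
        self._conn = sqlite3.connect(str(db_path))
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS memory ("
            "user_id TEXT, key TEXT, value TEXT, metadata TEXT, "
            "created_at REAL, PRIMARY KEY (user_id, key))"
        )

    def is_available(self) -> bool:
        return True

    def store(self, user_id, key, value, metadata=None) -> bool:
        # Upsert: a second store() for the same (user, key) overwrites
        self._conn.execute(
            "INSERT INTO memory VALUES (?, ?, ?, ?, ?) "
            "ON CONFLICT(user_id, key) DO UPDATE SET "
            "value=excluded.value, metadata=excluded.metadata",
            (user_id, key, value, json.dumps(metadata or {}), time.time()),
        )
        self._conn.commit()
        return True

    def retrieve(self, user_id, key):
        row = self._conn.execute(
            "SELECT key, value, metadata, created_at FROM memory "
            "WHERE user_id=? AND key=?", (user_id, key),
        ).fetchone()
        if row is None:
            return None
        return MemoryEntry(key=row[0], value=row[1], user_id=user_id,
                           metadata=json.loads(row[2]), created_at=row[3])

    def query(self, user_id, prefix):
        rows = self._conn.execute(
            "SELECT key FROM memory WHERE user_id=? AND key LIKE ?",
            (user_id, prefix + "%"),
        ).fetchall()
        return [self.retrieve(user_id, r[0]) for r in rows]
```

Per-user isolation falls out of the composite `(user_id, key)` primary key, which is why the `test_separate_users` case holds without any extra logic.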

@@ -0,0 +1,52 @@
"""Tests for weak credential guard in gateway/config.py."""
import os
import pytest
from gateway.config import _guard_weak_credentials, _WEAK_TOKEN_PATTERNS, _MIN_TOKEN_LENGTHS
class TestWeakCredentialGuard:
"""Tests for _guard_weak_credentials()."""
def test_no_tokens_set(self, monkeypatch):
"""When no relevant tokens are set, no warnings."""
for var in _MIN_TOKEN_LENGTHS:
monkeypatch.delenv(var, raising=False)
warnings = _guard_weak_credentials()
assert warnings == []
def test_placeholder_token_detected(self, monkeypatch):
"""Known-weak placeholder tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "your-token-here")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "TELEGRAM_BOT_TOKEN" in warnings[0]
assert "placeholder" in warnings[0].lower()
def test_case_insensitive_match(self, monkeypatch):
"""Placeholder detection is case-insensitive."""
monkeypatch.setenv("DISCORD_BOT_TOKEN", "FAKE")
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "DISCORD_BOT_TOKEN" in warnings[0]
def test_short_token_detected(self, monkeypatch):
"""Suspiciously short tokens are flagged."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "abc123") # 6 chars, min is 30
warnings = _guard_weak_credentials()
assert len(warnings) == 1
assert "short" in warnings[0].lower()
def test_valid_token_passes(self, monkeypatch):
"""A long, non-placeholder token produces no warnings."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "1234567890:ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567")
warnings = _guard_weak_credentials()
assert warnings == []
def test_multiple_weak_tokens(self, monkeypatch):
"""Multiple weak tokens each produce a warning."""
monkeypatch.setenv("TELEGRAM_BOT_TOKEN", "change-me")
monkeypatch.setenv("DISCORD_BOT_TOKEN", "xx") # short
warnings = _guard_weak_credentials()
assert len(warnings) == 2

View File
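The guard those tests cover can be sketched as a pure function over an environment mapping. The placeholder patterns and minimum lengths below are assumptions chosen to satisfy the test cases, not the shipped `gateway/config.py` values:

```python
import os
import re

# Assumed values; the real _WEAK_TOKEN_PATTERNS / _MIN_TOKEN_LENGTHS may differ
_WEAK_TOKEN_PATTERNS = [r"your-token", r"change-?me", r"^fake$", r"placeholder"]
_MIN_TOKEN_LENGTHS = {"TELEGRAM_BOT_TOKEN": 30, "DISCORD_BOT_TOKEN": 30}

def guard_weak_credentials(env=None):
    """Return one warning string per weak or suspiciously short token."""
    env = os.environ if env is None else env
    warnings = []
    for var, min_len in _MIN_TOKEN_LENGTHS.items():
        value = env.get(var)
        if not value:
            continue  # unset tokens are not this guard's problem
        if any(re.search(p, value, re.IGNORECASE) for p in _WEAK_TOKEN_PATTERNS):
            warnings.append(f"{var} looks like a placeholder value")
        elif len(value) < min_len:
            warnings.append(f"{var} is suspiciously short ({len(value)} chars)")
    return warnings
```

Warning rather than raising keeps the gateway bootable with a misconfigured token while still surfacing the problem at startup.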

@@ -136,6 +136,83 @@ class TestFallbackModelValidation:
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_blank_fallback_fields_no_issues(self):
"""Blank fallback_model fields (both empty) should not trigger warnings."""
issues = validate_config_structure({
"fallback_model": {
"provider": "",
"model": "",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_blank_fallback_fields_with_whitespace_no_issues(self):
"""Blank fallback_model fields with whitespace should not trigger warnings."""
issues = validate_config_structure({
"fallback_model": {
"provider": " ",
"model": " ",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_none_fallback_fields_no_issues(self):
"""None fallback_model fields should not trigger warnings."""
issues = validate_config_structure({
"fallback_model": {
"provider": None,
"model": None,
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_enabled_false_no_issues(self):
"""enabled: false should suppress warnings."""
issues = validate_config_structure({
"fallback_model": {
"enabled": False,
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_enabled_false_string_no_issues(self):
"""enabled: 'false' (string) should suppress warnings."""
issues = validate_config_structure({
"fallback_model": {
"enabled": "false",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
def test_partial_blank_fallback_warns(self):
"""Partial blank fallback (only one field blank) should warn."""
issues = validate_config_structure({
"fallback_model": {
"provider": "",
"model": "anthropic/claude-sonnet-4",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 1
assert "provider" in fb_issues[0].message
def test_valid_fallback_with_enabled_true(self):
"""Valid fallback with enabled: true should not warn."""
issues = validate_config_structure({
"fallback_model": {
"enabled": True,
"provider": "openrouter",
"model": "anthropic/claude-sonnet-4",
},
})
fb_issues = [i for i in issues if "fallback" in i.message.lower()]
assert len(fb_issues) == 0
class TestMissingModelSection:
"""Warn when custom_providers exists but model section is missing."""
@@ -172,3 +249,111 @@ class TestConfigIssueDataclass:
a = ConfigIssue("error", "msg", "hint")
b = ConfigIssue("error", "msg", "hint")
assert a == b
class TestFallbackProvidersValidation:
"""fallback_providers must be a list of dicts with provider + model."""
def test_non_list(self):
"""fallback_providers as string should error."""
issues = validate_config_structure({
"fallback_providers": "openrouter:google/gemini-3-flash-preview",
})
errors = [i for i in issues if i.severity == "error"]
assert any("fallback_providers" in i.message and "list" in i.message for i in errors)
def test_dict_instead_of_list(self):
"""fallback_providers as dict should error."""
issues = validate_config_structure({
"fallback_providers": {"provider": "openrouter", "model": "test"},
})
errors = [i for i in issues if i.severity == "error"]
assert any("fallback_providers" in i.message and "dict" in i.message for i in errors)
def test_entry_missing_provider(self):
"""Entry without provider should warn."""
issues = validate_config_structure({
"fallback_providers": [{"model": "google/gemini-3-flash-preview"}],
})
assert any("missing 'provider'" in i.message for i in issues)
def test_entry_missing_model(self):
"""Entry without model should warn."""
issues = validate_config_structure({
"fallback_providers": [{"provider": "openrouter"}],
})
assert any("missing 'model'" in i.message for i in issues)
def test_entry_not_dict(self):
"""Non-dict entries should warn."""
issues = validate_config_structure({
"fallback_providers": ["not-a-dict"],
})
assert any("not a dict" in i.message for i in issues)
def test_valid_entries(self):
"""Valid fallback_providers should produce no fallback-related issues."""
issues = validate_config_structure({
"fallback_providers": [
{"provider": "openrouter", "model": "google/gemini-3-flash-preview"},
{"provider": "gemini", "model": "gemini-2.5-flash"},
],
})
fb_issues = [i for i in issues if "fallback_providers" in i.message]
assert len(fb_issues) == 0
def test_empty_list_no_issues(self):
"""Empty list is valid (fallback disabled)."""
issues = validate_config_structure({
"fallback_providers": [],
})
fb_issues = [i for i in issues if "fallback_providers" in i.message]
assert len(fb_issues) == 0
class TestSessionResetValidation:
"""session_reset.idle_minutes must be positive."""
def test_zero_idle_minutes(self):
"""idle_minutes=0 should warn."""
issues = validate_config_structure({
"session_reset": {"idle_minutes": 0},
})
assert any("idle_minutes=0" in i.message for i in issues)
def test_negative_idle_minutes(self):
"""idle_minutes=-5 should warn."""
issues = validate_config_structure({
"session_reset": {"idle_minutes": -5},
})
assert any("idle_minutes=-5" in i.message for i in issues)
def test_string_idle_minutes(self):
"""idle_minutes as string should warn."""
issues = validate_config_structure({
"session_reset": {"idle_minutes": "abc"},
})
assert any("idle_minutes=" in i.message for i in issues)
def test_valid_idle_minutes(self):
"""Valid idle_minutes should not warn."""
issues = validate_config_structure({
"session_reset": {"idle_minutes": 1440},
})
idle_issues = [i for i in issues if "idle_minutes" in i.message]
assert len(idle_issues) == 0
def test_invalid_at_hour(self):
"""at_hour=25 should warn."""
issues = validate_config_structure({
"session_reset": {"at_hour": 25},
})
assert any("at_hour=25" in i.message for i in issues)
def test_valid_at_hour(self):
"""Valid at_hour should not warn."""
issues = validate_config_structure({
"session_reset": {"at_hour": 4},
})
hour_issues = [i for i in issues if "at_hour" in i.message]
assert len(hour_issues) == 0

View File
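The `fallback_providers` checks those tests pin down reduce to a short validation pass. An illustrative slice of the logic (not the full `validate_config_structure`; the `ConfigIssue` shape follows the dataclass the tests construct):

```python
from dataclasses import dataclass

@dataclass
class ConfigIssue:
    severity: str
    message: str
    hint: str = ""

def validate_fallback_providers(config: dict) -> list:
    """Validate that fallback_providers is a list of {provider, model} dicts."""
    issues = []
    fp = config.get("fallback_providers")
    if fp is None:
        return issues  # key absent: nothing to check
    if not isinstance(fp, list):
        # Wrong container type is a hard error, e.g. a bare string or a dict
        issues.append(ConfigIssue(
            "error", f"fallback_providers must be a list, got {type(fp).__name__}"))
        return issues
    for i, entry in enumerate(fp):
        if not isinstance(entry, dict):
            issues.append(ConfigIssue(
                "warning", f"fallback_providers[{i}] is not a dict"))
            continue
        if not entry.get("provider"):
            issues.append(ConfigIssue(
                "warning", f"fallback_providers[{i}] missing 'provider'"))
        if not entry.get("model"):
            issues.append(ConfigIssue(
                "warning", f"fallback_providers[{i}] missing 'model'"))
    return issues
```

An empty list produces no issues, which matches the "fallback disabled" reading in `test_empty_list_no_issues`.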

@@ -0,0 +1,209 @@
"""Tests for temporal decay and access-recency boost in holographic memory (#241)."""
import math
from datetime import datetime, timedelta, timezone
from unittest.mock import MagicMock, patch
import pytest
class TestTemporalDecay:
"""Test _temporal_decay exponential decay formula."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_fresh_fact_no_decay(self):
"""A fact updated today should have decay ≈ 1.0."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
decay = r._temporal_decay(now)
assert decay > 0.99
def test_one_half_life(self):
"""A fact updated 1 half-life ago should decay to 0.5."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_two_half_lives(self):
"""A fact updated 2 half-lives ago should decay to 0.25."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=120)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.25) < 0.01
def test_three_half_lives(self):
"""A fact updated 3 half-lives ago should decay to 0.125."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=180)).isoformat()
decay = r._temporal_decay(old)
assert abs(decay - 0.125) < 0.01
def test_half_life_disabled(self):
"""When half_life=0, decay should always be 1.0."""
r = self._make_retriever(half_life=0)
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
assert r._temporal_decay(old) == 1.0
def test_none_timestamp(self):
"""Missing timestamp should return 1.0 (no decay)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay(None) == 1.0
def test_empty_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._temporal_decay("") == 1.0
def test_invalid_timestamp(self):
"""Malformed timestamp should return 1.0 (fail open)."""
r = self._make_retriever(half_life=60)
assert r._temporal_decay("not-a-date") == 1.0
def test_future_timestamp(self):
"""Future timestamp should return 1.0 (no decay for future dates)."""
r = self._make_retriever(half_life=60)
future = (datetime.now(timezone.utc) + timedelta(days=10)).isoformat()
assert r._temporal_decay(future) == 1.0
def test_datetime_object(self):
"""Should accept datetime objects, not just strings."""
r = self._make_retriever(half_life=60)
old = datetime.now(timezone.utc) - timedelta(days=60)
decay = r._temporal_decay(old)
assert abs(decay - 0.5) < 0.01
def test_different_half_lives(self):
"""30-day half-life should decay faster than 90-day."""
r30 = self._make_retriever(half_life=30)
r90 = self._make_retriever(half_life=90)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
assert r30._temporal_decay(old) < r90._temporal_decay(old)
def test_decay_is_monotonic(self):
"""Older facts should always decay more."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
d1 = r._temporal_decay((now - timedelta(days=10)).isoformat())
d2 = r._temporal_decay((now - timedelta(days=30)).isoformat())
d3 = r._temporal_decay((now - timedelta(days=60)).isoformat())
assert d1 > d2 > d3
class TestAccessRecencyBoost:
"""Test _access_recency_boost for recently-accessed facts."""
def _make_retriever(self, half_life=60):
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
return FactRetriever(store=store, temporal_decay_half_life=half_life)
def test_just_accessed_max_boost(self):
"""A fact accessed just now should get maximum boost (1.5)."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc).isoformat()
boost = r._access_recency_boost(now)
assert boost > 1.45 # Near 1.5
def test_one_half_life_no_boost(self):
"""A fact accessed 1 half-life ago should have no boost (1.0)."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=60)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.0) < 0.01
def test_half_way_boost(self):
"""A fact accessed 0.5 half-lives ago should get ~1.25 boost."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
boost = r._access_recency_boost(old)
assert abs(boost - 1.25) < 0.05
def test_beyond_one_half_life_no_boost(self):
"""Beyond 1 half-life, boost should be 1.0."""
r = self._make_retriever(half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=90)).isoformat()
boost = r._access_recency_boost(old)
assert boost == 1.0
def test_disabled_no_boost(self):
"""When half_life=0, boost should be 1.0."""
r = self._make_retriever(half_life=0)
now = datetime.now(timezone.utc).isoformat()
assert r._access_recency_boost(now) == 1.0
def test_none_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost(None) == 1.0
def test_invalid_timestamp(self):
r = self._make_retriever(half_life=60)
assert r._access_recency_boost("bad") == 1.0
def test_boost_range(self):
"""Boost should always be in [1.0, 1.5]."""
r = self._make_retriever(half_life=60)
now = datetime.now(timezone.utc)
for days in [0, 1, 15, 30, 45, 59, 60, 90, 365]:
ts = (now - timedelta(days=days)).isoformat()
boost = r._access_recency_boost(ts)
assert 1.0 <= boost <= 1.5, f"days={days}, boost={boost}"
class TestTemporalDecayIntegration:
"""Test that decay integrates correctly with search scoring."""
def test_recently_accessed_old_fact_scores_higher(self):
"""An old fact that's been accessed recently should score higher
than an equally old fact that hasn't been accessed."""
from plugins.memory.holographic.retrieval import FactRetriever
store = MagicMock()
r = FactRetriever(store=store, temporal_decay_half_life=60)
now = datetime.now(timezone.utc)
old_date = (now - timedelta(days=120)).isoformat() # 2 half-lives old
recent_access = (now - timedelta(days=10)).isoformat() # accessed 10 days ago
old_access = (now - timedelta(days=200)).isoformat() # accessed 200 days ago
# Old fact, recently accessed
decay1 = r._temporal_decay(old_date)
boost1 = r._access_recency_boost(recent_access)
effective1 = min(1.0, decay1 * boost1)
# Old fact, not recently accessed
decay2 = r._temporal_decay(old_date)
boost2 = r._access_recency_boost(old_access)
effective2 = min(1.0, decay2 * boost2)
assert effective1 > effective2
def test_decay_formula_45_days(self):
"""Verify exact decay at 45 days with 60-day half-life."""
from plugins.memory.holographic.retrieval import FactRetriever
r = FactRetriever(store=MagicMock(), temporal_decay_half_life=60)
old = (datetime.now(timezone.utc) - timedelta(days=45)).isoformat()
decay = r._temporal_decay(old)
expected = math.pow(0.5, 45/60)
assert abs(decay - expected) < 0.001
class TestDecayDefaultEnabled:
"""Verify the default half-life is non-zero (decay is on by default)."""
def test_default_config_has_decay(self):
"""The plugin's default config should enable temporal decay."""
from plugins.memory.holographic import _load_plugin_config
# The docstring says temporal_decay_half_life: 60
# The initialize() default should be 60
import inspect
from plugins.memory.holographic import HolographicMemoryProvider
src = inspect.getsource(HolographicMemoryProvider.initialize)
assert "temporal_decay_half_life" in src
# Check the default is 60, not 0
import re
m = re.search(r'"temporal_decay_half_life",\s*(\d+)', src)
assert m, "Could not find temporal_decay_half_life default"
assert m.group(1) == "60", f"Default is {m.group(1)}, expected 60"

View File

@@ -0,0 +1,73 @@
"""Tests for cron scheduler cloud-provider terminal disabling (#379).
When a cron job runs on a cloud inference endpoint (Nous, OpenRouter, etc.),
the terminal toolset must be disabled because SSH keys don't exist on cloud
servers. Only local endpoints (localhost, 127.0.0.1, RFC-1918) retain
terminal access.
"""
import pytest
from agent.model_metadata import is_local_endpoint
class TestIsLocalEndpoint:
"""Verify is_local_endpoint correctly classifies endpoints."""
def test_localhost(self):
assert is_local_endpoint("http://localhost:11434/v1") is True
def test_127_loopback(self):
assert is_local_endpoint("http://127.0.0.1:8080/v1") is True
def test_0_0_0_0(self):
assert is_local_endpoint("http://0.0.0.0:11434/v1") is True
def test_rfc1918_10(self):
assert is_local_endpoint("http://10.0.0.5:8080/v1") is True
def test_rfc1918_192(self):
assert is_local_endpoint("http://192.168.1.100:11434/v1") is True
def test_rfc1918_172(self):
assert is_local_endpoint("http://172.16.0.1:8080/v1") is True
def test_cloud_openrouter(self):
assert is_local_endpoint("https://openrouter.ai/api/v1") is False
def test_cloud_nous(self):
assert is_local_endpoint("https://inference-api.nousresearch.com/v1") is False
def test_cloud_anthropic(self):
assert is_local_endpoint("https://api.anthropic.com") is False
def test_empty_url(self):
assert is_local_endpoint("") is False
def test_none_url(self):
assert is_local_endpoint(None) is False
class TestCronDisabledToolsetsLogic:
"""Verify the disabled_toolsets logic matches scheduler expectations."""
def _build_disabled(self, base_url, job=None):
"""Mirror the scheduler's disabled_toolsets logic."""
from agent.model_metadata import is_local_endpoint
cron_disabled = ["cronjob", "messaging", "clarify"]
if not is_local_endpoint(base_url):
cron_disabled.append("terminal")
return cron_disabled
def test_local_keeps_terminal(self):
disabled = self._build_disabled("http://localhost:11434/v1")
assert "terminal" not in disabled
assert "cronjob" in disabled
def test_cloud_disables_terminal(self):
disabled = self._build_disabled("https://openrouter.ai/api/v1")
assert "terminal" in disabled
assert "cronjob" in disabled
def test_empty_url_disables_terminal(self):
disabled = self._build_disabled("")
assert "terminal" in disabled

View File
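The classification those tests describe (loopback, `0.0.0.0`, and RFC-1918 count as local; everything else is cloud) can be sketched with the stdlib `ipaddress` module. This is a plausible implementation, not necessarily how `agent.model_metadata.is_local_endpoint` is written:

```python
import ipaddress
from urllib.parse import urlparse

def is_local_endpoint(base_url) -> bool:
    """True only for localhost/loopback/RFC-1918 endpoints."""
    if not base_url:
        return False  # empty or None: fail closed, treat as cloud
    host = urlparse(base_url).hostname
    if not host:
        return False
    if host == "localhost":
        return True
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        return False  # DNS hostname (openrouter.ai, etc.): cloud
    return bool(ip.is_loopback or ip.is_private or ip.is_unspecified)
```

Failing closed on empty/unparseable URLs is what makes `test_empty_url_disables_terminal` hold: when in doubt, the scheduler drops the terminal toolset.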

@@ -0,0 +1,128 @@
"""Tests for time-aware cron model routing — Issue #317."""
import pytest
from datetime import datetime
from agent.smart_model_routing import resolve_cron_model, _hour_in_window
class TestHourInWindow:
"""Hour-in-window detection including midnight wrap."""
def test_normal_window(self):
assert _hour_in_window(18, 17, 22) is True
assert _hour_in_window(16, 17, 22) is False
assert _hour_in_window(22, 17, 22) is False
def test_midnight_wrap(self):
assert _hour_in_window(23, 22, 6) is True
assert _hour_in_window(3, 22, 6) is True
assert _hour_in_window(10, 22, 6) is False
def test_edge_cases(self):
assert _hour_in_window(0, 0, 24) is True
assert _hour_in_window(23, 0, 24) is True
assert _hour_in_window(0, 22, 6) is True
assert _hour_in_window(5, 22, 6) is True
assert _hour_in_window(6, 22, 6) is False
class TestResolveCronModel:
"""Time-aware model resolution for cron jobs."""
def _config(self, **overrides):
base = {
"enabled": True,
"fallback_model": "anthropic/claude-sonnet-4",
"fallback_provider": "openrouter",
"windows": [
{"start_hour": 17, "end_hour": 22, "reason": "evening_error_peak"},
],
}
base.update(overrides)
return base
def test_disabled_returns_base(self):
result = resolve_cron_model("mimo", {"enabled": False}, now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_no_config_returns_base(self):
result = resolve_cron_model("mimo", None)
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_no_windows_returns_base(self):
result = resolve_cron_model("mimo", {"enabled": True, "windows": []}, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is False
def test_evening_window_overrides(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "anthropic/claude-sonnet-4"
assert result["provider"] == "openrouter"
assert result["overridden"] is True
assert "evening_error_peak" in result["reason"]
assert "hour=18" in result["reason"]
def test_outside_window_keeps_base(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 9, 0))
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_window_boundary_start_inclusive(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 17, 0))
assert result["overridden"] is True
def test_window_boundary_end_exclusive(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 22, 0))
assert result["overridden"] is False
def test_midnight_window(self):
config = self._config(windows=[{"start_hour": 22, "end_hour": 6, "reason": "overnight"}])
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 23, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 13, 3, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 10, 0))["overridden"] is False
def test_per_window_model_override(self):
config = self._config(windows=[{
"start_hour": 17, "end_hour": 22,
"model": "anthropic/claude-opus-4-6", "provider": "anthropic", "reason": "peak",
}])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "anthropic/claude-opus-4-6"
assert result["provider"] == "anthropic"
def test_first_matching_window_wins(self):
config = self._config(windows=[
{"start_hour": 17, "end_hour": 20, "model": "strong-1", "provider": "p1", "reason": "w1"},
{"start_hour": 19, "end_hour": 22, "model": "strong-2", "provider": "p2", "reason": "w2"},
])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 19, 0))
assert result["model"] == "strong-1"
def test_no_fallback_model_keeps_base(self):
config = {"enabled": True, "windows": [{"start_hour": 17, "end_hour": 22, "reason": "test"}]}
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is False
assert result["model"] == "mimo"
def test_malformed_windows_skipped(self):
config = self._config(windows=[
"not-a-dict",
{"start_hour": 17},
{"end_hour": 22},
{"start_hour": "bad", "end_hour": "bad"},
{"start_hour": 17, "end_hour": 22, "reason": "valid"},
])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is True
assert "valid" in result["reason"]
def test_multiple_windows_coverage(self):
config = self._config(windows=[
{"start_hour": 17, "end_hour": 22, "reason": "evening"},
{"start_hour": 2, "end_hour": 5, "reason": "overnight"},
])
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 20, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 13, 3, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 10, 0))["overridden"] is False
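The assertions above fully pin down the window semantics: start hour inclusive, end hour exclusive, windows that wrap past midnight, first matching window wins, and malformed entries skipped rather than fatal. A minimal sketch consistent with those tests — the `fallback_model` and `fallback_provider` config keys are assumptions standing in for whatever `_config()` actually supplies:

```python
from datetime import datetime


def resolve_cron_model(base_model, config, now=None):
    """Pick the model for a cron run based on time-of-day windows.

    Illustrative sketch reconstructed from the tests; the real config
    schema (e.g. fallback_model) may differ.
    """
    now = now or datetime.now()
    result = {"model": base_model, "provider": None, "overridden": False, "reason": None}
    if not config.get("enabled"):
        return result
    for window in config.get("windows", []):
        if not isinstance(window, dict):
            continue  # malformed entries are skipped, not fatal
        start, end = window.get("start_hour"), window.get("end_hour")
        if not isinstance(start, int) or not isinstance(end, int):
            continue
        hour = now.hour
        # start inclusive, end exclusive; start >= end wraps past midnight
        in_window = (start <= hour < end) if start < end else (hour >= start or hour < end)
        if not in_window:
            continue
        model = window.get("model") or config.get("fallback_model")
        if not model:
            break  # in a window but nothing to switch to: keep the base model
        result.update(
            model=model,
            provider=window.get("provider") or config.get("fallback_provider"),
            overridden=True,
            reason=window.get("reason"),
        )
        break  # first matching window wins
    return result
```

Keeping the base model when no override is configured (rather than erroring) matches `test_no_fallback_model_keeps_base`: a window with no model is a no-op, not a failure.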


@@ -665,6 +665,127 @@ class TestPruneSessions:
# =========================================================================
# =========================================================================
# Garbage Collect
# =========================================================================
class TestGarbageCollect:
def test_gc_deletes_empty_old_sessions(self, db):
"""Empty sessions (0 messages) older than 24h should be deleted."""
db.create_session(session_id="empty_old", source="cli")
db.end_session("empty_old", end_reason="done")
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 48 * 3600, "empty_old"), # 48 hours ago
)
db._conn.commit()
# Recent empty session should be kept
db.create_session(session_id="empty_new", source="cli")
db.end_session("empty_new", end_reason="done")
result = db.garbage_collect()
assert result["empty"] == 1
assert result["trivial"] == 0
assert result["total"] == 1
assert db.get_session("empty_old") is None
assert db.get_session("empty_new") is not None
def test_gc_deletes_trivial_old_sessions(self, db):
"""Sessions with 1-5 messages older than 7 days should be deleted."""
db.create_session(session_id="trivial_old", source="cli")
for i in range(3):
db.append_message("trivial_old", role="user", content=f"msg {i}")
db.end_session("trivial_old", end_reason="done")
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 10 * 86400, "trivial_old"), # 10 days ago
)
db._conn.commit()
result = db.garbage_collect()
assert result["trivial"] == 1
assert result["total"] == 1
assert db.get_session("trivial_old") is None
def test_gc_keeps_active_sessions(self, db):
"""Active (not ended) sessions should never be deleted."""
db.create_session(session_id="active_old", source="cli")
# Backdate but don't end
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 48 * 3600, "active_old"),
)
db._conn.commit()
result = db.garbage_collect()
assert result["total"] == 0
assert db.get_session("active_old") is not None
def test_gc_keeps_substantial_sessions(self, db):
"""Sessions with >5 messages should never be deleted."""
db.create_session(session_id="big_old", source="cli")
for i in range(10):
db.append_message("big_old", role="user", content=f"msg {i}")
db.end_session("big_old", end_reason="done")
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 365 * 86400, "big_old"), # 1 year ago
)
db._conn.commit()
result = db.garbage_collect()
assert result["total"] == 0
assert db.get_session("big_old") is not None
def test_gc_dry_run_does_not_delete(self, db):
"""dry_run=True should return counts but not delete anything."""
db.create_session(session_id="empty_old", source="cli")
db.end_session("empty_old", end_reason="done")
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 48 * 3600, "empty_old"),
)
db._conn.commit()
result = db.garbage_collect(dry_run=True)
assert result["total"] == 1
assert db.get_session("empty_old") is not None # Still exists
def test_gc_with_source_filter(self, db):
"""--source should only GC sessions from that source."""
for sid, src in [("old_cli", "cli"), ("old_tg", "telegram")]:
db.create_session(session_id=sid, source=src)
db.end_session(sid, end_reason="done")
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 48 * 3600, sid),
)
db._conn.commit()
result = db.garbage_collect(source="cli")
assert result["total"] == 1
assert db.get_session("old_cli") is None
assert db.get_session("old_tg") is not None
def test_gc_handles_child_sessions(self, db):
"""Child sessions should be deleted when parent is GC'd."""
db.create_session(session_id="parent_old", source="cli")
db.end_session("parent_old", end_reason="done")
db._conn.execute(
"UPDATE sessions SET started_at = ? WHERE id = ?",
(time.time() - 48 * 3600, "parent_old"),
)
# Create child session
db.create_session(session_id="child", source="cli", parent_session_id="parent_old")
db.end_session("child", end_reason="done")
db._conn.commit()
result = db.garbage_collect()
assert result["total"] == 1
assert db.get_session("parent_old") is None
assert db.get_session("child") is None
# Schema and WAL mode
# =========================================================================
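The GC tests above encode a three-tier retention policy. As a standalone sketch of that policy (the real implementation works in SQL inside `garbage_collect`; this pure predicate is only illustrative):

```python
import time

EMPTY_MAX_AGE = 24 * 3600      # empty sessions: collect after a day
TRIVIAL_MAX_AGE = 7 * 86400    # 1-5 message sessions: collect after a week


def gc_verdict(message_count, started_at, ended, now=None):
    """Classify one session for garbage collection.

    Mirrors the policy the tests pin down: active sessions and
    substantial sessions (>5 messages) are never collected.
    """
    now = time.time() if now is None else now
    if not ended:
        return "keep"  # active sessions are never deleted
    age = now - started_at
    if message_count == 0 and age > EMPTY_MAX_AGE:
        return "empty"
    if 1 <= message_count <= 5 and age > TRIVIAL_MAX_AGE:
        return "trivial"
    return "keep"  # substantial sessions are kept indefinitely
```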


@@ -137,3 +137,78 @@ class TestBackwardCompat:
def test_tool_to_toolset_map(self):
assert isinstance(TOOL_TO_TOOLSET_MAP, dict)
assert len(TOOL_TO_TOOLSET_MAP) > 0
class TestToolReturnTypeValidation:
"""Poka-yoke: tool handlers must return JSON strings."""
def test_handler_returning_dict_is_wrapped(self, monkeypatch):
"""A handler that returns a dict should be auto-wrapped to JSON string."""
from tools.registry import registry
from model_tools import handle_function_call
import json
# Register a bad handler that returns dict instead of str
registry.register(
name="__test_bad_dict",
toolset="test",
schema={"name": "__test_bad_dict", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: {"this is": "a dict not a string"},
)
result = handle_function_call("__test_bad_dict", {})
parsed = json.loads(result)
assert "output" in parsed
assert "_type_warning" in parsed
# Cleanup
registry._tools.pop("__test_bad_dict", None)
def test_handler_returning_none_is_wrapped(self, monkeypatch):
"""A handler that returns None should be auto-wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_none",
toolset="test",
schema={"name": "__test_bad_none", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: None,
)
result = handle_function_call("__test_bad_none", {})
parsed = json.loads(result)
assert "_type_warning" in parsed
registry._tools.pop("__test_bad_none", None)
def test_handler_returning_non_json_string_is_wrapped(self):
"""A handler returning a plain string (not JSON) should be wrapped."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_bad_plain",
toolset="test",
schema={"name": "__test_bad_plain", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: "just a plain string, not json",
)
result = handle_function_call("__test_bad_plain", {})
parsed = json.loads(result)
assert "output" in parsed
registry._tools.pop("__test_bad_plain", None)
def test_handler_returning_valid_json_passes_through(self):
"""A handler returning valid JSON string passes through unchanged."""
from tools.registry import registry
from model_tools import handle_function_call
import json
registry.register(
name="__test_good",
toolset="test",
schema={"name": "__test_good", "description": "test", "parameters": {"type": "object", "properties": {}}},
handler=lambda args, **kw: json.dumps({"status": "ok", "data": [1, 2, 3]}),
)
result = handle_function_call("__test_good", {})
parsed = json.loads(result)
assert parsed == {"status": "ok", "data": [1, 2, 3]}
registry._tools.pop("__test_good", None)
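The four tests above imply a coercion step inside `handle_function_call`: valid JSON strings pass through, everything else gets wrapped. A sketch of that step, assuming the `output` and `_type_warning` keys the tests assert on (the real wrapper may carry more context):

```python
import json


def coerce_tool_result(result):
    """Normalize a tool handler's return value to a JSON string.

    Poka-yoke sketch: handlers must return JSON strings; anything
    else is wrapped rather than allowed to crash the tool loop.
    """
    if isinstance(result, str):
        try:
            json.loads(result)
            return result  # already a valid JSON string: pass through
        except ValueError:
            pass  # plain string: fall through and wrap it
    return json.dumps({
        "output": result if isinstance(result, str) else repr(result),
        "_type_warning": "handler did not return a JSON string",
    })
```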


@@ -144,7 +144,8 @@ class TestMemoryStoreReplace:
def test_replace_no_match(self, store):
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is False
assert result["success"] is True
assert result["result"] == "no_match"
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@@ -177,7 +178,8 @@ class TestMemoryStoreRemove:
def test_remove_no_match(self, store):
result = store.remove("memory", "nonexistent")
assert result["success"] is False
assert result["success"] is True
assert result["result"] == "no_match"
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")


@@ -201,6 +201,17 @@ def _format_job(job: Dict[str, Any]) -> Dict[str, Any]:
"paused_at": job.get("paused_at"),
"paused_reason": job.get("paused_reason"),
}
# Health timestamps
if job.get("last_error_at"):
result["last_error_at"] = job["last_error_at"]
if job.get("last_success_at"):
result["last_success_at"] = job["last_success_at"]
if job.get("error_resolved_at"):
result["error_resolved_at"] = job["error_resolved_at"]
if job.get("error_cleared_at"):
result["error_cleared_at"] = job["error_cleared_at"]
if job.get("script"):
result["script"] = job["script"]
return result
@@ -326,6 +337,13 @@ def cronjob(
if result is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps(result, indent=2)
if normalized == "clear_error":
from cron.jobs import clear_job_error
job = clear_job_error(job_id)
if job is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps({"success": True, "job": _format_job(job)}, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}
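The `clear_error` branch delegates to `clear_job_error` in `cron.jobs`. A toy sketch of its contract against an in-memory job dict — the persistence layer and the exact fields touched are assumptions, beyond `error_cleared_at` which `_format_job` above surfaces:

```python
import time


def clear_job_error(job_id, jobs):
    """Clear a job's recorded error state and stamp when it was cleared.

    Hypothetical in-memory version; the real helper persists the job
    and returns None when the id is unknown (caller reports not-found).
    """
    job = jobs.get(job_id)
    if job is None:
        return None
    job.pop("last_error_at", None)
    job["error_cleared_at"] = time.time()
    return job
```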


@@ -0,0 +1,165 @@
"""Memory Backend Tool — manage cross-session memory backends.
Provides store/retrieve/query/evaluate/list actions for the
pluggable memory backend system.
"""
import json
import logging
from typing import Optional
from tools.registry import registry
logger = logging.getLogger(__name__)
def memory_backend(
action: str,
user_id: str = "default",
key: Optional[str] = None,
value: Optional[str] = None,
query_text: Optional[str] = None,
metadata: Optional[dict] = None,
) -> str:
"""Manage cross-session memory backends.
Actions:
store — store a user preference/pattern
retrieve — retrieve a specific memory by key
query — search memories by text
list — list all keys for a user
delete — delete a memory entry
info — show current backend info
evaluate — run evaluation framework comparing backends
"""
from agent.memory import get_memory_backend
backend = get_memory_backend()
if action == "info":
return json.dumps({
"success": True,
"backend": backend.backend_name,
"is_cloud": backend.is_cloud,
"available": backend.is_available(),
})
if action == "store":
if not key or value is None:
return json.dumps({"success": False, "error": "key and value are required for 'store'."})
success = backend.store(user_id, key, value, metadata)
return json.dumps({"success": success, "key": key})
if action == "retrieve":
if not key:
return json.dumps({"success": False, "error": "key is required for 'retrieve'."})
entry = backend.retrieve(user_id, key)
if entry is None:
return json.dumps({"success": False, "error": f"No memory found for key '{key}'."})
return json.dumps({
"success": True,
"key": entry.key,
"value": entry.value,
"metadata": entry.metadata,
"updated_at": entry.updated_at,
})
if action == "query":
if not query_text:
return json.dumps({"success": False, "error": "query_text is required for 'query'."})
results = backend.query(user_id, query_text)
return json.dumps({
"success": True,
"results": [
{"key": e.key, "value": e.value, "metadata": e.metadata}
for e in results
],
"count": len(results),
})
if action == "list":
keys = backend.list_keys(user_id)
return json.dumps({"success": True, "keys": keys, "count": len(keys)})
if action == "delete":
if not key:
return json.dumps({"success": False, "error": "key is required for 'delete'."})
success = backend.delete(user_id, key)
return json.dumps({"success": success})
if action == "evaluate":
from agent.memory.evaluation import evaluate_backends
report = evaluate_backends()
return json.dumps({
"success": True,
**report,
})
return json.dumps({
"success": False,
"error": f"Unknown action '{action}'. Use: store, retrieve, query, list, delete, info, evaluate",
})
MEMORY_BACKEND_SCHEMA = {
"name": "memory_backend",
"description": (
"Manage cross-session memory backends for user preference persistence. "
"Pluggable architecture supports local SQLite (default, zero cloud dependency) "
"and optional Honcho cloud backend (requires HONCHO_API_KEY).\n\n"
"Actions:\n"
" store — store a user preference/pattern\n"
" retrieve — retrieve a specific memory by key\n"
" query — search memories by text\n"
" list — list all keys for a user\n"
" delete — delete a memory entry\n"
" info — show current backend info\n"
" evaluate — run evaluation framework comparing backends"
),
"parameters": {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["store", "retrieve", "query", "list", "delete", "info", "evaluate"],
"description": "The action to perform.",
},
"user_id": {
"type": "string",
"description": "User identifier for memory operations (default: 'default').",
},
"key": {
"type": "string",
"description": "Memory key for store/retrieve/delete.",
},
"value": {
"type": "string",
"description": "Value to store.",
},
"query_text": {
"type": "string",
"description": "Search text for query action.",
},
"metadata": {
"type": "object",
"description": "Optional metadata dict for store.",
},
},
"required": ["action"],
},
}
registry.register(
name="memory_backend",
toolset="skills",
schema=MEMORY_BACKEND_SCHEMA,
handler=lambda args, **kw: memory_backend(
action=args.get("action", ""),
user_id=args.get("user_id", "default"),
key=args.get("key"),
value=args.get("value"),
query_text=args.get("query_text"),
metadata=args.get("metadata"),
),
emoji="🧠",
)
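The tool above programs against the `MemoryBackend` ABC from `agent/memory/__init__.py`. A condensed sketch of that contract with a toy in-memory backend — method names follow the calls the tool makes (`store`, `retrieve`, `query`, `list_keys`, `delete`, `is_available`), but the real ABC and `MemoryEntry` fields may differ:

```python
import abc
import time
from dataclasses import dataclass, field


@dataclass
class MemoryEntry:
    key: str
    value: str
    metadata: dict = field(default_factory=dict)
    updated_at: float = 0.0


class MemoryBackend(abc.ABC):
    """Pluggable memory interface (illustrative sketch)."""

    backend_name = "abstract"
    is_cloud = False

    @abc.abstractmethod
    def store(self, user_id, key, value, metadata=None): ...
    @abc.abstractmethod
    def retrieve(self, user_id, key): ...
    @abc.abstractmethod
    def query(self, user_id, text): ...
    @abc.abstractmethod
    def list_keys(self, user_id): ...
    @abc.abstractmethod
    def delete(self, user_id, key): ...

    def is_available(self):
        return True


class DictBackend(MemoryBackend):
    """Toy backend showing the contract end to end (stand-in for
    LocalBackend's SQLite storage)."""

    backend_name = "dict"

    def __init__(self):
        self._data = {}

    def store(self, user_id, key, value, metadata=None):
        entry = MemoryEntry(key, value, metadata or {}, updated_at=time.time())
        self._data.setdefault(user_id, {})[key] = entry
        return True

    def retrieve(self, user_id, key):
        return self._data.get(user_id, {}).get(key)

    def query(self, user_id, text):
        return [e for e in self._data.get(user_id, {}).values() if text in e.value]

    def list_keys(self, user_id):
        return sorted(self._data.get(user_id, {}))

    def delete(self, user_id, key):
        return self._data.get(user_id, {}).pop(key, None) is not None
```

Because every action in `memory_backend` goes through this narrow interface, swapping `LocalBackend` for `HonchoBackend` is a configuration change rather than a code change — the basis for the evaluation framework's apples-to-apples comparison.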


@@ -260,8 +260,12 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
@@ -310,8 +314,12 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
@@ -449,30 +457,30 @@ def memory_tool(
Returns JSON string with results.
"""
if store is None:
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
if target not in ("memory", "user"):
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
if action == "add":
if not content:
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
return tool_error("Content is required for 'add' action.", success=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
return tool_error("old_text is required for 'replace' action.", success=False)
if not content:
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
return tool_error("content is required for 'replace' action.", success=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
return tool_error("old_text is required for 'remove' action.", success=False)
result = store.remove(target, old_text)
else:
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps(result, ensure_ascii=False)
@@ -539,7 +547,7 @@ MEMORY_SCHEMA = {
# --- Registry ---
from tools.registry import registry
from tools.registry import registry, tool_error
registry.register(
name="memory",