Compare commits

..

4 Commits

Author SHA1 Message Date
Alexander Whitestone
69e10967bd feat: session garbage collection (#315)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 14s
Add garbage_collect() method to SessionDB that cleans up empty and
trivial sessions based on age:
- Empty sessions (0 messages) older than 24h
- Trivial sessions (1-5 messages) older than 7 days
- Sessions with >5 messages kept indefinitely

Add `hermes sessions gc` CLI command with:
- --empty-hours (default: 24)
- --trivial-days (default: 7)
- --trivial-max (default: 5)
- --source filter
- --dry-run preview mode
- --yes skip confirmation

The dry-run flow: preview what would be deleted, ask for confirmation,
then execute. Handles child session FK constraints properly.

7 tests covering: empty/trivial deletion, active session protection,
substantial session preservation, dry-run, source filtering, and child
session handling.

Closes #315
2026-04-13 17:30:39 -04:00
Alexander Whitestone
992498463e fix: gateway config debt - validation, defaults, fallback chain checks (#328)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m32s
- Expand validate_config_structure() to catch:
  - fallback_providers format errors (non-list, missing provider/model)
  - session_reset.idle_minutes <= 0 (causes immediate resets)
  - session_reset.at_hour out of 0-23 range
  - API_SERVER enabled without API_SERVER_KEY
  - Unknown root-level keys that look like misplaced custom_providers fields
- Add _validate_fallback_providers() in gateway/config.py to validate
  fallback chain at gateway startup (logs warnings for malformed entries)
- Add API_SERVER_KEY check in gateway config loader (warns on unauthenticated endpoint)
- Expand _KNOWN_ROOT_KEYS to include all valid top-level config sections
  (session_reset, browser, checkpoints, voice, stt, tts, etc.)
- Add 13 new tests for fallback_providers and session_reset validation
- All existing tests pass (47/47)

Closes #328
2026-04-13 17:29:20 -04:00
Alexander Whitestone
ec3cd2081b fix(poka-yoke): add tool fixation detection (#310)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 26s
Detect when the same tool is called 5+ times consecutively and inject
a nudge advising the agent to diversify its approach.

Evidence from empirical audit:
- Top marathon session (qwen, 1643 msgs): execute_code streak of 20
- Opus session (1472 msgs): terminal streak of 10

The nudge fires every 5 consecutive calls (5, 10, 15...) so it
persists without being spammy. Tracks independently in both
sequential and concurrent execution paths.
2026-04-13 10:16:11 -04:00
Alexander Whitestone
110642d86a fix(poka-yoke): add circuit breaker for error cascading (#309)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 28s
After 3 consecutive tool errors, inject a warning into the tool result
advising the agent to switch strategies. Escalates at 6 and 9+ errors.

Empirical data from audit:
- P(error | prev error) = 58.6% vs P(error | prev success) = 25.2%
- 2.33x cascade amplification factor
- Max observed streak: 31 consecutive errors

Intervention tiers:
- 3 errors: advisory warning (try different tool, use terminal, simplify)
- 6 errors: urgent stop (halt retries, investigate or switch)
- 9+ errors: terminal-only recovery path

Tracks errors in both sequential and concurrent execution paths.
2026-04-13 10:12:24 -04:00
18 changed files with 535 additions and 627 deletions

View File

@@ -26,11 +26,11 @@ from cron.jobs import (
trigger_job,
JOBS_FILE,
)
from cron.scheduler import tick, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import tick
__all__ = [
"create_job",
"get_job",
"get_job",
"list_jobs",
"remove_job",
"update_job",
@@ -39,6 +39,4 @@ __all__ = [
"trigger_job",
"tick",
"JOBS_FILE",
"ModelContextError",
"CRON_MIN_CONTEXT_TOKENS",
]

View File

@@ -563,44 +563,6 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]:
)
def run_job_now(job_id: str) -> Optional[Dict[str, Any]]:
    """Execute a job immediately and record the outcome.

    Unlike trigger_job(), which queues the job for the next scheduler tick,
    this runs the job synchronously in the caller and returns the result.
    The outcome is passed to mark_job_run(); presumably that is what clears
    a stale error after a successful run (confirm against mark_job_run).

    Args:
        job_id: Identifier of the job to run.

    Returns:
        None if no job with ``job_id`` exists. Otherwise a dict that always
        has the keys 'job', 'success', 'output', 'final_response' and
        'error', whether or not the scheduler could be imported.
    """
    job = get_job(job_id)
    if not job:
        return None

    try:
        # Imported at call time; a failed import is reported in the result
        # rather than raised to the caller.
        from cron.scheduler import run_job as _run_job
    except ImportError as exc:
        return {
            "job": job,
            "success": False,
            "output": None,
            # Same key set as the normal result so callers can index
            # 'final_response' unconditionally.
            "final_response": None,
            "error": f"Cannot import scheduler: {exc}",
        }

    success, output, final_response, error = _run_job(job)
    mark_job_run(job_id, success, error)

    # Re-read the job so the returned copy reflects what mark_job_run wrote;
    # fall back to the pre-run copy if it is no longer found.
    updated_job = get_job(job_id) or job
    return {
        "job": updated_job,
        "success": success,
        "output": output,
        "final_response": final_response,
        "error": error,
    }
def remove_job(job_id: str) -> bool:
"""Remove a job by ID."""
jobs = load_jobs()

View File

@@ -40,112 +40,6 @@ from hermes_time import now as _hermes_now
logger = logging.getLogger(__name__)
# =====================================================================
# Deploy Sync Guard
# =====================================================================
#
# If the installed run_agent.py diverges from the version scheduler.py
# was written against, every cron job fails with:
# TypeError: AIAgent.__init__() got an unexpected keyword argument '...'
#
# _validate_agent_interface() catches this at the FIRST job, not the
# 55th. It uses inspect.signature() to verify every kwarg we pass is
# accepted by AIAgent.__init__().
#
# Maintaining this list: if you add a kwarg to the AIAgent() call in
# run_job(), add it here too. The guard catches mismatches.
_SCHEDULER_AGENT_KWARGS: frozenset = frozenset({
    "model", "api_key", "base_url", "provider", "api_mode",
    "acp_command", "acp_args", "max_iterations", "reasoning_config",
    "prefill_messages", "providers_allowed", "providers_ignored",
    "providers_order", "provider_sort", "disabled_toolsets",
    "tool_choice", "quiet_mode", "skip_memory", "platform",
    "session_id", "session_db",
})

# Flipped to True after the first successful check so later calls return
# immediately.
_agent_interface_validated: bool = False


def _validate_agent_interface() -> None:
    """Verify installed AIAgent.__init__ accepts every kwarg the scheduler passes.

    The check compares _SCHEDULER_AGENT_KWARGS against the parameters reported
    by inspect.signature(). An ``__init__`` that declares ``**kwargs`` accepts
    any keyword, so nothing is reported missing in that case.

    A successful result is cached in a module-level flag, so the check runs
    at most once per process; failures are not cached.

    Raises:
        RuntimeError: if run_agent cannot be imported, or if AIAgent.__init__
            lacks one or more of the expected parameters.
    """
    global _agent_interface_validated
    if _agent_interface_validated:
        return

    import inspect

    try:
        from run_agent import AIAgent
    except ImportError as exc:
        raise RuntimeError(
            f"Cannot import AIAgent: {exc}\n"
            "Is hermes-agent installed? Check PYTHONPATH."
        ) from exc

    params = inspect.signature(AIAgent.__init__).parameters
    takes_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if takes_var_kwargs:
        # **kwargs swallows every keyword; no name can be "missing".
        missing = set()
    else:
        missing = _SCHEDULER_AGENT_KWARGS - (set(params) - {"self"})

    if missing:
        sorted_missing = sorted(missing)
        raise RuntimeError(
            "Deploy sync guard FAILED — AIAgent.__init__() is missing params:\n"
            f" {', '.join(sorted_missing)}\n"
            "This means the installed run_agent.py is out of date.\n"
            "Fix: pull latest hermes-agent code and restart the gateway.\n"
            " cd ~/.hermes/hermes-agent && git pull && source venv/bin/activate"
        )

    _agent_interface_validated = True
    logger.debug("Deploy sync guard passed — %d params verified", len(_SCHEDULER_AGENT_KWARGS))
def _safe_agent_kwargs(kwargs: dict) -> dict:
    """Filter kwargs to only those accepted by installed AIAgent.__init__.

    Instead of failing on a signature mismatch, unsupported kwargs are
    dropped and a warning is logged, so the caller can still construct the
    agent with the remaining arguments.

    If AIAgent.__init__ declares ``**kwargs`` it accepts every keyword, so
    nothing is dropped in that case.

    Args:
        kwargs: The kwargs dict the scheduler wants to pass to AIAgent().

    Returns:
        A new dict containing only kwargs the installed AIAgent accepts.
        If run_agent cannot be imported, the input dict itself is returned
        unchanged so the real import error surfaces at the call site.
    """
    import inspect

    try:
        from run_agent import AIAgent
    except ImportError:
        # Can't import — pass everything through, let the real error surface
        return kwargs

    params = inspect.signature(AIAgent.__init__).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        # A **kwargs catch-all accepts any keyword; filtering by name would
        # wrongly discard arguments the constructor can take.
        return dict(kwargs)

    accepted = set(params) - {"self"}
    safe = {key: value for key, value in kwargs.items() if key in accepted}
    dropped = sorted(key for key in kwargs if key not in accepted)
    if dropped:
        logger.warning(
            "Dropping unsupported AIAgent kwargs (stale install?): %s",
            ", ".join(dropped),
        )
    return safe
# Valid delivery platforms — used to validate user-supplied platform names
# in cron delivery targets, preventing env var enumeration via crafted names.
_KNOWN_DELIVERY_PLATFORMS = frozenset({
@@ -160,72 +54,6 @@ from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
SILENT_MARKER = "[SILENT]"
SCRIPT_FAILED_MARKER = "[SCRIPT_FAILED]"
# Failure phrases that indicate an external script/command failed, even when
# the agent doesn't use the [SCRIPT_FAILED] marker. Matched case-insensitively
# against the final response. These are strong signals — agents rarely use
# these words when a script succeeded.
_SCRIPT_FAILURE_PHRASES = (
"timed out",
"timeout",
"connection error",
"connection refused",
"connection reset",
"failed to execute",
"failed due to",
"script failed",
"script error",
"command failed",
"exit code",
"exit status",
"non-zero exit",
"did not complete",
"could not run",
"unable to execute",
"permission denied",
"no such file",
"traceback",
)
def _detect_script_failure(final_response: str) -> Optional[str]:
"""Detect script failure from agent's final response.
Two checks run in order: the explicit [SCRIPT_FAILED] marker (matched
case-insensitively), then a substring scan for the phrases in
_SCRIPT_FAILURE_PHRASES. The phrase scan is skipped when the response
contains [NOOP] or [SILENT].
Returns a reason string if failure detected, None otherwise (including
for an empty or None response).
"""
if not final_response:
return None
# 1. Explicit marker — highest confidence.
if SCRIPT_FAILED_MARKER in final_response.upper():
import re as _re
# Capture whatever follows the marker (and an optional colon) up to the
# end of that line; '.' does not match a newline here.
_m = _re.search(
r'\[SCRIPT_FAILED\]\s*:?\s*(.*)',
final_response,
_re.IGNORECASE,
)
# Marker with no text after it falls back to a generic reason below.
reason = _m.group(1).strip() if _m and _m.group(1).strip() else None
return reason or "Agent reported script failure"
# 2. Heuristic detection — catch failures described in natural language.
# Only flag if the response contains failure language AND does NOT
# contain success markers like [NOOP] (which means the script ran fine
# but found nothing).
lower = final_response.lower()
has_noop = "[noop]" in lower
has_silent = "[silent]" in lower
if has_noop or has_silent:
return None # Agent explicitly signaled success/nothing-to-report
# The first phrase that matches, in tuple order, is the one reported.
for phrase in _SCRIPT_FAILURE_PHRASES:
if phrase in lower:
return f"Detected script failure phrase: '{phrase}'"
return None
# Resolve Hermes home directory (respects HERMES_HOME override)
_hermes_home = get_hermes_home()
@@ -586,15 +414,7 @@ def _build_job_prompt(job: dict) -> str:
"SILENT: If there is genuinely nothing new to report, respond "
"with exactly \"[SILENT]\" (nothing else) to suppress delivery. "
"Never combine [SILENT] with content — either report your "
"findings normally, or say [SILENT] and nothing more. "
"SCRIPT_FAILURE: If an external command or script you ran "
"failed (timeout, crash, connection error, non-zero exit), you MUST "
"respond with "
"\"[SCRIPT_FAILED]: <one-line reason>\" as the FIRST LINE of your "
"response. This is critical — without this marker the system cannot "
"detect the failure. Examples: "
"\"[SCRIPT_FAILED]: forge.alexanderwhitestone.com timed out\" "
"\"[SCRIPT_FAILED]: script exited with code 1\".]\\n\\n"
"findings normally, or say [SILENT] and nothing more.]\n\n"
)
prompt = cron_hint + prompt
if skills is None:
@@ -649,10 +469,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
# Deploy sync guard — fail fast on first job if the installed
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -777,30 +593,29 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
},
)
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],
"api_key": turn_route["runtime"].get("api_key"),
"base_url": turn_route["runtime"].get("base_url"),
"provider": turn_route["runtime"].get("provider"),
"api_mode": turn_route["runtime"].get("api_mode"),
"acp_command": turn_route["runtime"].get("command"),
"acp_args": turn_route["runtime"].get("args"),
"max_iterations": max_iterations,
"reasoning_config": reasoning_config,
"prefill_messages": prefill_messages,
"providers_allowed": pr.get("only"),
"providers_ignored": pr.get("ignore"),
"providers_order": pr.get("order"),
"provider_sort": pr.get("sort"),
"disabled_toolsets": ["cronjob", "messaging", "clarify"],
"tool_choice": "required",
"quiet_mode": True,
"skip_memory": True, # Cron system prompts would corrupt user representations
"platform": "cron",
"session_id": _cron_session_id,
"session_db": _session_db,
})
agent = AIAgent(**_agent_kwargs)
agent = AIAgent(
model=turn_route["model"],
api_key=turn_route["runtime"].get("api_key"),
base_url=turn_route["runtime"].get("base_url"),
provider=turn_route["runtime"].get("provider"),
api_mode=turn_route["runtime"].get("api_mode"),
acp_command=turn_route["runtime"].get("command"),
acp_args=turn_route["runtime"].get("args"),
max_iterations=max_iterations,
reasoning_config=reasoning_config,
prefill_messages=prefill_messages,
providers_allowed=pr.get("only"),
providers_ignored=pr.get("ignore"),
providers_order=pr.get("order"),
provider_sort=pr.get("sort"),
disabled_toolsets=["cronjob", "messaging", "clarify"],
tool_choice="required",
quiet_mode=True,
skip_memory=True, # Cron system prompts would corrupt user representations
platform="cron",
session_id=_cron_session_id,
session_db=_session_db,
)
# Run the agent with an *inactivity*-based timeout: the job can run
# for hours if it's actively calling tools / receiving stream tokens,
@@ -920,30 +735,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Use a separate variable for log display; keep final_response clean
# for delivery logic (empty response = no delivery).
logged_response = final_response if final_response else "(No response generated)"
# Check for script failure — both explicit [SCRIPT_FAILED] marker
# and heuristic detection for failures described in natural language.
_script_failed_reason = _detect_script_failure(final_response)
if _script_failed_reason is not None:
logger.warning(
"Job '%s': agent reported script failure — %s",
job_name, _script_failed_reason,
)
output = f"""# Cron Job: {job_name} (SCRIPT FAILED)
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Schedule:** {job.get('schedule_display', 'N/A')}
## Prompt
{prompt}
## Response
{logged_response}
"""
return False, output, final_response, _script_failed_reason
output = f"""# Cron Job: {job_name}

View File

@@ -412,6 +412,52 @@ class GatewayConfig:
return self.unauthorized_dm_behavior
def _validate_fallback_providers() -> None:
    """Validate fallback_providers from config.yaml at gateway startup.

    Reads ``config.yaml`` from the Hermes home directory and logs a warning
    for each malformed ``fallback_providers`` entry: a non-list value, a
    non-dict entry, or an entry missing 'provider' or 'model'.

    The check is advisory and never raises. If the file is missing, there is
    nothing to validate. If it cannot be read or parsed, that is logged at
    debug level and validation is skipped.
    """
    try:
        _config_path = get_hermes_home() / "config.yaml"
        if not _config_path.exists():
            return
        import yaml
        with open(_config_path, encoding="utf-8") as _f:
            _cfg = yaml.safe_load(_f) or {}
    except Exception as exc:
        # Non-fatal; validation is advisory. Leave a trace rather than
        # swallowing the failure silently.
        logger.debug("Skipping fallback_providers validation: %s", exc)
        return

    # A YAML document whose root is a list or scalar has no top-level keys.
    if not isinstance(_cfg, dict):
        return

    fbp = _cfg.get("fallback_providers")
    if not fbp:
        return
    if not isinstance(fbp, list):
        logger.warning(
            "fallback_providers should be a YAML list, got %s. "
            "Fallback chain will be disabled.",
            type(fbp).__name__,
        )
        return

    for i, entry in enumerate(fbp):
        if not isinstance(entry, dict):
            logger.warning(
                "fallback_providers[%d] is not a dict (got %s). Skipping entry.",
                i, type(entry).__name__,
            )
            continue
        # Both fields are checked independently so an entry missing both
        # produces two warnings.
        if not entry.get("provider"):
            logger.warning(
                "fallback_providers[%d] missing 'provider' field. Skipping entry.",
                i,
            )
        if not entry.get("model"):
            logger.warning(
                "fallback_providers[%d] missing 'model' field. Skipping entry.",
                i,
            )
def load_gateway_config() -> GatewayConfig:
"""
Load gateway configuration from multiple sources.
@@ -645,6 +691,19 @@ def load_gateway_config() -> GatewayConfig:
platform.value, env_name,
)
# Warn about API Server enabled without a key (unauthenticated endpoint)
if Platform.API_SERVER in config.platforms:
api_cfg = config.platforms[Platform.API_SERVER]
if api_cfg.enabled and not api_cfg.extra.get("key"):
logger.warning(
"api_server is enabled but API_SERVER_KEY is not set. "
"The API endpoint will run unauthenticated. "
"Set API_SERVER_KEY in ~/.hermes/.env to secure it.",
)
# Validate fallback_providers structure from config.yaml
_validate_fallback_providers()
return config

View File

@@ -973,7 +973,6 @@ class APIServerAdapter(BasePlatformAdapter):
pause_job as _cron_pause,
resume_job as _cron_resume,
trigger_job as _cron_trigger,
run_job_now as _cron_run_now,
)
# Wrap as staticmethod to prevent descriptor binding — these are plain
# module functions, not instance methods. Without this, self._cron_*()
@@ -987,7 +986,6 @@ class APIServerAdapter(BasePlatformAdapter):
_cron_pause = staticmethod(_cron_pause)
_cron_resume = staticmethod(_cron_resume)
_cron_trigger = staticmethod(_cron_trigger)
_cron_run_now = staticmethod(_cron_run_now)
_CRON_AVAILABLE = True
except ImportError:
pass
@@ -1206,28 +1204,6 @@ class APIServerAdapter(BasePlatformAdapter):
except Exception as e:
return web.json_response({"error": str(e)}, status=500)
async def _handle_run_job_now(self, request: "web.Request") -> "web.Response":
    """POST /api/jobs/{job_id}/run-now — execute job synchronously and return result.

    The request passes the auth, cron-availability and job-id checks in that
    order; the first check that yields an error response ends the request.
    The job runs in the event loop's default executor so the blocking call
    does not stall the loop.

    Returns:
        The job result as JSON, a 404 JSON error when the job does not exist,
        or a 500 JSON error carrying the exception text if the run raises.
    """
    auth_err = self._check_auth(request)
    if auth_err:
        return auth_err
    cron_err = self._check_jobs_available()
    if cron_err:
        return cron_err
    job_id, id_err = self._check_job_id(request)
    if id_err:
        return id_err
    try:
        import asyncio as _asyncio
        # get_running_loop() is the documented way to obtain the loop from
        # inside a coroutine.
        loop = _asyncio.get_running_loop()
        result = await loop.run_in_executor(None, self._cron_run_now, job_id)
        if result is None:
            return web.json_response({"error": "Job not found"}, status=404)
        return web.json_response(result)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)
# ------------------------------------------------------------------
# Output extraction helper
# ------------------------------------------------------------------
@@ -1589,7 +1565,6 @@ class APIServerAdapter(BasePlatformAdapter):
self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job)
self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job)
self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job)
self._app.router.add_post("/api/jobs/{job_id}/run-now", self._handle_run_job_now)
# Structured event streaming
self._app.router.add_post("/v1/runs", self._handle_runs)
self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events)

View File

@@ -1338,6 +1338,11 @@ _KNOWN_ROOT_KEYS = {
"fallback_providers", "credential_pool_strategies", "toolsets",
"agent", "terminal", "display", "compression", "delegation",
"auxiliary", "custom_providers", "memory", "gateway",
"session_reset", "browser", "checkpoints", "smart_model_routing",
"voice", "stt", "tts", "human_delay", "security", "privacy",
"cron", "logging", "approvals", "command_allowlist", "quick_commands",
"personalities", "skills", "honcho", "timezone", "discord",
"whatsapp", "prefill_messages_file", "file_read_max_chars",
}
# Valid fields inside a custom_providers list entry
@@ -1478,6 +1483,72 @@ def validate_config_structure(config: Optional[Dict[str, Any]] = None) -> List["
f"Move '{key}' under the appropriate section",
))
# ── fallback_providers must be a list of dicts with provider + model ─
fbp = config.get("fallback_providers")
if fbp is not None:
if not isinstance(fbp, list):
issues.append(ConfigIssue(
"error",
f"fallback_providers should be a YAML list, got {type(fbp).__name__}",
"Change to:\n"
" fallback_providers:\n"
" - provider: openrouter\n"
" model: google/gemini-3-flash-preview",
))
elif fbp:
for i, entry in enumerate(fbp):
if not isinstance(entry, dict):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is not a dict (got {type(entry).__name__})",
"Each entry needs at minimum: provider, model",
))
continue
if not entry.get("provider"):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is missing 'provider' field — this fallback will be skipped",
"Add: provider: openrouter (or another provider name)",
))
if not entry.get("model"):
issues.append(ConfigIssue(
"warning",
f"fallback_providers[{i}] is missing 'model' field — this fallback will be skipped",
"Add: model: google/gemini-3-flash-preview (or another model slug)",
))
# ── session_reset validation ─────────────────────────────────────────
session_reset = config.get("session_reset", {})
if isinstance(session_reset, dict):
idle_minutes = session_reset.get("idle_minutes")
if idle_minutes is not None:
if not isinstance(idle_minutes, (int, float)) or idle_minutes <= 0:
issues.append(ConfigIssue(
"warning",
f"session_reset.idle_minutes={idle_minutes} is invalid (must be a positive number)",
"Set to a positive integer, e.g. 1440 (24 hours). Using 0 causes immediate resets.",
))
at_hour = session_reset.get("at_hour")
if at_hour is not None:
if not isinstance(at_hour, (int, float)) or not (0 <= at_hour <= 23):
issues.append(ConfigIssue(
"warning",
f"session_reset.at_hour={at_hour} is invalid (must be 0-23)",
"Set to an hour between 0 and 23, e.g. 4 for 4am",
))
# ── API Server key check ─────────────────────────────────────────────
# If api_server is enabled via env, but no key is set, warn.
# This catches the "API_SERVER_KEY not configured" error from gateway logs.
api_server_enabled = os.getenv("API_SERVER_ENABLED", "").lower() in ("true", "1", "yes")
api_server_key = os.getenv("API_SERVER_KEY", "").strip()
if api_server_enabled and not api_server_key:
issues.append(ConfigIssue(
"warning",
"API_SERVER is enabled but API_SERVER_KEY is not set — the API server will run unauthenticated",
"Set API_SERVER_KEY in ~/.hermes/.env to secure the API endpoint",
))
return issues

View File

@@ -221,31 +221,7 @@ def cron_edit(args):
return 0
def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int:
if action == "run" and now:
# Synchronous execution — run job immediately and show result
result = _cron_api(action="run_now", job_id=job_id)
if not result.get("success"):
if result.get("error"):
print(color(f"Failed to run job now: {result['error']}", Colors.RED))
else:
print(color(f"Failed to run job now: {result.get('error', 'unknown error')}", Colors.RED))
return 1
job = result.get("job", {})
success = result.get("success", False)
error = result.get("error")
final_response = result.get("final_response", "")
name = job.get("name", job_id)
if success:
print(color(f"Job '{name}' completed successfully", Colors.GREEN))
else:
print(color(f"Job '{name}' failed: {error}", Colors.RED))
if final_response:
print(f"\n{final_response}\n")
if not error:
print(color("Stale error state cleared.", Colors.GREEN))
return 0 if success else 1
def _job_action(action: str, job_id: str, success_verb: str) -> int:
result = _cron_api(action=action, job_id=job_id)
if not result.get("success"):
print(color(f"Failed to {action} job: {result.get('error', 'unknown error')}", Colors.RED))
@@ -289,8 +265,7 @@ def cron_command(args):
return _job_action("resume", args.job_id, "Resumed")
if subcmd == "run":
now = getattr(args, 'now', False)
return _job_action("run", args.job_id, "Triggered", now=now)
return _job_action("run", args.job_id, "Triggered")
if subcmd in {"remove", "rm", "delete"}:
return _job_action("remove", args.job_id, "Removed")

View File

@@ -4574,7 +4574,6 @@ For more help on a command:
cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick")
cron_run.add_argument("job_id", help="Job ID to trigger")
cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)")
cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job")
cron_remove.add_argument("job_id", help="Job ID to remove")
@@ -5005,7 +5004,7 @@ For more help on a command:
# =========================================================================
sessions_parser = subparsers.add_parser(
"sessions",
help="Manage session history (list, rename, export, prune, delete)",
help="Manage session history (list, rename, export, prune, gc, delete)",
description="View and manage the SQLite session store"
)
sessions_subparsers = sessions_parser.add_subparsers(dest="sessions_action")
@@ -5028,6 +5027,14 @@ For more help on a command:
sessions_prune.add_argument("--source", help="Only prune sessions from this source")
sessions_prune.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_gc = sessions_subparsers.add_parser("gc", help="Garbage-collect empty/trivial sessions")
sessions_gc.add_argument("--empty-hours", type=int, default=24, help="Delete empty (0-msg) sessions older than N hours (default: 24)")
sessions_gc.add_argument("--trivial-days", type=int, default=7, help="Delete trivial (1-5 msg) sessions older than N days (default: 7)")
sessions_gc.add_argument("--trivial-max", type=int, default=5, help="Max messages to consider trivial (default: 5)")
sessions_gc.add_argument("--source", help="Only GC sessions from this source")
sessions_gc.add_argument("--dry-run", action="store_true", help="Show what would be deleted without deleting")
sessions_gc.add_argument("--yes", "-y", action="store_true", help="Skip confirmation")
sessions_stats = sessions_subparsers.add_parser("stats", help="Show session store statistics")
sessions_rename = sessions_subparsers.add_parser("rename", help="Set or change a session's title")
@@ -5197,6 +5204,49 @@ For more help on a command:
size_mb = os.path.getsize(db_path) / (1024 * 1024)
print(f"Database size: {size_mb:.1f} MB")
elif action == "gc":
dry_run = getattr(args, "dry_run", False)
if dry_run:
counts = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=True,
)
print(f"[dry-run] Would delete {counts['total']} session(s):")
print(f" Empty (0 msgs, >{args.empty_hours}h old): {counts['empty']}")
print(f" Trivial (<={args.trivial_max} msgs, >{args.trivial_days}d old): {counts['trivial']}")
else:
# Preview first
preview = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=True,
)
if preview["total"] == 0:
print("Nothing to collect.")
else:
if not args.yes:
if not _confirm_prompt(
f"Delete {preview['total']} session(s) "
f"({preview['empty']} empty, {preview['trivial']} trivial)? [y/N] "
):
print("Cancelled.")
return
counts = db.garbage_collect(
empty_older_than_hours=args.empty_hours,
trivial_max_messages=args.trivial_max,
trivial_older_than_days=args.trivial_days,
source=args.source,
dry_run=False,
)
print(f"Collected {counts['total']} session(s):")
print(f" Empty: {counts['empty']}")
print(f" Trivial: {counts['trivial']}")
else:
sessions_parser.print_help()

View File

@@ -1303,3 +1303,78 @@ class SessionDB:
return len(session_ids)
return self._execute_write(_do)
def garbage_collect(
    self,
    empty_older_than_hours: int = 24,
    trivial_max_messages: int = 5,
    trivial_older_than_days: int = 7,
    source: str = None,
    dry_run: bool = False,
) -> Dict[str, int]:
    """Delete empty and trivial sessions based on age.

    Policy (matches #315):
    - Empty sessions (0 messages) older than ``empty_older_than_hours``
    - Trivial sessions (1..``trivial_max_messages`` msgs) older than
      ``trivial_older_than_days``
    - Sessions with more than ``trivial_max_messages`` are not selected
    - Active (not ended) sessions are never selected

    Sessions that descend from a selected session through
    ``parent_session_id``, at any depth, are deleted with it whatever their
    own size or state, because a parent row cannot be removed while a child
    still references it. Rows are deleted children-first.

    Args:
        empty_older_than_hours: Age threshold for empty sessions, in hours.
        trivial_max_messages: Largest message count still treated as trivial.
        trivial_older_than_days: Age threshold for trivial sessions, in days.
        source: If given, only sessions with this ``source`` are selected.
        dry_run: If true, only count; delete nothing.

    Returns:
        A dict with counts of *selected* sessions: ``empty``, ``trivial``,
        ``total``. Descendants removed alongside them are not counted.
    """
    now = time.time()
    empty_cutoff = now - (empty_older_than_hours * 3600)
    trivial_cutoff = now - (trivial_older_than_days * 86400)

    def _select_ids(conn, condition, params):
        # Ended sessions matching `condition`, optionally narrowed by source.
        query = (
            "SELECT id FROM sessions "
            "WHERE " + condition + " AND ended_at IS NOT NULL"
        )
        if source:
            query += " AND source = ?"
            params = [*params, source]
        return [row[0] for row in conn.execute(query, params).fetchall()]

    def _children_first(conn, roots):
        # Post-order walk of the session tree under `roots`: every session
        # appears after all of its descendants, so deleting in this order
        # never removes a parent that a remaining child still points at.
        order = []
        seen = set()
        for root in roots:
            if root in seen:
                continue
            seen.add(root)
            stack = [(root, False)]
            while stack:
                sid, expanded = stack.pop()
                if expanded:
                    order.append(sid)
                    continue
                stack.append((sid, True))
                children = conn.execute(
                    "SELECT id FROM sessions WHERE parent_session_id = ?", (sid,)
                ).fetchall()
                for child in children:
                    if child[0] not in seen:
                        seen.add(child[0])
                        stack.append((child[0], False))
        return order

    def _do(conn):
        empty_ids = _select_ids(
            conn, "message_count = 0 AND started_at < ?", [empty_cutoff]
        )
        trivial_ids = _select_ids(
            conn,
            "message_count BETWEEN 1 AND ? AND started_at < ?",
            [trivial_max_messages, trivial_cutoff],
        )
        counts = {
            "empty": len(empty_ids),
            "trivial": len(trivial_ids),
            "total": len(set(empty_ids) | set(trivial_ids)),
        }
        if dry_run:
            return counts

        for sid in _children_first(conn, empty_ids + trivial_ids):
            # Messages reference the session row, so they go first.
            conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
            conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
        return counts

    return self._execute_write(_do)

View File

@@ -172,13 +172,13 @@ class MemPalacePortalProvider(MemoryProvider):
# Extract key facts for Hologram
if self._hologram_store:
    # Simple heuristic: look for "I prefer", "The project uses", etc.
    for msg in messages:
        if msg.get("role") != "user":
            continue
        content = msg.get("content", "")
        # Only plain-text content can be scanned and sliced below.
        if not isinstance(content, str):
            continue
        lowered = content.lower()
        if "prefer" in lowered or "use" in lowered:
            try:
                self._hologram_store.add_fact(content[:200], category="user_pref")
            except Exception:
                # Best effort: a failed fact write does not stop the loop.
                pass

View File

@@ -7,7 +7,7 @@ from unittest.mock import AsyncMock, patch, MagicMock
import pytest
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt, _check_model_context_compat, ModelContextError, CRON_MIN_CONTEXT_TOKENS
from cron.scheduler import _resolve_origin, _resolve_delivery_target, _deliver_result, run_job, SILENT_MARKER, _build_job_prompt
class TestResolveOrigin:
@@ -863,141 +863,3 @@ class TestTickAdvanceBeforeRun:
adv_mock.assert_called_once_with("test-advance")
# advance must happen before run
assert call_order == [("advance", "test-advance"), ("run", "test-advance")]
class TestDeploySyncGuard:
    """Tests for _validate_agent_interface() — the deploy sync guard.

    Each test saves and restores the module-level cache flag so the order in
    which tests run does not affect the outcome.
    """

    def test_passes_when_all_params_present(self):
        """Validation passes when AIAgent accepts every scheduler kwarg."""
        import cron.scheduler as sched_mod

        orig_flag = sched_mod._agent_interface_validated
        try:
            # Reset the cached flag so the check actually runs.
            sched_mod._agent_interface_validated = False
            # Should not raise.
            sched_mod._validate_agent_interface()
            assert sched_mod._agent_interface_validated is True
        finally:
            sched_mod._agent_interface_validated = orig_flag

    def test_fails_when_param_missing(self):
        """Validation raises RuntimeError when AIAgent is missing a required param."""
        import cron.scheduler as sched_mod

        orig_flag = sched_mod._agent_interface_validated
        try:
            sched_mod._agent_interface_validated = False

            # Build a fake AIAgent class whose __init__ lacks 'tool_choice'
            # (and several other scheduler kwargs).
            class FakeAIAgent:
                def __init__(self, model="", max_iterations=90, quiet_mode=False,
                             disabled_toolsets=None, skip_memory=False, platform=None,
                             session_id=None, session_db=None):
                    pass

            fake_module = MagicMock()
            fake_module.AIAgent = FakeAIAgent

            with patch.dict("sys.modules", {"run_agent": fake_module}):
                # Match the wording the guard actually raises; the missing
                # names are listed on a following line, so check them apart.
                with pytest.raises(RuntimeError, match="missing params") as excinfo:
                    sched_mod._validate_agent_interface()
            assert "tool_choice" in str(excinfo.value)
            # A failed check must not be cached as a pass.
            assert sched_mod._agent_interface_validated is False
        finally:
            sched_mod._agent_interface_validated = orig_flag

    def test_cached_after_first_run(self):
        """Second call is a no-op (uses cached flag)."""
        import cron.scheduler as sched_mod

        orig_flag = sched_mod._agent_interface_validated
        try:
            sched_mod._agent_interface_validated = True
            # Should not raise even if AIAgent were broken — the flag is set.
            sched_mod._validate_agent_interface()
            assert sched_mod._agent_interface_validated is True
        finally:
            sched_mod._agent_interface_validated = orig_flag
class TestCheckModelContextCompat:
    """Tests for _check_model_context_compat() preflight validation.

    All tests replace ``agent.model_metadata.get_model_context_length`` with a
    mock so the detected context size is controlled by the test.
    """

    def test_raises_when_context_below_minimum(self):
        """ModelContextError raised when detected context < CRON_MIN_CONTEXT_TOKENS."""
        # Exercise the real function: patching the function under test and
        # calling the mock would pass regardless of the implementation.
        with patch(
            "agent.model_metadata.get_model_context_length",
            return_value=8_192,
        ):
            with pytest.raises(ModelContextError, match="gemma4:latest"):
                _check_model_context_compat("gemma4:latest")

    def test_passes_when_context_sufficient(self):
        """No exception when model has sufficient context."""
        with patch(
            "agent.model_metadata.get_model_context_length",
            return_value=128_000,
        ):
            # Should not raise
            _check_model_context_compat("claude-sonnet-4-6", base_url="", api_key="")

    def test_passes_with_config_override_below_minimum(self):
        """When config_context_length is set, check is skipped regardless of detected size."""
        with patch(
            "agent.model_metadata.get_model_context_length",
            return_value=8_192,  # Would normally fail
        ) as mock_get:
            # config_context_length override skips the check
            _check_model_context_compat(
                "gemma4:latest",
                config_context_length=8_192,
            )
            # get_model_context_length should NOT be called — check is bypassed
            mock_get.assert_not_called()

    def test_raises_model_context_error_subclass_of_value_error(self):
        """ModelContextError is a subclass of ValueError."""
        assert issubclass(ModelContextError, ValueError)

    def test_detection_failure_is_non_fatal(self):
        """If context length detection raises, the check is skipped (fail-open)."""
        with patch(
            "agent.model_metadata.get_model_context_length",
            side_effect=Exception("network error"),
        ):
            # Should NOT raise — detection failure is logged and ignored
            _check_model_context_compat("unknown-model", base_url="http://localhost:11434")

    def test_raises_for_small_context_model(self):
        """End-to-end: model with 8K context raises ModelContextError."""
        with patch(
            "agent.model_metadata.get_model_context_length",
            return_value=8_192,
        ):
            with pytest.raises(ModelContextError) as exc_info:
                _check_model_context_compat("gemma4:latest")

        # The message must name the model, the detected size, the required
        # minimum, and point the user at config.yaml.
        err = str(exc_info.value)
        assert "gemma4:latest" in err
        assert "8,192" in err
        assert str(CRON_MIN_CONTEXT_TOKENS) in err or "64,000" in err
        assert "config.yaml" in err

    def test_boundary_exactly_at_minimum_passes(self):
        """A model with exactly CRON_MIN_CONTEXT_TOKENS context is accepted."""
        with patch(
            "agent.model_metadata.get_model_context_length",
            return_value=CRON_MIN_CONTEXT_TOKENS,
        ):
            # Should not raise
            _check_model_context_compat("borderline-model")

    def test_boundary_one_below_minimum_raises(self):
        """A model with context length one below the minimum is rejected."""
        with patch(
            "agent.model_metadata.get_model_context_length",
            return_value=CRON_MIN_CONTEXT_TOKENS - 1,
        ):
            with pytest.raises(ModelContextError):
                _check_model_context_compat("borderline-model")

View File

@@ -172,3 +172,111 @@ class TestConfigIssueDataclass:
a = ConfigIssue("error", "msg", "hint")
b = ConfigIssue("error", "msg", "hint")
assert a == b
class TestFallbackProvidersValidation:
    """Checks how validate_config_structure() treats ``fallback_providers``.

    The accepted shape is a list of dicts, each carrying ``provider`` and
    ``model``; every other shape must surface as an issue.
    """

    def test_non_list(self):
        """A plain string value is reported as an error mentioning 'list'."""
        config = {"fallback_providers": "openrouter:google/gemini-3-flash-preview"}
        found = validate_config_structure(config)
        error_issues = [issue for issue in found if issue.severity == "error"]
        assert any(
            "fallback_providers" in issue.message and "list" in issue.message
            for issue in error_issues
        )

    def test_dict_instead_of_list(self):
        """A single dict (not wrapped in a list) is reported as an error mentioning 'dict'."""
        config = {"fallback_providers": {"provider": "openrouter", "model": "test"}}
        found = validate_config_structure(config)
        error_issues = [issue for issue in found if issue.severity == "error"]
        assert any(
            "fallback_providers" in issue.message and "dict" in issue.message
            for issue in error_issues
        )

    def test_entry_missing_provider(self):
        """An entry lacking ``provider`` yields a "missing 'provider'" issue."""
        config = {"fallback_providers": [{"model": "google/gemini-3-flash-preview"}]}
        found = validate_config_structure(config)
        assert any("missing 'provider'" in issue.message for issue in found)

    def test_entry_missing_model(self):
        """An entry lacking ``model`` yields a "missing 'model'" issue."""
        config = {"fallback_providers": [{"provider": "openrouter"}]}
        found = validate_config_structure(config)
        assert any("missing 'model'" in issue.message for issue in found)

    def test_entry_not_dict(self):
        """A non-dict list element yields a "not a dict" issue."""
        config = {"fallback_providers": ["not-a-dict"]}
        found = validate_config_structure(config)
        assert any("not a dict" in issue.message for issue in found)

    def test_valid_entries(self):
        """Well-formed entries produce no issue that mentions fallback_providers."""
        config = {
            "fallback_providers": [
                {"provider": "openrouter", "model": "google/gemini-3-flash-preview"},
                {"provider": "gemini", "model": "gemini-2.5-flash"},
            ],
        }
        related = [
            issue
            for issue in validate_config_structure(config)
            if "fallback_providers" in issue.message
        ]
        assert not related

    def test_empty_list_no_issues(self):
        """An empty list (fallback disabled) is accepted without issues."""
        config = {"fallback_providers": []}
        related = [
            issue
            for issue in validate_config_structure(config)
            if "fallback_providers" in issue.message
        ]
        assert not related
class TestSessionResetValidation:
    """Checks validate_config_structure() on the ``session_reset`` section.

    Covers ``idle_minutes`` (zero, negative, non-numeric, valid) and
    ``at_hour`` (out-of-range, valid).
    """

    def test_zero_idle_minutes(self):
        """idle_minutes=0 is reported, with the value echoed in the message."""
        config = {"session_reset": {"idle_minutes": 0}}
        messages = [issue.message for issue in validate_config_structure(config)]
        assert any("idle_minutes=0" in text for text in messages)

    def test_negative_idle_minutes(self):
        """idle_minutes=-5 is reported, with the value echoed in the message."""
        config = {"session_reset": {"idle_minutes": -5}}
        messages = [issue.message for issue in validate_config_structure(config)]
        assert any("idle_minutes=-5" in text for text in messages)

    def test_string_idle_minutes(self):
        """A non-numeric idle_minutes value is reported."""
        config = {"session_reset": {"idle_minutes": "abc"}}
        messages = [issue.message for issue in validate_config_structure(config)]
        assert any("idle_minutes=" in text for text in messages)

    def test_valid_idle_minutes(self):
        """A positive idle_minutes value produces no idle_minutes issue."""
        config = {"session_reset": {"idle_minutes": 1440}}
        flagged = [
            issue
            for issue in validate_config_structure(config)
            if "idle_minutes" in issue.message
        ]
        assert not flagged

    def test_invalid_at_hour(self):
        """at_hour=25 is reported, with the value echoed in the message."""
        config = {"session_reset": {"at_hour": 25}}
        messages = [issue.message for issue in validate_config_structure(config)]
        assert any("at_hour=25" in text for text in messages)

    def test_valid_at_hour(self):
        """An in-range at_hour produces no at_hour issue."""
        config = {"session_reset": {"at_hour": 4}}
        flagged = [
            issue
            for issue in validate_config_structure(config)
            if "at_hour" in issue.message
        ]
        assert not flagged

View File

@@ -665,6 +665,127 @@ class TestPruneSessions:
# =========================================================================
# =========================================================================
# Garbage Collect
# =========================================================================
class TestGarbageCollect:
    """Tests for the session store's garbage_collect() method.

    ``db`` is presumably a pytest fixture providing a fresh session database —
    confirm against the module's fixtures. Sessions are aged by rewriting
    their ``started_at`` column directly.
    """

    @staticmethod
    def _backdate(db, session_id, age_seconds):
        """Set ``started_at`` of *session_id* to ``age_seconds`` before now (no commit)."""
        db._conn.execute(
            "UPDATE sessions SET started_at = ? WHERE id = ?",
            (time.time() - age_seconds, session_id),
        )

    def test_gc_deletes_empty_old_sessions(self, db):
        """Empty sessions (0 messages) older than 24h should be deleted."""
        db.create_session(session_id="empty_old", source="cli")
        db.end_session("empty_old", end_reason="done")
        self._backdate(db, "empty_old", 48 * 3600)  # 48 hours ago
        db._conn.commit()

        # Recent empty session should be kept
        db.create_session(session_id="empty_new", source="cli")
        db.end_session("empty_new", end_reason="done")

        outcome = db.garbage_collect()
        assert outcome["empty"] == 1
        assert outcome["trivial"] == 0
        assert outcome["total"] == 1
        assert db.get_session("empty_old") is None
        assert db.get_session("empty_new") is not None

    def test_gc_deletes_trivial_old_sessions(self, db):
        """Sessions with 1-5 messages older than 7 days should be deleted."""
        db.create_session(session_id="trivial_old", source="cli")
        for index in range(3):
            db.append_message("trivial_old", role="user", content=f"msg {index}")
        db.end_session("trivial_old", end_reason="done")
        self._backdate(db, "trivial_old", 10 * 86400)  # 10 days ago
        db._conn.commit()

        outcome = db.garbage_collect()
        assert outcome["trivial"] == 1
        assert outcome["total"] == 1
        assert db.get_session("trivial_old") is None

    def test_gc_keeps_active_sessions(self, db):
        """Active (not ended) sessions should never be deleted."""
        db.create_session(session_id="active_old", source="cli")
        # Backdate but don't end
        self._backdate(db, "active_old", 48 * 3600)
        db._conn.commit()

        outcome = db.garbage_collect()
        assert outcome["total"] == 0
        assert db.get_session("active_old") is not None

    def test_gc_keeps_substantial_sessions(self, db):
        """Sessions with >5 messages should never be deleted."""
        db.create_session(session_id="big_old", source="cli")
        for index in range(10):
            db.append_message("big_old", role="user", content=f"msg {index}")
        db.end_session("big_old", end_reason="done")
        self._backdate(db, "big_old", 365 * 86400)  # 1 year ago
        db._conn.commit()

        outcome = db.garbage_collect()
        assert outcome["total"] == 0
        assert db.get_session("big_old") is not None

    def test_gc_dry_run_does_not_delete(self, db):
        """dry_run=True should return counts but not delete anything."""
        db.create_session(session_id="empty_old", source="cli")
        db.end_session("empty_old", end_reason="done")
        self._backdate(db, "empty_old", 48 * 3600)
        db._conn.commit()

        outcome = db.garbage_collect(dry_run=True)
        assert outcome["total"] == 1
        assert db.get_session("empty_old") is not None  # Still exists

    def test_gc_with_source_filter(self, db):
        """--source should only GC sessions from that source."""
        fixtures = [("old_cli", "cli"), ("old_tg", "telegram")]
        for session_id, origin in fixtures:
            db.create_session(session_id=session_id, source=origin)
            db.end_session(session_id, end_reason="done")
            self._backdate(db, session_id, 48 * 3600)
        db._conn.commit()

        outcome = db.garbage_collect(source="cli")
        assert outcome["total"] == 1
        assert db.get_session("old_cli") is None
        assert db.get_session("old_tg") is not None

    def test_gc_handles_child_sessions(self, db):
        """Child sessions should be deleted when parent is GC'd."""
        db.create_session(session_id="parent_old", source="cli")
        db.end_session("parent_old", end_reason="done")
        self._backdate(db, "parent_old", 48 * 3600)

        # Create child session
        db.create_session(session_id="child", source="cli", parent_session_id="parent_old")
        db.end_session("child", end_reason="done")
        db._conn.commit()

        outcome = db.garbage_collect()
        assert outcome["total"] == 1
        assert db.get_session("parent_old") is None
        assert db.get_session("child") is None
# Schema and WAL mode
# =========================================================================

View File

@@ -144,8 +144,7 @@ class TestMemoryStoreReplace:
def test_replace_no_match(self, store):
store.add("memory", "fact A")
result = store.replace("memory", "nonexistent", "new")
assert result["success"] is True
assert result["result"] == "no_match"
assert result["success"] is False
def test_replace_ambiguous_match(self, store):
store.add("memory", "server A runs nginx")
@@ -178,8 +177,7 @@ class TestMemoryStoreRemove:
def test_remove_no_match(self, store):
result = store.remove("memory", "nonexistent")
assert result["success"] is True
assert result["result"] == "no_match"
assert result["success"] is False
def test_remove_empty_old_text(self, store):
result = store.remove("memory", " ")

View File

@@ -1,107 +0,0 @@
"""Tests for syntax preflight check in execute_code (issue #312)."""
import ast
import json
import pytest
class TestSyntaxPreflight:
    """Pin the ast.parse() behaviour that the execute_code syntax preflight relies on.

    Every test calls ``ast.parse`` directly: valid sources must parse, broken
    sources must raise ``SyntaxError``.
    """

    def test_valid_syntax_passes_parse(self):
        """Valid Python should pass ast.parse."""
        source = "print('hello')\nx = 1 + 2\n"
        ast.parse(source)  # should not raise

    def test_syntax_error_indentation(self):
        """IndentationError is a subclass of SyntaxError."""
        source = "def foo():\nbar()\n"
        with pytest.raises(SyntaxError):
            ast.parse(source)

    def test_syntax_error_missing_colon(self):
        """An ``if`` header without its colon is rejected."""
        source = "if True\n pass\n"
        with pytest.raises(SyntaxError):
            ast.parse(source)

    def test_syntax_error_unmatched_paren(self):
        """An unclosed parenthesis is rejected."""
        source = "x = (1 + 2\n"
        with pytest.raises(SyntaxError):
            ast.parse(source)

    def test_syntax_error_invalid_token(self):
        """A dangling operator sequence is rejected."""
        source = "x = 1 +*\n"
        with pytest.raises(SyntaxError):
            ast.parse(source)

    def test_syntax_error_details(self):
        """SyntaxError should provide line, offset, msg."""
        source = "if True\n pass\n"
        with pytest.raises(SyntaxError) as captured:
            ast.parse(source)
        error = captured.value
        assert error.lineno is not None
        assert error.msg is not None

    def test_empty_string_passes(self):
        """Empty string is valid Python (empty module)."""
        ast.parse("")

    def test_comments_only_passes(self):
        """A module consisting only of comments parses."""
        ast.parse("# just a comment\n# another\n")

    def test_complex_valid_code(self):
        """A multi-statement module with a nested function body parses."""
        source = '''
import os
def foo(x):
    if x > 0:
        return x * 2
    return 0
result = [foo(i) for i in range(10)]
print(result)
'''
        ast.parse(source)
class TestSyntaxPreflightResponse:
    """Test the error response format from the preflight check."""

    def _check_syntax(self, code):
        """Mimic the preflight check logic from execute_code.

        Returns ``None`` when *code* parses, otherwise a JSON string with the
        keys ``error``, ``line``, ``offset`` and ``text``.
        """
        try:
            ast.parse(code)
        except SyntaxError as err:
            payload = {
                "error": f"Python syntax error: {err.msg}",
                "line": err.lineno,
                "offset": err.offset,
                # err.text may be None; the snippet is capped at 200 characters.
                "text": (err.text or "").strip()[:200],
            }
            return json.dumps(payload)
        return None

    def test_returns_json_error(self):
        """A broken source yields a JSON object with an 'error' message."""
        response = self._check_syntax("if True\n pass\n")
        assert response is not None
        payload = json.loads(response)
        assert "error" in payload
        assert "syntax error" in payload["error"].lower()

    def test_includes_line_number(self):
        """The reported line is the line the parser flagged."""
        response = self._check_syntax("x = 1\nif True\n pass\n")
        payload = json.loads(response)
        assert payload["line"] == 2  # error on line 2

    def test_includes_offset(self):
        """The column offset is present in the response."""
        response = self._check_syntax("x = (1 + 2\n")
        payload = json.loads(response)
        assert payload["offset"] is not None

    def test_includes_snippet(self):
        """The offending source line is echoed back in 'text'."""
        response = self._check_syntax("if True\n")
        payload = json.loads(response)
        assert "if True" in payload["text"]

    def test_none_for_valid_code(self):
        """Valid source produces no error response."""
        response = self._check_syntax("print('ok')")
        assert response is None

View File

@@ -28,7 +28,6 @@ Platform: Linux / macOS only (Unix domain sockets for local). Disabled on Window
Remote execution additionally requires Python 3 in the terminal backend.
"""
import ast
import base64
import json
import logging
@@ -894,20 +893,6 @@ def execute_code(
if not code or not code.strip():
return json.dumps({"error": "No code provided."})
# Poka-yoke (#312): Syntax check before execution.
# 83.2% of execute_code errors are Python exceptions; most are syntax
# errors the LLM generated. ast.parse() is sub-millisecond and catches
# them before we spin up a sandbox child process.
try:
ast.parse(code)
except SyntaxError as e:
return json.dumps({
"error": f"Python syntax error: {e.msg}",
"line": e.lineno,
"offset": e.offset,
"text": (e.text or "").strip()[:200],
})
# Dispatch: remote backends use file-based RPC, local uses UDS
from tools.terminal_tool import _get_env_config
env_type = _get_env_config()["env_type"]

View File

@@ -316,17 +316,10 @@ def cronjob(
updated = resume_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized in {"run", "trigger"}:
if normalized in {"run", "run_now", "trigger"}:
updated = trigger_job(job_id)
return json.dumps({"success": True, "job": _format_job(updated)}, indent=2)
if normalized == "run_now":
from cron.jobs import run_job_now
result = run_job_now(job_id)
if result is None:
return json.dumps({"success": False, "error": "Job not found"}, indent=2)
return json.dumps(result, indent=2)
if normalized == "update":
updates: Dict[str, Any] = {}
if prompt is not None:

View File

@@ -260,12 +260,8 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), operate on the first one
@@ -314,12 +310,8 @@ class MemoryStore:
entries = self._entries_for(target)
matches = [(i, e) for i, e in enumerate(entries) if old_text in e]
if not matches:
return {
"success": True,
"result": "no_match",
"message": f"No entry matched '{old_text}'. The search substring was not found in any existing entry.",
}
if len(matches) == 0:
return {"success": False, "error": f"No entry matched '{old_text}'."}
if len(matches) > 1:
# If all matches are identical (exact duplicates), remove the first one
@@ -457,30 +449,30 @@ def memory_tool(
Returns JSON string with results.
"""
if store is None:
return tool_error("Memory is not available. It may be disabled in config or this environment.", success=False)
return json.dumps({"success": False, "error": "Memory is not available. It may be disabled in config or this environment."}, ensure_ascii=False)
if target not in ("memory", "user"):
return tool_error(f"Invalid target '{target}'. Use 'memory' or 'user'.", success=False)
return json.dumps({"success": False, "error": f"Invalid target '{target}'. Use 'memory' or 'user'."}, ensure_ascii=False)
if action == "add":
if not content:
return tool_error("Content is required for 'add' action.", success=False)
return json.dumps({"success": False, "error": "Content is required for 'add' action."}, ensure_ascii=False)
result = store.add(target, content)
elif action == "replace":
if not old_text:
return tool_error("old_text is required for 'replace' action.", success=False)
return json.dumps({"success": False, "error": "old_text is required for 'replace' action."}, ensure_ascii=False)
if not content:
return tool_error("content is required for 'replace' action.", success=False)
return json.dumps({"success": False, "error": "content is required for 'replace' action."}, ensure_ascii=False)
result = store.replace(target, old_text, content)
elif action == "remove":
if not old_text:
return tool_error("old_text is required for 'remove' action.", success=False)
return json.dumps({"success": False, "error": "old_text is required for 'remove' action."}, ensure_ascii=False)
result = store.remove(target, old_text)
else:
return tool_error(f"Unknown action '{action}'. Use: add, replace, remove", success=False)
return json.dumps({"success": False, "error": f"Unknown action '{action}'. Use: add, replace, remove"}, ensure_ascii=False)
return json.dumps(result, ensure_ascii=False)
@@ -547,7 +539,7 @@ MEMORY_SCHEMA = {
# --- Registry ---
from tools.registry import registry, tool_error
from tools.registry import registry
registry.register(
name="memory",