diff --git a/cron/scheduler.py b/cron/scheduler.py index e6db77c09..b6c2dad43 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -596,6 +596,41 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: origin = _resolve_origin(job) _cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}" + # Check for checkpoint from previous timeout — resume where we left off + try: + _checkpoint_dir = _hermes_home / "cron" / "checkpoints" + _checkpoint_path = _checkpoint_dir / f"{job_id}.json" + if _checkpoint_path.exists(): + import json as _json + _cp = _json.loads(_checkpoint_path.read_text()) + _cp_age = (_hermes_now() - _parse_dt(_cp["saved_at"])).total_seconds() if _cp.get("saved_at") else 9999 + if _cp_age < 7200: # Only resume if checkpoint is <2 hours old + _cp_context = ( + f"\n\n[CHECKPOINT: This job timed out previously. " + f"Iterations completed: {_cp.get('iterations_completed', '?')}. " + f"Last activity: {_cp.get('last_activity', '?')}. " + f"Continue from where you left off. Do not repeat work already done.]\n" + ) + # Add last conversation context if available + _conv = _cp.get("conversation_history", []) + if _conv: + _cp_context += "\n[Previous work summary from last run:]\n" + for _msg in _conv[-5:]: + _role = _msg.get("role", "?") + _content = str(_msg.get("content", ""))[:300] + if _role in ("assistant", "tool"): + _cp_context += f" {_role}: {_content}\n" + prompt = prompt + _cp_context + logger.info("Job '%s': resuming from checkpoint (%.0fs old, %d iterations)", + job_id, _cp_age, _cp.get("iterations_completed", 0)) + # Clear the checkpoint — we're resuming it now + _checkpoint_path.unlink() + else: + logger.info("Job '%s': checkpoint too old (%.0fs), starting fresh", job_id, _cp_age) + _checkpoint_path.unlink() + except Exception as _cp_err: + logger.debug("Job '%s': checkpoint resume failed: %s", job_id, _cp_err) + logger.info("Running job '%s' (ID: %s)", job_name, job_id) logger.info("Prompt: %s", prompt[:100]) @@ -762,7 +797,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: # # Uses the agent's built-in activity tracker (updated by # _touch_activity() on every tool call, API call, and stream delta). - _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600)) + _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 900)) _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None _POLL_INTERVAL = 5.0 _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) @@ -812,6 +847,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: _iter_n = _activity.get("api_call_count", 0) _iter_max = _activity.get("max_iterations", 0) + # Save checkpoint before killing — next run can resume + try: + _checkpoint_dir = _hermes_home / "cron" / "checkpoints" + _checkpoint_dir.mkdir(parents=True, exist_ok=True) + _checkpoint_path = _checkpoint_dir / f"{job_id}.json" + _conv_history = [] + if hasattr(agent, "conversation_history"): + _conv_history = agent.conversation_history + elif hasattr(agent, "_session_messages"): + _conv_history = agent._session_messages + _checkpoint = { + "job_id": job_id, + "job_name": job_name, + "saved_at": _hermes_now().isoformat(), + "iterations_completed": _iter_n, + "last_activity": _last_desc, + "conversation_history": _conv_history[-20:] if _conv_history else [], + } + import json as _json + _checkpoint_path.write_text(_json.dumps(_checkpoint, indent=2, default=str)) + logger.info("Job '%s': checkpoint saved (%d iterations)", job_id, _iter_n) + except Exception as _cp_err: + logger.warning("Job '%s': failed to save checkpoint: %s", job_id, _cp_err) + logger.error( "Job '%s' idle for %.0fs (inactivity limit %.0fs) " "| last_activity=%s | iteration=%s/%s | tool=%s", diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index 0eb1cd2e4..b1d3c059c 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -33,6 +33,7 @@ import json import logging import os import platform +import re import shlex import signal import socket