diff --git a/cron/jobs.py b/cron/jobs.py index 214da521f..98d0afc82 100644 --- a/cron/jobs.py +++ b/cron/jobs.py @@ -563,6 +563,44 @@ def trigger_job(job_id: str) -> Optional[Dict[str, Any]]: ) +def run_job_now(job_id: str) -> Optional[Dict[str, Any]]: + """ + Execute a job immediately and persist fresh state. + + Unlike trigger_job() which queues for the next scheduler tick, + this runs the job synchronously and returns the result. + Clears stale error state on success. + + Returns: + Dict with 'job', 'success', 'output', 'error' keys, or None if not found. + """ + job = get_job(job_id) + if not job: + return None + + try: + from cron.scheduler import run_job as _run_job + except ImportError as exc: + return { + "job": job, + "success": False, + "output": None, + "error": f"Cannot import scheduler: {exc}", + } + + success, output, final_response, error = _run_job(job) + mark_job_run(job_id, success, error) + + updated_job = get_job(job_id) or job + return { + "job": updated_job, + "success": success, + "output": output, + "final_response": final_response, + "error": error, + } + + def remove_job(job_id: str) -> bool: """Remove a job by ID.""" jobs = load_jobs() diff --git a/gateway/platforms/api_server.py b/gateway/platforms/api_server.py index 7ced55c1e..46b7c6836 100644 --- a/gateway/platforms/api_server.py +++ b/gateway/platforms/api_server.py @@ -973,6 +973,7 @@ class APIServerAdapter(BasePlatformAdapter): pause_job as _cron_pause, resume_job as _cron_resume, trigger_job as _cron_trigger, + run_job_now as _cron_run_now, ) # Wrap as staticmethod to prevent descriptor binding — these are plain # module functions, not instance methods. Without this, self._cron_*() @@ -986,6 +987,7 @@ class APIServerAdapter(BasePlatformAdapter): _cron_pause = staticmethod(_cron_pause) _cron_resume = staticmethod(_cron_resume) _cron_trigger = staticmethod(_cron_trigger) + _cron_run_now = staticmethod(_cron_run_now) _CRON_AVAILABLE = True except ImportError: pass @@ -1204,6 +1206,28 @@ class APIServerAdapter(BasePlatformAdapter): except Exception as e: return web.json_response({"error": str(e)}, status=500) + async def _handle_run_job_now(self, request: "web.Request") -> "web.Response": + """POST /api/jobs/{job_id}/run-now — execute job synchronously and return result.""" + auth_err = self._check_auth(request) + if auth_err: + return auth_err + cron_err = self._check_jobs_available() + if cron_err: + return cron_err + job_id, id_err = self._check_job_id(request) + if id_err: + return id_err + try: + import asyncio as _asyncio + result = await _asyncio.get_event_loop().run_in_executor( + None, self._cron_run_now, job_id + ) + if result is None: + return web.json_response({"error": "Job not found"}, status=404) + return web.json_response(result) + except Exception as e: + return web.json_response({"error": str(e)}, status=500) + # ------------------------------------------------------------------ # Output extraction helper # ------------------------------------------------------------------ @@ -1565,6 +1589,7 @@ class APIServerAdapter(BasePlatformAdapter): self._app.router.add_post("/api/jobs/{job_id}/pause", self._handle_pause_job) self._app.router.add_post("/api/jobs/{job_id}/resume", self._handle_resume_job) self._app.router.add_post("/api/jobs/{job_id}/run", self._handle_run_job) + self._app.router.add_post("/api/jobs/{job_id}/run-now", self._handle_run_job_now) # Structured event streaming self._app.router.add_post("/v1/runs", self._handle_runs) self._app.router.add_get("/v1/runs/{run_id}/events", self._handle_run_events) diff --git a/hermes_cli/cron.py b/hermes_cli/cron.py index d10513a28..95c934f0d 100644 --- a/hermes_cli/cron.py +++ b/hermes_cli/cron.py @@ -221,7 +221,31 @@ def cron_edit(args): return 0 -def _job_action(action: str, job_id: str, success_verb: str) -> int: +def _job_action(action: str, job_id: str, success_verb: str, now: bool = False) -> int: + if action == "run" and now: + # Synchronous execution — run job immediately and show result + result = _cron_api(action="run_now", job_id=job_id) + if not result.get("success"): + if result.get("error"): + print(color(f"Failed to run job now: {result['error']}", Colors.RED)) + else: + print(color(f"Failed to run job now: {result.get('error', 'unknown error')}", Colors.RED)) + return 1 + job = result.get("job", {}) + success = result.get("success", False) + error = result.get("error") + final_response = result.get("final_response", "") + name = job.get("name", job_id) + if success: + print(color(f"Job '{name}' completed successfully", Colors.GREEN)) + else: + print(color(f"Job '{name}' failed: {error}", Colors.RED)) + if final_response: + print(f"\n{final_response}\n") + if not error: + print(color("Stale error state cleared.", Colors.GREEN)) + return 0 if success else 1 + result = _cron_api(action=action, job_id=job_id) if not result.get("success"): print(color(f"Failed to {action} job: {result.get('error', 'unknown error')}", Colors.RED)) @@ -265,7 +289,8 @@ def cron_command(args): return _job_action("resume", args.job_id, "Resumed") if subcmd == "run": - return _job_action("run", args.job_id, "Triggered") + now = getattr(args, 'now', False) + return _job_action("run", args.job_id, "Triggered", now=now) if subcmd in {"remove", "rm", "delete"}: return _job_action("remove", args.job_id, "Removed") diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 899d567c8..78be82204 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -4574,6 +4574,7 @@ For more help on a command: cron_run = cron_subparsers.add_parser("run", help="Run a job on the next scheduler tick") cron_run.add_argument("job_id", help="Job ID to trigger") + cron_run.add_argument("--now", action="store_true", help="Execute immediately and wait for result (clears stale errors)") cron_remove = cron_subparsers.add_parser("remove", aliases=["rm", "delete"], help="Remove a scheduled job") cron_remove.add_argument("job_id", help="Job ID to remove") diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index 8dbcf7c3a..e73d741b6 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -316,10 +316,17 @@ def cronjob( updated = resume_job(job_id) return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) - if normalized in {"run", "run_now", "trigger"}: + if normalized in {"run", "trigger"}: updated = trigger_job(job_id) return json.dumps({"success": True, "job": _format_job(updated)}, indent=2) + if normalized == "run_now": + from cron.jobs import run_job_now + result = run_job_now(job_id) + if result is None: + return json.dumps({"success": False, "error": "Job not found"}, indent=2) + return json.dumps(result, indent=2) + if normalized == "update": updates: Dict[str, Any] = {} if prompt is not None: