Merge pull request 'perf(cron): parallel job execution + priority sorting (#353)' (#357) from fix/cron-tick-backlog into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 20s

This commit was merged in pull request #357.
This commit is contained in:
2026-04-13 08:29:31 +00:00

View File

@@ -910,26 +910,36 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int:
if verbose:
logger.info("%s - %s job(s) due", _hermes_now().strftime('%H:%M:%S'), len(due_jobs))
# Parallel job execution with priority sorting
PRIORITY_ORDER = {"critical": 0, "high": 1, "normal": 2, "low": 3}
due_jobs_sorted = sorted(due_jobs, key=lambda j: PRIORITY_ORDER.get(j.get("priority", "normal"), 2))
MAX_PARALLEL = int(os.environ.get("HERMES_CRON_MAX_PARALLEL", "10"))
executed = 0
for job in due_jobs:
# If the interpreter is shutting down (e.g. gateway restart),
# stop processing immediately — ThreadPoolExecutor.submit()
# will raise RuntimeError for every remaining job.
_job_results = []
def _process_single_job(job):
job_name = job.get("name", job["id"])
if sys.is_finalizing():
logger.warning(
"Interpreter finalizing — skipping %d remaining job(s)",
len(due_jobs) - executed,
)
break
return None
try:
# For recurring jobs (cron/interval), advance next_run_at to the
# next future occurrence BEFORE execution. This way, if the
# process crashes mid-run, the job won't re-fire on restart.
# One-shot jobs are left alone so they can retry on restart.
advance_next_run(job["id"])
success, output, final_response, error = run_job(job)
return (job, success, output, final_response, error)
except Exception as e:
logger.error("Job '%s': parallel error: %s", job_name, e)
return (job, False, "", None, str(e))
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_PARALLEL) as executor:
futures = {executor.submit(_process_single_job, job): job for job in due_jobs_sorted}
for future in concurrent.futures.as_completed(futures):
result = future.result()
if result is None:
continue
_job_results.append(result)
for job, success, output, final_response, error in _job_results:
executed += 1
output_file = save_job_output(job["id"], output)
if verbose: