fix(cron): replace wall-clock timeout with inactivity-based timeout (#5440)
Port the gateway's inactivity-based timeout pattern (PR #5389) to the cron scheduler. The agent can now run for hours if it's actively calling tools or receiving stream tokens — only genuine inactivity (no activity for HERMES_CRON_TIMEOUT seconds, default 600s) triggers a timeout. This fixes the Sunday PR scouts (openclaw, nanoclaw, ironclaw) which all hit the hard 600s wall-clock limit while actively working. Changes: - Replace flat future.result(timeout=N) with a polling loop that checks agent.get_activity_summary() every 5s (same pattern as gateway) - Timeout error now includes diagnostic info: last activity description, idle duration, current tool, iteration count - HERMES_CRON_TIMEOUT=0 means unlimited (no timeout) - Move sys.path.insert before repo-level imports to fix ModuleNotFoundError for hermes_time on stale gateway processes - Add time import needed by the polling loop - Add 9 tests covering active/idle/unlimited/env-var/diagnostic scenarios
This commit is contained in:
@@ -25,11 +25,17 @@ except ImportError:
|
||||
import msvcrt
|
||||
except ImportError:
|
||||
msvcrt = None
|
||||
import time
|
||||
from pathlib import Path
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.config import load_config
|
||||
from typing import Optional
|
||||
|
||||
# Add parent directory to path for imports BEFORE repo-level imports.
|
||||
# Without this, standalone invocations (e.g. after `hermes update` reloads
|
||||
# the module) fail with ModuleNotFoundError for hermes_time et al.
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.config import load_config
|
||||
from hermes_time import now as _hermes_now
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -42,9 +48,6 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
|
||||
"wecom", "sms", "email", "webhook",
|
||||
})
|
||||
|
||||
# Add parent directory to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
|
||||
|
||||
# Sentinel: when a cron agent has nothing new to report, it can start its
|
||||
@@ -590,30 +593,79 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
session_db=_session_db,
|
||||
)
|
||||
|
||||
# Run the agent with a timeout so a hung API call or tool doesn't
|
||||
# block the cron ticker thread indefinitely. Default 10 minutes;
|
||||
# override via env var. Uses a separate thread because
|
||||
# run_conversation is synchronous.
|
||||
# Run the agent with an *inactivity*-based timeout: the job can run
|
||||
# for hours if it's actively calling tools / receiving stream tokens,
|
||||
# but a hung API call or stuck tool with no activity for the configured
|
||||
# duration is caught and killed. Default 600s (10 min inactivity);
|
||||
# override via HERMES_CRON_TIMEOUT env var. 0 = unlimited.
|
||||
#
|
||||
# Uses the agent's built-in activity tracker (updated by
|
||||
# _touch_activity() on every tool call, API call, and stream delta).
|
||||
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
||||
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
||||
_POLL_INTERVAL = 5.0
|
||||
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
_cron_future = _cron_pool.submit(agent.run_conversation, prompt)
|
||||
_inactivity_timeout = False
|
||||
try:
|
||||
result = _cron_future.result(timeout=_cron_timeout)
|
||||
except concurrent.futures.TimeoutError:
|
||||
logger.error(
|
||||
"Job '%s' timed out after %.0fs — interrupting agent",
|
||||
job_name, _cron_timeout,
|
||||
)
|
||||
if hasattr(agent, "interrupt"):
|
||||
agent.interrupt("Cron job timed out")
|
||||
if _cron_inactivity_limit is None:
|
||||
# Unlimited — just wait for the result.
|
||||
result = _cron_future.result()
|
||||
else:
|
||||
result = None
|
||||
while True:
|
||||
done, _ = concurrent.futures.wait(
|
||||
{_cron_future}, timeout=_POLL_INTERVAL,
|
||||
)
|
||||
if done:
|
||||
result = _cron_future.result()
|
||||
break
|
||||
# Agent still running — check inactivity.
|
||||
_idle_secs = 0.0
|
||||
if hasattr(agent, "get_activity_summary"):
|
||||
try:
|
||||
_act = agent.get_activity_summary()
|
||||
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
||||
except Exception:
|
||||
pass
|
||||
if _idle_secs >= _cron_inactivity_limit:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
except Exception:
|
||||
_cron_pool.shutdown(wait=False, cancel_futures=True)
|
||||
raise TimeoutError(
|
||||
f"Cron job '{job_name}' timed out after "
|
||||
f"{int(_cron_timeout // 60)} minutes"
|
||||
)
|
||||
raise
|
||||
finally:
|
||||
_cron_pool.shutdown(wait=False)
|
||||
|
||||
if _inactivity_timeout:
|
||||
# Build diagnostic summary from the agent's activity tracker.
|
||||
_activity = {}
|
||||
if hasattr(agent, "get_activity_summary"):
|
||||
try:
|
||||
_activity = agent.get_activity_summary()
|
||||
except Exception:
|
||||
pass
|
||||
_last_desc = _activity.get("last_activity_desc", "unknown")
|
||||
_secs_ago = _activity.get("seconds_since_activity", 0)
|
||||
_cur_tool = _activity.get("current_tool")
|
||||
_iter_n = _activity.get("api_call_count", 0)
|
||||
_iter_max = _activity.get("max_iterations", 0)
|
||||
|
||||
logger.error(
|
||||
"Job '%s' idle for %.0fs (inactivity limit %.0fs) "
|
||||
"| last_activity=%s | iteration=%s/%s | tool=%s",
|
||||
job_name, _secs_ago, _cron_inactivity_limit,
|
||||
_last_desc, _iter_n, _iter_max,
|
||||
_cur_tool or "none",
|
||||
)
|
||||
if hasattr(agent, "interrupt"):
|
||||
agent.interrupt("Cron job timed out (inactivity)")
|
||||
raise TimeoutError(
|
||||
f"Cron job '{job_name}' idle for "
|
||||
f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) "
|
||||
f"— last activity: {_last_desc}"
|
||||
)
|
||||
|
||||
final_response = result.get("final_response", "") or ""
|
||||
# Use a separate variable for log display; keep final_response clean
|
||||
# for delivery logic (empty response = no delivery).
|
||||
|
||||
289
tests/cron/test_cron_inactivity_timeout.py
Normal file
289
tests/cron/test_cron_inactivity_timeout.py
Normal file
@@ -0,0 +1,289 @@
|
||||
"""Tests for cron job inactivity-based timeout.
|
||||
|
||||
Tests cover:
|
||||
- Active agent runs indefinitely (no inactivity timeout)
|
||||
- Idle agent triggers inactivity timeout with diagnostic info
|
||||
- Unlimited timeout (HERMES_CRON_TIMEOUT=0)
|
||||
- Backward compat: HERMES_CRON_TIMEOUT env var still works
|
||||
- Error message includes activity summary
|
||||
"""
|
||||
|
||||
import concurrent.futures
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
# Ensure project root is importable
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
|
||||
class FakeAgent:
|
||||
"""Mock agent with controllable activity summary for timeout tests."""
|
||||
|
||||
def __init__(self, idle_seconds=0.0, activity_desc="tool_call",
|
||||
current_tool=None, api_call_count=5, max_iterations=90):
|
||||
self._idle_seconds = idle_seconds
|
||||
self._activity_desc = activity_desc
|
||||
self._current_tool = current_tool
|
||||
self._api_call_count = api_call_count
|
||||
self._max_iterations = max_iterations
|
||||
self._interrupted = False
|
||||
self._interrupt_msg = None
|
||||
|
||||
def get_activity_summary(self):
|
||||
return {
|
||||
"last_activity_ts": time.time() - self._idle_seconds,
|
||||
"last_activity_desc": self._activity_desc,
|
||||
"seconds_since_activity": self._idle_seconds,
|
||||
"current_tool": self._current_tool,
|
||||
"api_call_count": self._api_call_count,
|
||||
"max_iterations": self._max_iterations,
|
||||
}
|
||||
|
||||
def interrupt(self, msg):
|
||||
self._interrupted = True
|
||||
self._interrupt_msg = msg
|
||||
|
||||
def run_conversation(self, prompt):
|
||||
"""Simulate a quick agent run that finishes immediately."""
|
||||
return {"final_response": "Done", "messages": []}
|
||||
|
||||
|
||||
class SlowFakeAgent(FakeAgent):
|
||||
"""Agent that runs for a while, simulating active work then going idle."""
|
||||
|
||||
def __init__(self, run_duration=0.5, idle_after=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self._run_duration = run_duration
|
||||
self._idle_after = idle_after # seconds before becoming idle
|
||||
self._start_time = None
|
||||
|
||||
def get_activity_summary(self):
|
||||
summary = super().get_activity_summary()
|
||||
if self._idle_after is not None and self._start_time:
|
||||
elapsed = time.time() - self._start_time
|
||||
if elapsed > self._idle_after:
|
||||
# Agent has gone idle
|
||||
idle_time = elapsed - self._idle_after
|
||||
summary["seconds_since_activity"] = idle_time
|
||||
summary["last_activity_desc"] = "api_call_streaming"
|
||||
else:
|
||||
summary["seconds_since_activity"] = 0.0
|
||||
return summary
|
||||
|
||||
def run_conversation(self, prompt):
|
||||
self._start_time = time.time()
|
||||
time.sleep(self._run_duration)
|
||||
return {"final_response": "Completed after work", "messages": []}
|
||||
|
||||
|
||||
class TestInactivityTimeout:
|
||||
"""Test the inactivity-based timeout polling loop in cron scheduler."""
|
||||
|
||||
def test_active_agent_completes_normally(self):
|
||||
"""An agent that finishes quickly should return its result."""
|
||||
agent = FakeAgent(idle_seconds=0.0)
|
||||
_cron_inactivity_limit = 10.0
|
||||
_POLL_INTERVAL = 0.1
|
||||
|
||||
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
future = pool.submit(agent.run_conversation, "test prompt")
|
||||
_inactivity_timeout = False
|
||||
|
||||
result = None
|
||||
while True:
|
||||
done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL)
|
||||
if done:
|
||||
result = future.result()
|
||||
break
|
||||
_idle_secs = 0.0
|
||||
if hasattr(agent, "get_activity_summary"):
|
||||
_act = agent.get_activity_summary()
|
||||
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
||||
if _idle_secs >= _cron_inactivity_limit:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
|
||||
pool.shutdown(wait=False)
|
||||
assert result is not None
|
||||
assert result["final_response"] == "Done"
|
||||
assert not _inactivity_timeout
|
||||
assert not agent._interrupted
|
||||
|
||||
def test_idle_agent_triggers_timeout(self):
|
||||
"""An agent that goes idle should be detected and interrupted."""
|
||||
# Agent will run for 0.3s, then become idle after 0.1s of that
|
||||
agent = SlowFakeAgent(
|
||||
run_duration=5.0, # would run forever without timeout
|
||||
idle_after=0.1, # goes idle almost immediately
|
||||
activity_desc="api_call_streaming",
|
||||
current_tool="web_search",
|
||||
api_call_count=3,
|
||||
max_iterations=50,
|
||||
)
|
||||
|
||||
_cron_inactivity_limit = 0.5 # 0.5s inactivity triggers timeout
|
||||
_POLL_INTERVAL = 0.1
|
||||
|
||||
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
future = pool.submit(agent.run_conversation, "test prompt")
|
||||
_inactivity_timeout = False
|
||||
|
||||
result = None
|
||||
while True:
|
||||
done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL)
|
||||
if done:
|
||||
result = future.result()
|
||||
break
|
||||
_idle_secs = 0.0
|
||||
if hasattr(agent, "get_activity_summary"):
|
||||
try:
|
||||
_act = agent.get_activity_summary()
|
||||
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
||||
except Exception:
|
||||
pass
|
||||
if _idle_secs >= _cron_inactivity_limit:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
|
||||
pool.shutdown(wait=False, cancel_futures=True)
|
||||
assert _inactivity_timeout is True
|
||||
assert result is None # Never got a result — interrupted
|
||||
|
||||
def test_unlimited_timeout(self):
|
||||
"""HERMES_CRON_TIMEOUT=0 means no timeout at all."""
|
||||
agent = FakeAgent(idle_seconds=0.0)
|
||||
_cron_inactivity_limit = None # unlimited
|
||||
|
||||
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
future = pool.submit(agent.run_conversation, "test prompt")
|
||||
|
||||
# With unlimited, we just await the result directly.
|
||||
result = future.result()
|
||||
pool.shutdown(wait=False)
|
||||
|
||||
assert result["final_response"] == "Done"
|
||||
|
||||
def test_timeout_env_var_parsing(self, monkeypatch):
|
||||
"""HERMES_CRON_TIMEOUT env var is respected."""
|
||||
monkeypatch.setenv("HERMES_CRON_TIMEOUT", "1200")
|
||||
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
||||
assert _cron_timeout == 1200.0
|
||||
|
||||
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
||||
assert _cron_inactivity_limit == 1200.0
|
||||
|
||||
def test_timeout_zero_means_unlimited(self, monkeypatch):
|
||||
"""HERMES_CRON_TIMEOUT=0 yields None (unlimited)."""
|
||||
monkeypatch.setenv("HERMES_CRON_TIMEOUT", "0")
|
||||
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
||||
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
||||
assert _cron_inactivity_limit is None
|
||||
|
||||
def test_timeout_error_includes_diagnostics(self):
|
||||
"""The TimeoutError message should include last activity info."""
|
||||
agent = SlowFakeAgent(
|
||||
run_duration=5.0,
|
||||
idle_after=0.05,
|
||||
activity_desc="api_call_streaming",
|
||||
current_tool="delegate_task",
|
||||
api_call_count=7,
|
||||
max_iterations=90,
|
||||
)
|
||||
|
||||
_cron_inactivity_limit = 0.3
|
||||
_POLL_INTERVAL = 0.1
|
||||
|
||||
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
future = pool.submit(agent.run_conversation, "test")
|
||||
_inactivity_timeout = False
|
||||
|
||||
while True:
|
||||
done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL)
|
||||
if done:
|
||||
break
|
||||
_idle_secs = 0.0
|
||||
if hasattr(agent, "get_activity_summary"):
|
||||
try:
|
||||
_act = agent.get_activity_summary()
|
||||
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
||||
except Exception:
|
||||
pass
|
||||
if _idle_secs >= _cron_inactivity_limit:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
|
||||
pool.shutdown(wait=False, cancel_futures=True)
|
||||
assert _inactivity_timeout
|
||||
|
||||
# Build the diagnostic message like the scheduler does
|
||||
_activity = agent.get_activity_summary()
|
||||
_last_desc = _activity.get("last_activity_desc", "unknown")
|
||||
_secs_ago = _activity.get("seconds_since_activity", 0)
|
||||
|
||||
err_msg = (
|
||||
f"Cron job 'test-job' idle for "
|
||||
f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) "
|
||||
f"— last activity: {_last_desc}"
|
||||
)
|
||||
assert "idle for" in err_msg
|
||||
assert "api_call_streaming" in err_msg
|
||||
|
||||
def test_agent_without_activity_summary_uses_wallclock_fallback(self):
|
||||
"""If agent lacks get_activity_summary, idle_secs stays 0 (never times out).
|
||||
|
||||
This ensures backward compat if somehow an old agent is used.
|
||||
The polling loop will eventually complete when the task finishes.
|
||||
"""
|
||||
class BareAgent:
|
||||
def run_conversation(self, prompt):
|
||||
return {"final_response": "no activity tracker", "messages": []}
|
||||
|
||||
agent = BareAgent()
|
||||
_cron_inactivity_limit = 0.1 # tiny limit
|
||||
_POLL_INTERVAL = 0.1
|
||||
|
||||
pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
future = pool.submit(agent.run_conversation, "test")
|
||||
_inactivity_timeout = False
|
||||
|
||||
while True:
|
||||
done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL)
|
||||
if done:
|
||||
result = future.result()
|
||||
break
|
||||
_idle_secs = 0.0
|
||||
if hasattr(agent, "get_activity_summary"):
|
||||
try:
|
||||
_act = agent.get_activity_summary()
|
||||
_idle_secs = _act.get("seconds_since_activity", 0.0)
|
||||
except Exception:
|
||||
pass
|
||||
if _idle_secs >= _cron_inactivity_limit:
|
||||
_inactivity_timeout = True
|
||||
break
|
||||
|
||||
pool.shutdown(wait=False)
|
||||
# Should NOT have timed out — bare agent has no get_activity_summary
|
||||
assert not _inactivity_timeout
|
||||
assert result["final_response"] == "no activity tracker"
|
||||
|
||||
|
||||
class TestSysPathOrdering:
|
||||
"""Test that sys.path is set before repo-level imports."""
|
||||
|
||||
def test_hermes_time_importable(self):
|
||||
"""hermes_time should be importable when cron.scheduler loads."""
|
||||
# This import would fail if sys.path.insert comes after the import
|
||||
from cron.scheduler import _hermes_now
|
||||
assert callable(_hermes_now)
|
||||
|
||||
def test_hermes_constants_importable(self):
|
||||
"""hermes_constants should be importable from cron context."""
|
||||
from hermes_constants import get_hermes_home
|
||||
assert callable(get_hermes_home)
|
||||
Reference in New Issue
Block a user