diff --git a/cron/scheduler.py b/cron/scheduler.py index 860980e0e..2337c25a5 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -25,11 +25,17 @@ except ImportError: import msvcrt except ImportError: msvcrt = None +import time from pathlib import Path -from hermes_constants import get_hermes_home -from hermes_cli.config import load_config from typing import Optional +# Add parent directory to path for imports BEFORE repo-level imports. +# Without this, standalone invocations (e.g. after `hermes update` reloads +# the module) fail with ModuleNotFoundError for hermes_time et al. +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from hermes_constants import get_hermes_home +from hermes_cli.config import load_config from hermes_time import now as _hermes_now logger = logging.getLogger(__name__) @@ -42,9 +48,6 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({ "wecom", "sms", "email", "webhook", }) -# Add parent directory to path for imports -sys.path.insert(0, str(Path(__file__).parent.parent)) - from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run # Sentinel: when a cron agent has nothing new to report, it can start its @@ -590,30 +593,79 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]: session_db=_session_db, ) - # Run the agent with a timeout so a hung API call or tool doesn't - # block the cron ticker thread indefinitely. Default 10 minutes; - # override via env var. Uses a separate thread because - # run_conversation is synchronous. + # Run the agent with an *inactivity*-based timeout: the job can run + # for hours if it's actively calling tools / receiving stream tokens, + # but a hung API call or stuck tool with no activity for the configured + # duration is caught and killed. Default 600s (10 min inactivity); + # override via HERMES_CRON_TIMEOUT env var. 0 = unlimited. + # + # Uses the agent's built-in activity tracker (updated by + # _touch_activity() on every tool call, API call, and stream delta). _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600)) + _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None + _POLL_INTERVAL = 5.0 _cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) _cron_future = _cron_pool.submit(agent.run_conversation, prompt) + _inactivity_timeout = False try: - result = _cron_future.result(timeout=_cron_timeout) - except concurrent.futures.TimeoutError: - logger.error( - "Job '%s' timed out after %.0fs — interrupting agent", - job_name, _cron_timeout, - ) - if hasattr(agent, "interrupt"): - agent.interrupt("Cron job timed out") + if _cron_inactivity_limit is None: + # Unlimited — just wait for the result. + result = _cron_future.result() + else: + result = None + while True: + done, _ = concurrent.futures.wait( + {_cron_future}, timeout=_POLL_INTERVAL, + ) + if done: + result = _cron_future.result() + break + # Agent still running — check inactivity. + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if _idle_secs >= _cron_inactivity_limit: + _inactivity_timeout = True + break + except Exception: _cron_pool.shutdown(wait=False, cancel_futures=True) - raise TimeoutError( - f"Cron job '{job_name}' timed out after " - f"{int(_cron_timeout // 60)} minutes" - ) + raise finally: _cron_pool.shutdown(wait=False) + if _inactivity_timeout: + # Build diagnostic summary from the agent's activity tracker. + _activity = {} + if hasattr(agent, "get_activity_summary"): + try: + _activity = agent.get_activity_summary() + except Exception: + pass + _last_desc = _activity.get("last_activity_desc", "unknown") + _secs_ago = _activity.get("seconds_since_activity", 0) + _cur_tool = _activity.get("current_tool") + _iter_n = _activity.get("api_call_count", 0) + _iter_max = _activity.get("max_iterations", 0) + + logger.error( + "Job '%s' idle for %.0fs (inactivity limit %.0fs) " + "| last_activity=%s | iteration=%s/%s | tool=%s", + job_name, _secs_ago, _cron_inactivity_limit, + _last_desc, _iter_n, _iter_max, + _cur_tool or "none", + ) + if hasattr(agent, "interrupt"): + agent.interrupt("Cron job timed out (inactivity)") + raise TimeoutError( + f"Cron job '{job_name}' idle for " + f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) " + f"— last activity: {_last_desc}" + ) + final_response = result.get("final_response", "") or "" # Use a separate variable for log display; keep final_response clean # for delivery logic (empty response = no delivery). diff --git a/tests/cron/test_cron_inactivity_timeout.py b/tests/cron/test_cron_inactivity_timeout.py new file mode 100644 index 000000000..0b83f64f0 --- /dev/null +++ b/tests/cron/test_cron_inactivity_timeout.py @@ -0,0 +1,289 @@ +"""Tests for cron job inactivity-based timeout. + +Tests cover: +- Active agent runs indefinitely (no inactivity timeout) +- Idle agent triggers inactivity timeout with diagnostic info +- Unlimited timeout (HERMES_CRON_TIMEOUT=0) +- Backward compat: HERMES_CRON_TIMEOUT env var still works +- Error message includes activity summary +""" + +import concurrent.futures +import os +import sys +import time +import threading +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Ensure project root is importable +sys.path.insert(0, str(Path(__file__).parent.parent.parent)) + + +class FakeAgent: + """Mock agent with controllable activity summary for timeout tests.""" + + def __init__(self, idle_seconds=0.0, activity_desc="tool_call", + current_tool=None, api_call_count=5, max_iterations=90): + self._idle_seconds = idle_seconds + self._activity_desc = activity_desc + self._current_tool = current_tool + self._api_call_count = api_call_count + self._max_iterations = max_iterations + self._interrupted = False + self._interrupt_msg = None + + def get_activity_summary(self): + return { + "last_activity_ts": time.time() - self._idle_seconds, + "last_activity_desc": self._activity_desc, + "seconds_since_activity": self._idle_seconds, + "current_tool": self._current_tool, + "api_call_count": self._api_call_count, + "max_iterations": self._max_iterations, + } + + def interrupt(self, msg): + self._interrupted = True + self._interrupt_msg = msg + + def run_conversation(self, prompt): + """Simulate a quick agent run that finishes immediately.""" + return {"final_response": "Done", "messages": []} + + +class SlowFakeAgent(FakeAgent): + """Agent that runs for a while, simulating active work then going idle.""" + + def __init__(self, run_duration=0.5, idle_after=None, **kwargs): + super().__init__(**kwargs) + self._run_duration = run_duration + self._idle_after = idle_after # seconds before becoming idle + self._start_time = None + + def get_activity_summary(self): + summary = super().get_activity_summary() + if self._idle_after is not None and self._start_time: + elapsed = time.time() - self._start_time + if elapsed > self._idle_after: + # Agent has gone idle + idle_time = elapsed - self._idle_after + summary["seconds_since_activity"] = idle_time + summary["last_activity_desc"] = "api_call_streaming" + else: + summary["seconds_since_activity"] = 0.0 + return summary + + def run_conversation(self, prompt): + self._start_time = time.time() + time.sleep(self._run_duration) + return {"final_response": "Completed after work", "messages": []} + + +class TestInactivityTimeout: + """Test the inactivity-based timeout polling loop in cron scheduler.""" + + def test_active_agent_completes_normally(self): + """An agent that finishes quickly should return its result.""" + agent = FakeAgent(idle_seconds=0.0) + _cron_inactivity_limit = 10.0 + _POLL_INTERVAL = 0.1 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test prompt") + _inactivity_timeout = False + + result = None + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + result = future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + if _idle_secs >= _cron_inactivity_limit: + _inactivity_timeout = True + break + + pool.shutdown(wait=False) + assert result is not None + assert result["final_response"] == "Done" + assert not _inactivity_timeout + assert not agent._interrupted + + def test_idle_agent_triggers_timeout(self): + """An agent that goes idle should be detected and interrupted.""" + # Agent will run for 0.3s, then become idle after 0.1s of that + agent = SlowFakeAgent( + run_duration=5.0, # would run forever without timeout + idle_after=0.1, # goes idle almost immediately + activity_desc="api_call_streaming", + current_tool="web_search", + api_call_count=3, + max_iterations=50, + ) + + _cron_inactivity_limit = 0.5 # 0.5s inactivity triggers timeout + _POLL_INTERVAL = 0.1 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test prompt") + _inactivity_timeout = False + + result = None + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + result = future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if _idle_secs >= _cron_inactivity_limit: + _inactivity_timeout = True + break + + pool.shutdown(wait=False, cancel_futures=True) + assert _inactivity_timeout is True + assert result is None # Never got a result — interrupted + + def test_unlimited_timeout(self): + """HERMES_CRON_TIMEOUT=0 means no timeout at all.""" + agent = FakeAgent(idle_seconds=0.0) + _cron_inactivity_limit = None # unlimited + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test prompt") + + # With unlimited, we just await the result directly. + result = future.result() + pool.shutdown(wait=False) + + assert result["final_response"] == "Done" + + def test_timeout_env_var_parsing(self, monkeypatch): + """HERMES_CRON_TIMEOUT env var is respected.""" + monkeypatch.setenv("HERMES_CRON_TIMEOUT", "1200") + _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600)) + assert _cron_timeout == 1200.0 + + _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None + assert _cron_inactivity_limit == 1200.0 + + def test_timeout_zero_means_unlimited(self, monkeypatch): + """HERMES_CRON_TIMEOUT=0 yields None (unlimited).""" + monkeypatch.setenv("HERMES_CRON_TIMEOUT", "0") + _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600)) + _cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None + assert _cron_inactivity_limit is None + + def test_timeout_error_includes_diagnostics(self): + """The TimeoutError message should include last activity info.""" + agent = SlowFakeAgent( + run_duration=5.0, + idle_after=0.05, + activity_desc="api_call_streaming", + current_tool="delegate_task", + api_call_count=7, + max_iterations=90, + ) + + _cron_inactivity_limit = 0.3 + _POLL_INTERVAL = 0.1 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + _inactivity_timeout = False + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if _idle_secs >= _cron_inactivity_limit: + _inactivity_timeout = True + break + + pool.shutdown(wait=False, cancel_futures=True) + assert _inactivity_timeout + + # Build the diagnostic message like the scheduler does + _activity = agent.get_activity_summary() + _last_desc = _activity.get("last_activity_desc", "unknown") + _secs_ago = _activity.get("seconds_since_activity", 0) + + err_msg = ( + f"Cron job 'test-job' idle for " + f"{int(_secs_ago)}s (limit {int(_cron_inactivity_limit)}s) " + f"— last activity: {_last_desc}" + ) + assert "idle for" in err_msg + assert "api_call_streaming" in err_msg + + def test_agent_without_activity_summary_uses_wallclock_fallback(self): + """If agent lacks get_activity_summary, idle_secs stays 0 (never times out). + + This ensures backward compat if somehow an old agent is used. + The polling loop will eventually complete when the task finishes. + """ + class BareAgent: + def run_conversation(self, prompt): + return {"final_response": "no activity tracker", "messages": []} + + agent = BareAgent() + _cron_inactivity_limit = 0.1 # tiny limit + _POLL_INTERVAL = 0.1 + + pool = concurrent.futures.ThreadPoolExecutor(max_workers=1) + future = pool.submit(agent.run_conversation, "test") + _inactivity_timeout = False + + while True: + done, _ = concurrent.futures.wait({future}, timeout=_POLL_INTERVAL) + if done: + result = future.result() + break + _idle_secs = 0.0 + if hasattr(agent, "get_activity_summary"): + try: + _act = agent.get_activity_summary() + _idle_secs = _act.get("seconds_since_activity", 0.0) + except Exception: + pass + if _idle_secs >= _cron_inactivity_limit: + _inactivity_timeout = True + break + + pool.shutdown(wait=False) + # Should NOT have timed out — bare agent has no get_activity_summary + assert not _inactivity_timeout + assert result["final_response"] == "no activity tracker" + + +class TestSysPathOrdering: + """Test that sys.path is set before repo-level imports.""" + + def test_hermes_time_importable(self): + """hermes_time should be importable when cron.scheduler loads.""" + # This import would fail if sys.path.insert comes after the import + from cron.scheduler import _hermes_now + assert callable(_hermes_now) + + def test_hermes_constants_importable(self): + """hermes_constants should be importable from cron context.""" + from hermes_constants import get_hermes_home + assert callable(get_hermes_home)