fix(gateway): prevent stuck sessions with agent timeout and staleness eviction
Three changes to prevent sessions from getting permanently locked:

1. Agent execution timeout (HERMES_AGENT_TIMEOUT, default 10min): Wraps run_in_executor with asyncio.wait_for so a hung API call or runaway tool can't lock a session indefinitely. On timeout, the agent is interrupted and the user gets an actionable error message.

2. Staleness eviction for _running_agents: Tracks start timestamps for each session entry. When a new message arrives and the entry is older than timeout + 1min grace, it's evicted as a leaked lock. Safety net for any cleanup path that fails to remove the entry.

3. Cron job timeout (HERMES_CRON_TIMEOUT, default 10min): Wraps run_conversation in a ThreadPoolExecutor with timeout so a hung cron job doesn't block the ticker thread (and all subsequent cron jobs) indefinitely.

Follows grammY runner's per-update timeout pattern and aiogram's asyncio.wait_for approach for handler deadlines.
This commit is contained in:
@@ -443,8 +443,28 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
         session_db=_session_db,
     )
 
-    result = agent.run_conversation(prompt)
+    # Run the agent with a timeout so a hung API call or tool doesn't
+    # block the cron ticker thread indefinitely. Default 10 minutes;
+    # override via env var. Uses a separate thread because
+    # run_conversation is synchronous.
+    _cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
+    import concurrent.futures
+    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as _cron_pool:
+        _cron_future = _cron_pool.submit(agent.run_conversation, prompt)
+        try:
+            result = _cron_future.result(timeout=_cron_timeout)
+        except concurrent.futures.TimeoutError:
+            logger.error(
+                "Job '%s' timed out after %.0fs — interrupting agent",
+                job_name, _cron_timeout,
+            )
+            if hasattr(agent, "interrupt"):
+                agent.interrupt("Cron job timed out")
+            raise TimeoutError(
+                f"Cron job '{job_name}' timed out after "
+                f"{int(_cron_timeout // 60)} minutes"
+            )
 
     final_response = result.get("final_response", "") or ""
     # Use a separate variable for log display; keep final_response clean
     # for delivery logic (empty response = no delivery).
@@ -468,6 +468,7 @@ class GatewayRunner:
         # Track running agents per session for interrupt support
         # Key: session_key, Value: AIAgent instance
         self._running_agents: Dict[str, Any] = {}
+        self._running_agents_ts: Dict[str, float] = {}  # start timestamp per session
         self._pending_messages: Dict[str, str] = {}  # Queued messages during interrupt
 
         # Cache AIAgent instances per session to preserve prompt caching.
@@ -1720,6 +1721,21 @@ class GatewayRunner:
         # simultaneous updates. Do NOT interrupt for photo-only follow-ups here;
         # let the adapter-level batching/queueing logic absorb them.
         _quick_key = self._session_key_for_source(source)
+
+        # Staleness eviction: if an entry has been in _running_agents for
+        # longer than the agent timeout, it's a leaked lock from a hung or
+        # crashed handler. Evict it so the session isn't permanently stuck.
+        _STALE_TTL = float(os.getenv("HERMES_AGENT_TIMEOUT", 600)) + 60  # timeout + 1 min grace
+        _ts_dict = getattr(self, "_running_agents_ts", {})
+        _stale_ts = _ts_dict.get(_quick_key, 0)
+        if _quick_key in self._running_agents and _stale_ts and (time.time() - _stale_ts) > _STALE_TTL:
+            logger.warning(
+                "Evicting stale _running_agents entry for %s (age: %.0fs)",
+                _quick_key[:30], time.time() - _stale_ts,
+            )
+            del self._running_agents[_quick_key]
+            _ts_dict.pop(_quick_key, None)
+
         if _quick_key in self._running_agents:
             if event.get_command() == "status":
                 return await self._handle_status_command(event)
@@ -2045,6 +2061,8 @@ class GatewayRunner:
         # "already running" guard and spin up a duplicate agent for the
         # same session — corrupting the transcript.
         self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL
+        if hasattr(self, "_running_agents_ts"):
+            self._running_agents_ts[_quick_key] = time.time()
 
         try:
             return await self._handle_message_with_agent(event, source, _quick_key)
@@ -2055,6 +2073,8 @@ class GatewayRunner:
             # not linger or the session would be permanently locked out.
             if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL:
                 del self._running_agents[_quick_key]
+            if hasattr(self, "_running_agents_ts"):
+                self._running_agents_ts.pop(_quick_key, None)
 
     async def _handle_message_with_agent(self, event, source, _quick_key: str):
         """Inner handler that runs under the _running_agents sentinel guard."""
@@ -5985,9 +6005,38 @@ class GatewayRunner:
         interrupt_monitor = asyncio.create_task(monitor_for_interrupt())
 
         try:
-            # Run in thread pool to not block
+            # Run in thread pool to not block. Cap total execution time
+            # so a hung API call or runaway tool doesn't permanently lock
+            # the session. Default 10 minutes; override with env var.
+            _agent_timeout = float(os.getenv("HERMES_AGENT_TIMEOUT", 600))
             loop = asyncio.get_event_loop()
-            response = await loop.run_in_executor(None, run_sync)
+            try:
+                response = await asyncio.wait_for(
+                    loop.run_in_executor(None, run_sync),
+                    timeout=_agent_timeout,
+                )
+            except asyncio.TimeoutError:
+                logger.error(
+                    "Agent execution timed out after %.0fs for session %s",
+                    _agent_timeout, session_key,
+                )
+                # Interrupt the agent if it's still running so the thread
+                # pool worker is freed.
+                _timed_out_agent = agent_holder[0]
+                if _timed_out_agent and hasattr(_timed_out_agent, "interrupt"):
+                    _timed_out_agent.interrupt("Execution timed out")
+                response = {
+                    "final_response": (
+                        f"⏱️ Request timed out after {int(_agent_timeout // 60)} minutes. "
+                        "The agent may have been stuck on a tool or API call.\n"
+                        "Try again, or use /reset to start fresh."
+                    ),
+                    "messages": result_holder[0].get("messages", []) if result_holder[0] else [],
+                    "api_calls": 0,
+                    "tools": tools_holder[0] or [],
+                    "history_offset": 0,
+                    "failed": True,
+                }
 
             # Track fallback model state: if the agent switched to a
             # fallback model during this run, persist it so /model shows
@@ -6110,6 +6159,8 @@ class GatewayRunner:
             tracking_task.cancel()
             if session_key and session_key in self._running_agents:
                 del self._running_agents[session_key]
+            if session_key and hasattr(self, "_running_agents_ts"):
+                self._running_agents_ts.pop(session_key, None)
 
             # Wait for cancelled tasks
             for task in [progress_task, interrupt_monitor, tracking_task]:
Reference in New Issue
Block a user