fix(gateway): support infinite timeout + periodic notifications + actionable error (#4959)
- HERMES_AGENT_TIMEOUT=0 now means no limit (infinite execution) - Periodic 'still working' notifications every 10 minutes for long tasks - Timeout error message now tells users how to increase the limit - Stale-lock eviction handles infinite timeout correctly (float inf TTL)
This commit is contained in:
@@ -1749,7 +1749,8 @@ class GatewayRunner:
|
||||
# Staleness eviction: if an entry has been in _running_agents for
|
||||
# longer than the agent timeout, it's a leaked lock from a hung or
|
||||
# crashed handler. Evict it so the session isn't permanently stuck.
|
||||
_STALE_TTL = float(os.getenv("HERMES_AGENT_TIMEOUT", 600)) + 60 # timeout + 1 min grace
|
||||
_raw_stale_timeout = float(os.getenv("HERMES_AGENT_TIMEOUT", 600))
|
||||
_STALE_TTL = (_raw_stale_timeout + 60) if _raw_stale_timeout > 0 else float("inf")
|
||||
_stale_ts = self._running_agents_ts.get(_quick_key, 0)
|
||||
if _quick_key in self._running_agents and _stale_ts and (time.time() - _stale_ts) > _STALE_TTL:
|
||||
logger.warning(
|
||||
@@ -6105,12 +6106,37 @@ class GatewayRunner:
|
||||
break
|
||||
|
||||
interrupt_monitor = asyncio.create_task(monitor_for_interrupt())
|
||||
|
||||
|
||||
# Periodic "still working" notifications for long-running tasks.
|
||||
# Fires every 10 minutes so the user knows the agent hasn't died.
|
||||
_NOTIFY_INTERVAL = 600 # 10 minutes
|
||||
_notify_start = time.time()
|
||||
|
||||
async def _notify_long_running():
|
||||
_notify_adapter = self.adapters.get(source.platform)
|
||||
if not _notify_adapter:
|
||||
return
|
||||
while True:
|
||||
await asyncio.sleep(_NOTIFY_INTERVAL)
|
||||
_elapsed_mins = int((time.time() - _notify_start) // 60)
|
||||
try:
|
||||
await _notify_adapter.send(
|
||||
source.chat_id,
|
||||
f"⏳ Still working... ({_elapsed_mins} minutes elapsed)",
|
||||
metadata=_status_thread_metadata,
|
||||
)
|
||||
except Exception as _ne:
|
||||
logger.debug("Long-running notification error: %s", _ne)
|
||||
|
||||
_notify_task = asyncio.create_task(_notify_long_running())
|
||||
|
||||
try:
|
||||
# Run in thread pool to not block. Cap total execution time
|
||||
# so a hung API call or runaway tool doesn't permanently lock
|
||||
# the session. Default 10 minutes; override with env var.
|
||||
_agent_timeout = float(os.getenv("HERMES_AGENT_TIMEOUT", 600))
|
||||
# Set to 0 for no limit (infinite).
|
||||
_agent_timeout_raw = float(os.getenv("HERMES_AGENT_TIMEOUT", 600))
|
||||
_agent_timeout = _agent_timeout_raw if _agent_timeout_raw > 0 else None
|
||||
loop = asyncio.get_event_loop()
|
||||
try:
|
||||
response = await asyncio.wait_for(
|
||||
@@ -6127,10 +6153,13 @@ class GatewayRunner:
|
||||
_timed_out_agent = agent_holder[0]
|
||||
if _timed_out_agent and hasattr(_timed_out_agent, "interrupt"):
|
||||
_timed_out_agent.interrupt("Execution timed out")
|
||||
_timeout_mins = int(_agent_timeout // 60)
|
||||
response = {
|
||||
"final_response": (
|
||||
f"⏱️ Request timed out after {int(_agent_timeout // 60)} minutes. "
|
||||
f"⏱️ Request timed out after {_timeout_mins} minutes. "
|
||||
"The agent may have been stuck on a tool or API call.\n"
|
||||
"To increase the limit, set HERMES_AGENT_TIMEOUT in your .env "
|
||||
"(value in seconds, 0 = no limit) and restart the gateway.\n"
|
||||
"Try again, or use /reset to start fresh."
|
||||
),
|
||||
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
|
||||
@@ -6227,10 +6256,11 @@ class GatewayRunner:
|
||||
_interrupt_depth=_interrupt_depth + 1,
|
||||
)
|
||||
finally:
|
||||
# Stop progress sender and interrupt monitor
|
||||
# Stop progress sender, interrupt monitor, and notification task
|
||||
if progress_task:
|
||||
progress_task.cancel()
|
||||
interrupt_monitor.cancel()
|
||||
_notify_task.cancel()
|
||||
|
||||
# Wait for stream consumer to finish its final edit
|
||||
if stream_task:
|
||||
@@ -6251,7 +6281,7 @@ class GatewayRunner:
|
||||
self._running_agents_ts.pop(session_key, None)
|
||||
|
||||
# Wait for cancelled tasks
|
||||
for task in [progress_task, interrupt_monitor, tracking_task]:
|
||||
for task in [progress_task, interrupt_monitor, tracking_task, _notify_task]:
|
||||
if task:
|
||||
try:
|
||||
await task
|
||||
|
||||
Reference in New Issue
Block a user