fix(gateway): track background task references in GatewayRunner (#3254)
Asyncio tasks created with create_task() but never stored can be garbage collected mid-execution. Add self._background_tasks set to hold references, with add_done_callback cleanup. Tracks: - /background command task - session-reset memory flush task - session-resume memory flush task Cancel all pending tasks in stop(). Update test fixtures that construct GatewayRunner via object.__new__() to include the new _background_tasks attribute. Cherry-picked from PR #3167 by memosr. The original PR also deleted the DM topic auto-skill loading code — that deletion was excluded from this salvage as it removes a shipped feature (#2598). Co-authored-by: memosr.eth <96793918+memosr@users.noreply.github.com>
This commit is contained in:
@@ -414,6 +414,9 @@ class GatewayRunner:
|
||||
# Per-chat voice reply mode: "off" | "voice_only" | "all"
|
||||
self._voice_mode: Dict[str, str] = self._load_voice_modes()
|
||||
|
||||
# Track background tasks to prevent garbage collection mid-execution
|
||||
self._background_tasks: set = set()
|
||||
|
||||
def _get_or_create_gateway_honcho(self, session_key: str):
|
||||
"""Return a persistent Honcho manager/config pair for this gateway session."""
|
||||
if not hasattr(self, "_honcho_managers"):
|
||||
@@ -1298,6 +1301,11 @@ class GatewayRunner:
|
||||
except Exception as e:
|
||||
logger.error("✗ %s disconnect error: %s", platform.value, e)
|
||||
|
||||
# Cancel any pending background tasks
|
||||
for _task in list(self._background_tasks):
|
||||
_task.cancel()
|
||||
self._background_tasks.clear()
|
||||
|
||||
self.adapters.clear()
|
||||
self._running_agents.clear()
|
||||
self._pending_messages.clear()
|
||||
@@ -2737,9 +2745,11 @@ class GatewayRunner:
|
||||
try:
|
||||
old_entry = self.session_store._entries.get(session_key)
|
||||
if old_entry:
|
||||
asyncio.create_task(
|
||||
_flush_task = asyncio.create_task(
|
||||
self._async_flush_memories(old_entry.session_id, session_key)
|
||||
)
|
||||
self._background_tasks.add(_flush_task)
|
||||
_flush_task.add_done_callback(self._background_tasks.discard)
|
||||
except Exception as e:
|
||||
logger.debug("Gateway memory flush on reset failed: %s", e)
|
||||
|
||||
@@ -3552,9 +3562,11 @@ class GatewayRunner:
|
||||
task_id = f"bg_{datetime.now().strftime('%H%M%S')}_{os.urandom(3).hex()}"
|
||||
|
||||
# Fire-and-forget the background task
|
||||
asyncio.create_task(
|
||||
_task = asyncio.create_task(
|
||||
self._run_background_task(prompt, source, task_id)
|
||||
)
|
||||
self._background_tasks.add(_task)
|
||||
_task.add_done_callback(self._background_tasks.discard)
|
||||
|
||||
preview = prompt[:60] + ("..." if len(prompt) > 60 else "")
|
||||
return f'🔄 Background task started: "{preview}"\nTask ID: {task_id}\nYou can keep chatting — results will appear when done.'
|
||||
@@ -3929,9 +3941,11 @@ class GatewayRunner:
|
||||
|
||||
# Flush memories for current session before switching
|
||||
try:
|
||||
asyncio.create_task(
|
||||
_flush_task = asyncio.create_task(
|
||||
self._async_flush_memories(current_entry.session_id, session_key)
|
||||
)
|
||||
self._background_tasks.add(_flush_task)
|
||||
_flush_task.add_done_callback(self._background_tasks.discard)
|
||||
except Exception as e:
|
||||
logger.debug("Memory flush on resume failed: %s", e)
|
||||
|
||||
|
||||
@@ -38,6 +38,7 @@ def _make_runner():
|
||||
runner._provider_routing = {}
|
||||
runner._fallback_model = None
|
||||
runner._running_agents = {}
|
||||
runner._background_tasks = set()
|
||||
|
||||
mock_store = MagicMock()
|
||||
runner.session_store = mock_store
|
||||
|
||||
@@ -72,6 +72,7 @@ async def test_gateway_stop_interrupts_running_agents_and_cancels_adapter_tasks(
|
||||
runner._exit_reason = None
|
||||
runner._pending_messages = {"session": "pending text"}
|
||||
runner._pending_approvals = {"session": {"command": "rm -rf /tmp/x"}}
|
||||
runner._background_tasks = set()
|
||||
runner._shutdown_all_gateway_honcho = lambda: None
|
||||
|
||||
adapter = StubAdapter()
|
||||
|
||||
@@ -39,6 +39,7 @@ def _make_runner():
|
||||
runner._pending_messages = {}
|
||||
runner._pending_approvals = {}
|
||||
runner._voice_mode = {}
|
||||
runner._background_tasks = set()
|
||||
runner._is_user_authorized = lambda _source: True
|
||||
return runner
|
||||
|
||||
|
||||
Reference in New Issue
Block a user