From aaa96713d44991227e048e89760c9dff96cf781f Mon Sep 17 00:00:00 2001
From: Gutslabs
Date: Thu, 19 Mar 2026 22:32:37 +0300
Subject: [PATCH] fix(gateway): prevent concurrent agent runs for the same session
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Place a sentinel in _running_agents immediately after the "already
running" guard check passes — before any await. Without this, the
numerous await points between the guard (line 1324) and agent
registration (track_agent at line 4790) create a window where a second
message for the same session can bypass the guard and start a duplicate
agent, corrupting the transcript.

The await gap includes: hook emissions, vision enrichment (external API
call), audio transcription (external API call), session hygiene
compression, and the run_in_executor call itself. For messages with
media attachments the window can be several seconds wide.

The sentinel is wrapped in try/finally so it is always cleaned up — even
if the handler raises or takes an early-return path. When the real
AIAgent is created, track_agent() overwrites the sentinel with the
actual instance (preserving interrupt support).

Also handles the edge case where a message arrives while the sentinel is
set but no real agent exists yet: the message is queued via the
adapter's pending-message mechanism instead of attempting to call
interrupt() on the sentinel object.
--- gateway/run.py | 41 ++++- tests/gateway/test_session_race_guard.py | 197 +++++++++++++++++++++++ 2 files changed, 235 insertions(+), 3 deletions(-) create mode 100644 tests/gateway/test_session_race_guard.py diff --git a/gateway/run.py b/gateway/run.py index e5efbe226..08beabc91 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -222,6 +222,12 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp logger = logging.getLogger(__name__) +# Sentinel placed into _running_agents immediately when a session starts +# processing, *before* any await. Prevents a second message for the same +# session from bypassing the "already running" guard during the async gap +# between the guard check and actual agent creation. +_AGENT_PENDING_SENTINEL = object() + def _resolve_runtime_agent_kwargs() -> dict: """Resolve provider credentials for gateway-created AIAgent instances.""" @@ -1346,7 +1352,14 @@ class GatewayRunner: adapter._pending_messages[_quick_key] = event return None - running_agent = self._running_agents[_quick_key] + running_agent = self._running_agents.get(_quick_key) + if running_agent is _AGENT_PENDING_SENTINEL: + # Agent is being set up but not ready yet — queue the message + # so it will be picked up after the agent starts. + adapter = self.adapters.get(source.platform) + if adapter: + adapter._pending_messages[_quick_key] = event + return None logger.debug("PRIORITY interrupt for session %s", _quick_key[:20]) running_agent.interrupt(event.text) if _quick_key in self._pending_messages: @@ -1354,7 +1367,7 @@ class GatewayRunner: else: self._pending_messages[_quick_key] = event.text return None - + # Check for commands command = event.get_command() @@ -1527,7 +1540,29 @@ class GatewayRunner: # Pending exec approvals are handled by /approve and /deny commands above. # No bare text matching — "yes" in normal conversation must not trigger # execution of a dangerous command. 
- + + # ── Claim this session before any await ─────────────────────── + # Between here and _run_agent registering the real AIAgent, there + # are numerous await points (hooks, vision enrichment, STT, + # session hygiene compression). Without this sentinel a second + # message arriving during any of those yields would pass the + # "already running" guard and spin up a duplicate agent for the + # same session — corrupting the transcript. + self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL + + try: + return await self._handle_message_with_agent(event, source, _quick_key) + finally: + # If _run_agent replaced the sentinel with a real agent and + # then cleaned it up, this is a no-op. If we exited early + # (exception, command fallthrough, etc.) the sentinel must + # not linger or the session would be permanently locked out. + if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL: + del self._running_agents[_quick_key] + + async def _handle_message_with_agent(self, event, source, _quick_key: str): + """Inner handler that runs under the _running_agents sentinel guard.""" + # Get or create session session_entry = self.session_store.get_or_create_session(source) session_key = session_entry.session_key diff --git a/tests/gateway/test_session_race_guard.py b/tests/gateway/test_session_race_guard.py new file mode 100644 index 000000000..0161b44c7 --- /dev/null +++ b/tests/gateway/test_session_race_guard.py @@ -0,0 +1,197 @@ +"""Tests for the session race guard that prevents concurrent agent runs. + +The sentinel-based guard ensures that when _handle_message passes the +"is an agent already running?" check and proceeds to the slow async +setup path (vision enrichment, STT, hooks, session hygiene), a second +message for the same session is correctly recognized as "already running" +and routed through the interrupt/queue path instead of spawning a +duplicate agent. 
+""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.platforms.base import MessageEvent, MessageType +from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL +from gateway.session import SessionSource, build_session_key + + +class _FakeAdapter: + """Minimal adapter stub for testing.""" + + def __init__(self): + self._pending_messages = {} + + async def send(self, chat_id, text, **kwargs): + pass + + +def _make_runner(): + runner = object.__new__(GatewayRunner) + runner.config = GatewayConfig( + platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")} + ) + runner.adapters = {Platform.TELEGRAM: _FakeAdapter()} + runner._running_agents = {} + runner._pending_messages = {} + runner._pending_approvals = {} + runner._voice_mode = {} + runner._is_user_authorized = lambda _source: True + return runner + + +def _make_event(text="hello", chat_id="12345"): + source = SessionSource( + platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm" + ) + return MessageEvent(text=text, message_type=MessageType.TEXT, source=source) + + +# ------------------------------------------------------------------ +# Test 1: Sentinel is placed before _handle_message_with_agent runs +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_sentinel_placed_before_agent_setup(): + """After passing the 'not running' guard, the sentinel must be + written into _running_agents *before* any await, so that a + concurrent message sees the session as occupied.""" + runner = _make_runner() + event = _make_event() + session_key = build_session_key(event.source) + + # Patch _handle_message_with_agent to capture state at entry + sentinel_was_set = False + + async def mock_inner(self_inner, ev, src, qk): + nonlocal sentinel_was_set + sentinel_was_set = runner._running_agents.get(qk) is _AGENT_PENDING_SENTINEL + 
return "ok" + + with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner): + await runner._handle_message(event) + + assert sentinel_was_set, ( + "Sentinel must be in _running_agents when _handle_message_with_agent starts" + ) + + +# ------------------------------------------------------------------ +# Test 2: Sentinel is cleaned up after _handle_message_with_agent +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_sentinel_cleaned_up_after_handler_returns(): + """If _handle_message_with_agent returns normally, the sentinel + must be removed so the session is not permanently locked.""" + runner = _make_runner() + event = _make_event() + session_key = build_session_key(event.source) + + async def mock_inner(self_inner, ev, src, qk): + return "ok" + + with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner): + await runner._handle_message(event) + + assert session_key not in runner._running_agents, ( + "Sentinel must be removed after handler completes" + ) + + +# ------------------------------------------------------------------ +# Test 3: Sentinel cleaned up on exception +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_sentinel_cleaned_up_on_exception(): + """If _handle_message_with_agent raises, the sentinel must still + be cleaned up so the session is not permanently locked.""" + runner = _make_runner() + event = _make_event() + session_key = build_session_key(event.source) + + async def mock_inner(self_inner, ev, src, qk): + raise RuntimeError("boom") + + with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner): + with pytest.raises(RuntimeError, match="boom"): + await runner._handle_message(event) + + assert session_key not in runner._running_agents, ( + "Sentinel must be removed even if handler raises" + ) + + +# ------------------------------------------------------------------ +# Test 4: 
Second message during sentinel sees "already running" +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_second_message_during_sentinel_queued_not_duplicate(): + """While the sentinel is set (agent setup in progress), a second + message for the same session must hit the 'already running' branch + and be queued — not start a second agent.""" + runner = _make_runner() + event1 = _make_event(text="first message") + event2 = _make_event(text="second message") + session_key = build_session_key(event1.source) + + barrier = asyncio.Event() + + async def slow_inner(self_inner, ev, src, qk): + # Simulate slow setup — wait until test tells us to proceed + await barrier.wait() + return "ok" + + with patch.object(GatewayRunner, "_handle_message_with_agent", slow_inner): + # Start first message (will block at barrier) + task1 = asyncio.create_task(runner._handle_message(event1)) + # Yield so task1 enters slow_inner and sentinel is set + await asyncio.sleep(0) + + # Verify sentinel is set + assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL + + # Second message should see "already running" and be queued + result2 = await runner._handle_message(event2) + assert result2 is None, "Second message should return None (queued)" + + # The second message should have been queued in adapter pending + adapter = runner.adapters[Platform.TELEGRAM] + assert session_key in adapter._pending_messages, ( + "Second message should be queued as pending" + ) + assert adapter._pending_messages[session_key] is event2 + + # Let first message complete + barrier.set() + await task1 + + +# ------------------------------------------------------------------ +# Test 5: Sentinel not placed for command messages +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_command_messages_do_not_leave_sentinel(): + """Slash commands (/help, /status, etc.) 
return early from + _handle_message. They must NOT leave a sentinel behind.""" + runner = _make_runner() + source = SessionSource( + platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm" + ) + event = MessageEvent( + text="/help", message_type=MessageType.TEXT, source=source + ) + session_key = build_session_key(source) + + # Mock the help handler to avoid needing full runner setup + runner._handle_help_command = AsyncMock(return_value="Help text") + # Need hooks for command emission + runner.hooks = MagicMock() + runner.hooks.emit = AsyncMock() + + await runner._handle_message(event) + + assert session_key not in runner._running_agents, ( + "Command handlers must not leave sentinel in _running_agents" + )