fix(gateway): prevent concurrent agent runs for the same session

Place a sentinel in _running_agents immediately after the "already
running" guard check passes — before any await.  Without this, the
numerous await points between the guard (line 1324) and agent
registration (track_agent at line 4790) create a window where a
second message for the same session can bypass the guard and start
a duplicate agent, corrupting the transcript.

The await gap includes: hook emissions, vision enrichment (external
API call), audio transcription (external API call), session hygiene
compression, and the run_in_executor call itself.  For messages with
media attachments the window can be several seconds wide.

The sentinel is wrapped in try/finally so it is always cleaned up —
even if the handler raises or takes an early-return path.  When the
real AIAgent is created, track_agent() overwrites the sentinel with
the actual instance (preserving interrupt support).

Also handles the edge case where a message arrives while the sentinel
is set but no real agent exists yet: the message is queued via the
adapter's pending-message mechanism instead of attempting to call
interrupt() on the sentinel object.
This commit is contained in:
Gutslabs
2026-03-19 22:32:37 +03:00
committed by Test
parent 02954c1a10
commit aaa96713d4
2 changed files with 235 additions and 3 deletions

View File

@@ -222,6 +222,12 @@ from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageTyp
logger = logging.getLogger(__name__)
# Sentinel placed into _running_agents immediately when a session starts
# processing, *before* any await. Prevents a second message for the same
# session from bypassing the "already running" guard during the async gap
# between the guard check and actual agent creation.
_AGENT_PENDING_SENTINEL = object()
def _resolve_runtime_agent_kwargs() -> dict:
"""Resolve provider credentials for gateway-created AIAgent instances."""
@@ -1346,7 +1352,14 @@ class GatewayRunner:
adapter._pending_messages[_quick_key] = event
return None
running_agent = self._running_agents[_quick_key]
running_agent = self._running_agents.get(_quick_key)
if running_agent is _AGENT_PENDING_SENTINEL:
# Agent is being set up but not ready yet — queue the message
# so it will be picked up after the agent starts.
adapter = self.adapters.get(source.platform)
if adapter:
adapter._pending_messages[_quick_key] = event
return None
logger.debug("PRIORITY interrupt for session %s", _quick_key[:20])
running_agent.interrupt(event.text)
if _quick_key in self._pending_messages:
@@ -1354,7 +1367,7 @@ class GatewayRunner:
else:
self._pending_messages[_quick_key] = event.text
return None
# Check for commands
command = event.get_command()
@@ -1527,7 +1540,29 @@ class GatewayRunner:
# Pending exec approvals are handled by /approve and /deny commands above.
# No bare text matching — "yes" in normal conversation must not trigger
# execution of a dangerous command.
# ── Claim this session before any await ───────────────────────
# Between here and _run_agent registering the real AIAgent, there
# are numerous await points (hooks, vision enrichment, STT,
# session hygiene compression). Without this sentinel a second
# message arriving during any of those yields would pass the
# "already running" guard and spin up a duplicate agent for the
# same session — corrupting the transcript.
self._running_agents[_quick_key] = _AGENT_PENDING_SENTINEL
try:
return await self._handle_message_with_agent(event, source, _quick_key)
finally:
# If _run_agent replaced the sentinel with a real agent and
# then cleaned it up, this is a no-op. If we exited early
# (exception, command fallthrough, etc.) the sentinel must
# not linger or the session would be permanently locked out.
if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL:
del self._running_agents[_quick_key]
async def _handle_message_with_agent(self, event, source, _quick_key: str):
"""Inner handler that runs under the _running_agents sentinel guard."""
# Get or create session
session_entry = self.session_store.get_or_create_session(source)
session_key = session_entry.session_key

View File

@@ -0,0 +1,197 @@
"""Tests for the session race guard that prevents concurrent agent runs.
The sentinel-based guard ensures that when _handle_message passes the
"is an agent already running?" check and proceeds to the slow async
setup path (vision enrichment, STT, hooks, session hygiene), a second
message for the same session is correctly recognized as "already running"
and routed through the interrupt/queue path instead of spawning a
duplicate agent.
"""
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL
from gateway.session import SessionSource, build_session_key
class _FakeAdapter:
"""Minimal adapter stub for testing."""
def __init__(self):
self._pending_messages = {}
async def send(self, chat_id, text, **kwargs):
pass
def _make_runner():
runner = object.__new__(GatewayRunner)
runner.config = GatewayConfig(
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")}
)
runner.adapters = {Platform.TELEGRAM: _FakeAdapter()}
runner._running_agents = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._voice_mode = {}
runner._is_user_authorized = lambda _source: True
return runner
def _make_event(text="hello", chat_id="12345"):
source = SessionSource(
platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
)
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
# ------------------------------------------------------------------
# Test 1: Sentinel is placed before _handle_message_with_agent runs
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sentinel_placed_before_agent_setup():
"""After passing the 'not running' guard, the sentinel must be
written into _running_agents *before* any await, so that a
concurrent message sees the session as occupied."""
runner = _make_runner()
event = _make_event()
session_key = build_session_key(event.source)
# Patch _handle_message_with_agent to capture state at entry
sentinel_was_set = False
async def mock_inner(self_inner, ev, src, qk):
nonlocal sentinel_was_set
sentinel_was_set = runner._running_agents.get(qk) is _AGENT_PENDING_SENTINEL
return "ok"
with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner):
await runner._handle_message(event)
assert sentinel_was_set, (
"Sentinel must be in _running_agents when _handle_message_with_agent starts"
)
# ------------------------------------------------------------------
# Test 2: Sentinel is cleaned up after _handle_message_with_agent
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sentinel_cleaned_up_after_handler_returns():
"""If _handle_message_with_agent returns normally, the sentinel
must be removed so the session is not permanently locked."""
runner = _make_runner()
event = _make_event()
session_key = build_session_key(event.source)
async def mock_inner(self_inner, ev, src, qk):
return "ok"
with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner):
await runner._handle_message(event)
assert session_key not in runner._running_agents, (
"Sentinel must be removed after handler completes"
)
# ------------------------------------------------------------------
# Test 3: Sentinel cleaned up on exception
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sentinel_cleaned_up_on_exception():
"""If _handle_message_with_agent raises, the sentinel must still
be cleaned up so the session is not permanently locked."""
runner = _make_runner()
event = _make_event()
session_key = build_session_key(event.source)
async def mock_inner(self_inner, ev, src, qk):
raise RuntimeError("boom")
with patch.object(GatewayRunner, "_handle_message_with_agent", mock_inner):
with pytest.raises(RuntimeError, match="boom"):
await runner._handle_message(event)
assert session_key not in runner._running_agents, (
"Sentinel must be removed even if handler raises"
)
# ------------------------------------------------------------------
# Test 4: Second message during sentinel sees "already running"
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_second_message_during_sentinel_queued_not_duplicate():
"""While the sentinel is set (agent setup in progress), a second
message for the same session must hit the 'already running' branch
and be queued — not start a second agent."""
runner = _make_runner()
event1 = _make_event(text="first message")
event2 = _make_event(text="second message")
session_key = build_session_key(event1.source)
barrier = asyncio.Event()
async def slow_inner(self_inner, ev, src, qk):
# Simulate slow setup — wait until test tells us to proceed
await barrier.wait()
return "ok"
with patch.object(GatewayRunner, "_handle_message_with_agent", slow_inner):
# Start first message (will block at barrier)
task1 = asyncio.create_task(runner._handle_message(event1))
# Yield so task1 enters slow_inner and sentinel is set
await asyncio.sleep(0)
# Verify sentinel is set
assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL
# Second message should see "already running" and be queued
result2 = await runner._handle_message(event2)
assert result2 is None, "Second message should return None (queued)"
# The second message should have been queued in adapter pending
adapter = runner.adapters[Platform.TELEGRAM]
assert session_key in adapter._pending_messages, (
"Second message should be queued as pending"
)
assert adapter._pending_messages[session_key] is event2
# Let first message complete
barrier.set()
await task1
# ------------------------------------------------------------------
# Test 5: Sentinel not placed for command messages
# ------------------------------------------------------------------
@pytest.mark.asyncio
async def test_command_messages_do_not_leave_sentinel():
"""Slash commands (/help, /status, etc.) return early from
_handle_message. They must NOT leave a sentinel behind."""
runner = _make_runner()
source = SessionSource(
platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm"
)
event = MessageEvent(
text="/help", message_type=MessageType.TEXT, source=source
)
session_key = build_session_key(source)
# Mock the help handler to avoid needing full runner setup
runner._handle_help_command = AsyncMock(return_value="Help text")
# Need hooks for command emission
runner.hooks = MagicMock()
runner.hooks.emit = AsyncMock()
await runner._handle_message(event)
assert session_key not in runner._running_agents, (
"Command handlers must not leave sentinel in _running_agents"
)