* fix(gateway): /stop and /new bypass Level 1 active-session guard The base adapter's Level 1 guard intercepted ALL messages while an agent was running, including /stop and /new. These commands were queued as pending messages instead of being dispatched to the gateway runner's Level 2 handler. When the agent eventually stopped (via the interrupt mechanism), the command text leaked into the conversation as a user message — the model would receive '/stop' as input and respond to it. Fix: Add /stop, /new, and /reset to the bypass set in base.py alongside /approve, /deny, and /status. Consolidate the three separate bypass blocks into one. Commands in the bypass set are dispatched inline to the gateway runner, where Level 2 handles them correctly (hard-kill for /stop, session reset for /new). Also add a safety net in _run_agent's pending-message processing: if the pending text resolves to a known slash command, discard it instead of passing it to the agent. This catches edge cases where command text leaks through the interrupt_message fallback. Refs: #5244 * test: regression tests for command bypass of active-session guard 17 tests covering: - /stop, /new, /reset bypass the Level 1 guard when agent is running - /approve, /deny, /status bypass (existing behavior, now tested) - Regular text and unknown commands still queued (not bypassed) - File paths like '/path/to/file' not treated as commands - Telegram @botname suffix handled correctly - Safety net command resolution (resolve_command detects known commands)
314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""Regression tests: slash commands must bypass the base adapter's active-session guard.
|
|
|
|
When an agent is running, the base adapter's Level 1 guard in
|
|
handle_message() intercepts all incoming messages and queues them as
|
|
pending. Certain commands (/stop, /new, /reset, /approve, /deny,
|
|
/status) must bypass this guard and be dispatched directly to the gateway
|
|
runner — otherwise they are queued as user text and either:
|
|
- leak into the conversation as agent input (/stop, /new), or
|
|
- deadlock (/approve, /deny — agent blocks on Event.wait)
|
|
|
|
These tests verify that the bypass works at the adapter level and that
|
|
the safety net in _run_agent discards leaked command text.
|
|
"""
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import Platform, PlatformConfig
|
|
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
|
|
from gateway.session import SessionSource, build_session_key
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _StubAdapter(BasePlatformAdapter):
|
|
"""Concrete adapter with abstract methods stubbed out."""
|
|
|
|
async def connect(self):
|
|
pass
|
|
|
|
async def disconnect(self):
|
|
pass
|
|
|
|
async def send(self, chat_id, text, **kwargs):
|
|
pass
|
|
|
|
async def get_chat_info(self, chat_id):
|
|
return {}
|
|
|
|
|
|
def _make_adapter():
|
|
"""Create a minimal adapter for testing the active-session guard."""
|
|
config = PlatformConfig(enabled=True, token="test-token")
|
|
adapter = _StubAdapter(config, Platform.TELEGRAM)
|
|
adapter.sent_responses = []
|
|
|
|
async def _mock_handler(event):
|
|
cmd = event.get_command()
|
|
return f"handled:{cmd}" if cmd else f"handled:text:{event.text}"
|
|
|
|
adapter._message_handler = _mock_handler
|
|
|
|
async def _mock_send_retry(chat_id, content, **kwargs):
|
|
adapter.sent_responses.append(content)
|
|
|
|
adapter._send_with_retry = _mock_send_retry
|
|
return adapter
|
|
|
|
|
|
def _make_event(text="/stop", chat_id="12345"):
|
|
source = SessionSource(
|
|
platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
|
|
)
|
|
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
|
|
|
|
|
|
def _session_key(chat_id="12345"):
|
|
source = SessionSource(
|
|
platform=Platform.TELEGRAM, chat_id=chat_id, chat_type="dm"
|
|
)
|
|
return build_session_key(source)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: commands bypass Level 1 when session is active
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCommandBypassActiveSession:
|
|
"""Commands that must bypass the active-session guard."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_bypasses_guard(self):
|
|
"""/stop must be dispatched directly, not queued."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/stop"))
|
|
|
|
assert sk not in adapter._pending_messages, (
|
|
"/stop was queued as a pending message instead of being dispatched"
|
|
)
|
|
assert any("handled:stop" in r for r in adapter.sent_responses), (
|
|
"/stop response was not sent back to the user"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_new_bypasses_guard(self):
|
|
"""/new must be dispatched directly, not queued."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/new"))
|
|
|
|
assert sk not in adapter._pending_messages
|
|
assert any("handled:new" in r for r in adapter.sent_responses)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_reset_bypasses_guard(self):
|
|
"""/reset (alias for /new) must be dispatched directly."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/reset"))
|
|
|
|
assert sk not in adapter._pending_messages
|
|
assert any("handled:reset" in r for r in adapter.sent_responses)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_approve_bypasses_guard(self):
|
|
"""/approve must bypass (deadlock prevention)."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/approve"))
|
|
|
|
assert sk not in adapter._pending_messages
|
|
assert any("handled:approve" in r for r in adapter.sent_responses)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_deny_bypasses_guard(self):
|
|
"""/deny must bypass (deadlock prevention)."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/deny"))
|
|
|
|
assert sk not in adapter._pending_messages
|
|
assert any("handled:deny" in r for r in adapter.sent_responses)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_bypasses_guard(self):
|
|
"""/status must bypass so it returns a system response."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/status"))
|
|
|
|
assert sk not in adapter._pending_messages
|
|
assert any("handled:status" in r for r in adapter.sent_responses)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: non-bypass messages still get queued
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNonBypassStillQueued:
|
|
"""Regular messages and unknown commands must be queued, not dispatched."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_regular_text_queued(self):
|
|
"""Plain text while agent is running must be queued as pending."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("hello world"))
|
|
|
|
assert sk in adapter._pending_messages, (
|
|
"Regular text was not queued — it should be pending"
|
|
)
|
|
assert len(adapter.sent_responses) == 0, (
|
|
"Regular text should not produce a direct response"
|
|
)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_unknown_command_queued(self):
|
|
"""Unknown /commands must be queued, not dispatched."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/foobar"))
|
|
|
|
assert sk in adapter._pending_messages
|
|
assert len(adapter.sent_responses) == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_file_path_not_treated_as_command(self):
|
|
"""A message like '/path/to/file' must not bypass the guard."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/path/to/file.py"))
|
|
|
|
assert sk in adapter._pending_messages
|
|
assert len(adapter.sent_responses) == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: no active session — commands go through normally
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNoActiveSessionNormalDispatch:
|
|
"""When no agent is running, messages spawn a background task normally."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_when_no_session_active(self):
|
|
"""/stop without an active session spawns a background task
|
|
(the Level 2 handler will return 'No active task')."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
|
|
# No active session — _active_sessions is empty
|
|
assert sk not in adapter._active_sessions
|
|
|
|
await adapter.handle_message(_make_event("/stop"))
|
|
|
|
# Should have gone through the normal path (background task spawned)
|
|
# and NOT be in _pending_messages (that's the queued-during-active path)
|
|
assert sk not in adapter._pending_messages
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: safety net in _run_agent discards command text from pending queue
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPendingCommandSafetyNet:
|
|
"""The safety net in gateway/run.py _run_agent must discard command text
|
|
that leaks into the pending queue via interrupt_message fallback."""
|
|
|
|
def test_stop_command_detected(self):
|
|
"""resolve_command must recognize /stop so the safety net can
|
|
discard it."""
|
|
from hermes_cli.commands import resolve_command
|
|
|
|
assert resolve_command("stop") is not None
|
|
assert resolve_command("stop").name == "stop"
|
|
|
|
def test_new_command_detected(self):
|
|
from hermes_cli.commands import resolve_command
|
|
|
|
assert resolve_command("new") is not None
|
|
assert resolve_command("new").name == "new"
|
|
|
|
def test_reset_alias_detected(self):
|
|
from hermes_cli.commands import resolve_command
|
|
|
|
assert resolve_command("reset") is not None
|
|
assert resolve_command("reset").name == "new" # alias
|
|
|
|
def test_unknown_command_not_detected(self):
|
|
from hermes_cli.commands import resolve_command
|
|
|
|
assert resolve_command("foobar") is None
|
|
|
|
def test_file_path_not_detected_as_command(self):
|
|
"""'/path/to/file' should not resolve as a command."""
|
|
from hermes_cli.commands import resolve_command
|
|
|
|
# The safety net splits on whitespace and takes the first word
|
|
# after stripping '/'. For '/path/to/file', that's 'path/to/file'.
|
|
assert resolve_command("path/to/file") is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests: bypass with @botname suffix (Telegram-style)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBypassWithBotnameSuffix:
|
|
"""Telegram appends @botname to commands. The bypass must still work."""
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_stop_with_botname(self):
|
|
"""/stop@MyHermesBot must bypass the guard."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/stop@MyHermesBot"))
|
|
|
|
assert sk not in adapter._pending_messages, (
|
|
"/stop@MyHermesBot was queued instead of bypassing"
|
|
)
|
|
assert any("handled:stop" in r for r in adapter.sent_responses)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_new_with_botname(self):
|
|
"""/new@MyHermesBot must bypass the guard."""
|
|
adapter = _make_adapter()
|
|
sk = _session_key()
|
|
adapter._active_sessions[sk] = asyncio.Event()
|
|
|
|
await adapter.handle_message(_make_event("/new@MyHermesBot"))
|
|
|
|
assert sk not in adapter._pending_messages
|
|
assert any("handled:new" in r for r in adapter.sent_responses)
|