When an agent was actively processing a message, /status sent via Telegram (or any gateway) was queued as a pending interrupt instead of being dispatched immediately. The base platform adapter's handle_message() only had special-case bypass logic for /approve and /deny, so /status fell through to the default interrupt path and was never processed as a system command. Apply the same bypass pattern used by /approve//deny: detect cmd == 'status' inside the active-session guard, dispatch directly to the message handler, and send the response without touching session lifecycle or interrupt state. Adds a regression test that verifies /status is dispatched and responded to immediately even when _active_sessions contains an entry for the session.
189 lines
6.3 KiB
Python
189 lines
6.3 KiB
Python
"""Tests for gateway /status behavior and token persistence."""
|
|
|
|
from datetime import datetime
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
|
|
from gateway.config import GatewayConfig, Platform, PlatformConfig
|
|
from gateway.platforms.base import MessageEvent
|
|
from gateway.session import SessionEntry, SessionSource, build_session_key
|
|
|
|
|
|
def _make_source() -> SessionSource:
|
|
return SessionSource(
|
|
platform=Platform.TELEGRAM,
|
|
user_id="u1",
|
|
chat_id="c1",
|
|
user_name="tester",
|
|
chat_type="dm",
|
|
)
|
|
|
|
|
|
def _make_event(text: str) -> MessageEvent:
|
|
return MessageEvent(
|
|
text=text,
|
|
source=_make_source(),
|
|
message_id="m1",
|
|
)
|
|
|
|
|
|
def _make_runner(session_entry: SessionEntry):
|
|
from gateway.run import GatewayRunner
|
|
|
|
runner = object.__new__(GatewayRunner)
|
|
runner.config = GatewayConfig(
|
|
platforms={Platform.TELEGRAM: PlatformConfig(enabled=True, token="***")}
|
|
)
|
|
adapter = MagicMock()
|
|
adapter.send = AsyncMock()
|
|
runner.adapters = {Platform.TELEGRAM: adapter}
|
|
runner._voice_mode = {}
|
|
runner.hooks = SimpleNamespace(emit=AsyncMock(), loaded_hooks=False)
|
|
runner.session_store = MagicMock()
|
|
runner.session_store.get_or_create_session.return_value = session_entry
|
|
runner.session_store.load_transcript.return_value = []
|
|
runner.session_store.has_any_sessions.return_value = True
|
|
runner.session_store.append_to_transcript = MagicMock()
|
|
runner.session_store.rewrite_transcript = MagicMock()
|
|
runner.session_store.update_session = MagicMock()
|
|
runner._running_agents = {}
|
|
runner._pending_messages = {}
|
|
runner._pending_approvals = {}
|
|
runner._session_db = None
|
|
runner._reasoning_config = None
|
|
runner._provider_routing = {}
|
|
runner._fallback_model = None
|
|
runner._show_reasoning = False
|
|
runner._is_user_authorized = lambda _source: True
|
|
runner._set_session_env = lambda _context: None
|
|
runner._should_send_voice_reply = lambda *_args, **_kwargs: False
|
|
runner._send_voice_reply = AsyncMock()
|
|
runner._capture_gateway_honcho_if_configured = lambda *args, **kwargs: None
|
|
runner._emit_gateway_run_progress = AsyncMock()
|
|
return runner
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_command_reports_running_agent_without_interrupt(monkeypatch):
|
|
session_entry = SessionEntry(
|
|
session_key=build_session_key(_make_source()),
|
|
session_id="sess-1",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now(),
|
|
platform=Platform.TELEGRAM,
|
|
chat_type="dm",
|
|
total_tokens=321,
|
|
)
|
|
runner = _make_runner(session_entry)
|
|
running_agent = MagicMock()
|
|
runner._running_agents[build_session_key(_make_source())] = running_agent
|
|
|
|
result = await runner._handle_message(_make_event("/status"))
|
|
|
|
assert "**Tokens:** 321" in result
|
|
assert "**Agent Running:** Yes ⚡" in result
|
|
running_agent.interrupt.assert_not_called()
|
|
assert runner._pending_messages == {}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_handle_message_persists_agent_token_counts(monkeypatch):
|
|
import gateway.run as gateway_run
|
|
|
|
session_entry = SessionEntry(
|
|
session_key=build_session_key(_make_source()),
|
|
session_id="sess-1",
|
|
created_at=datetime.now(),
|
|
updated_at=datetime.now(),
|
|
platform=Platform.TELEGRAM,
|
|
chat_type="dm",
|
|
)
|
|
runner = _make_runner(session_entry)
|
|
runner.session_store.load_transcript.return_value = [{"role": "user", "content": "earlier"}]
|
|
runner._run_agent = AsyncMock(
|
|
return_value={
|
|
"final_response": "ok",
|
|
"messages": [],
|
|
"tools": [],
|
|
"history_offset": 0,
|
|
"last_prompt_tokens": 80,
|
|
"input_tokens": 120,
|
|
"output_tokens": 45,
|
|
"model": "openai/test-model",
|
|
}
|
|
)
|
|
|
|
monkeypatch.setattr(gateway_run, "_resolve_runtime_agent_kwargs", lambda: {"api_key": "***"})
|
|
monkeypatch.setattr(
|
|
"agent.model_metadata.get_model_context_length",
|
|
lambda *_args, **_kwargs: 100000,
|
|
)
|
|
|
|
result = await runner._handle_message(_make_event("hello"))
|
|
|
|
assert result == "ok"
|
|
runner.session_store.update_session.assert_called_once_with(
|
|
session_entry.session_key,
|
|
last_prompt_tokens=80,
|
|
)
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_command_bypasses_active_session_guard():
|
|
"""When an agent is running, /status must be dispatched immediately via
|
|
base.handle_message — not queued or treated as an interrupt (#5046)."""
|
|
import asyncio
|
|
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, MessageType
|
|
from gateway.session import build_session_key
|
|
from gateway.config import Platform, PlatformConfig, GatewayConfig
|
|
|
|
source = _make_source()
|
|
session_key = build_session_key(source)
|
|
|
|
handler_called_with = []
|
|
|
|
async def fake_handler(event):
|
|
handler_called_with.append(event)
|
|
return "📊 **Hermes Gateway Status**\n**Agent Running:** Yes ⚡"
|
|
|
|
# Concrete subclass to avoid abstract method errors
|
|
class _ConcreteAdapter(BasePlatformAdapter):
|
|
platform = Platform.TELEGRAM
|
|
|
|
async def connect(self): pass
|
|
async def disconnect(self): pass
|
|
async def send(self, chat_id, content, **kwargs): pass
|
|
async def get_chat_info(self, chat_id): return {}
|
|
|
|
platform_config = PlatformConfig(enabled=True, token="***")
|
|
adapter = _ConcreteAdapter(platform_config, Platform.TELEGRAM)
|
|
adapter.set_message_handler(fake_handler)
|
|
|
|
sent = []
|
|
|
|
async def fake_send_with_retry(chat_id, content, reply_to=None, metadata=None):
|
|
sent.append(content)
|
|
|
|
adapter._send_with_retry = fake_send_with_retry
|
|
|
|
# Simulate an active session
|
|
interrupt_event = asyncio.Event()
|
|
adapter._active_sessions[session_key] = interrupt_event
|
|
|
|
event = MessageEvent(
|
|
text="/status",
|
|
source=source,
|
|
message_id="m1",
|
|
message_type=MessageType.COMMAND,
|
|
)
|
|
await adapter.handle_message(event)
|
|
|
|
assert handler_called_with, "/status handler was never called (event was queued or dropped)"
|
|
assert sent, "/status response was never sent"
|
|
assert "Agent Running" in sent[0]
|
|
assert not interrupt_event.is_set(), "/status incorrectly triggered an agent interrupt"
|
|
assert session_key not in adapter._pending_messages, "/status was incorrectly queued"
|