- Update managed-server compatibility tests to match the current ServerManager.tool_parser wiring used by hermes_base_env.
- Make quick-command CLI assertions accept Rich Text objects, which is how ANSI-safe output is now rendered.
- Set HERMES_HOME explicitly in the Discord auto-thread config bridge test so it loads the intended temporary config file.

Validated with the targeted test set and the full pytest suite.
149 lines
6.2 KiB
Python
149 lines
6.2 KiB
Python
"""Tests for user-defined quick commands that bypass the agent loop."""
|
|
import subprocess
|
|
from unittest.mock import MagicMock, patch, AsyncMock
|
|
from rich.text import Text
|
|
import pytest
|
|
|
|
|
|
# ── CLI tests ──────────────────────────────────────────────────────────────
|
|
|
|
class TestCLIQuickCommands:
    """Test quick command dispatch in HermesCLI.process_command.

    Every test builds a bare ``HermesCLI`` with a mocked console and then
    inspects what ``process_command`` passed to ``console.print``. Printed
    values are always normalised through ``_printed_plain`` so the assertions
    hold whether the CLI prints a plain ``str`` or a Rich ``Text`` object.
    """

    @staticmethod
    def _printed_plain(call_arg):
        """Return the plain-text form of a value passed to ``console.print``.

        A Rich ``Text`` instance is reduced to its ``plain`` attribute; any
        other value is converted with ``str``.
        """
        if isinstance(call_arg, Text):
            return call_arg.plain
        return str(call_arg)

    def _make_cli(self, quick_commands):
        """Build a ``HermesCLI`` without running ``__init__``.

        Only the attributes set below are populated: ``config`` (holding the
        given ``quick_commands`` mapping), a ``MagicMock`` console, no agent,
        and an empty conversation history.
        """
        from cli import HermesCLI

        cli = HermesCLI.__new__(HermesCLI)
        cli.config = {"quick_commands": quick_commands}
        cli.console = MagicMock()
        cli.agent = None
        cli.conversation_history = []
        return cli

    def test_exec_command_runs_and_prints_output(self):
        cli = self._make_cli({"dn": {"type": "exec", "command": "echo daily-note"}})
        result = cli.process_command("/dn")
        assert result is True
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert printed == "daily-note"

    def test_exec_command_stderr_shown_on_no_stdout(self):
        cli = self._make_cli({"err": {"type": "exec", "command": "echo error >&2"}})
        result = cli.process_command("/err")
        assert result is True
        # stderr fallback — should print something
        cli.console.print.assert_called_once()

    def test_exec_command_no_output_shows_fallback(self):
        cli = self._make_cli({"empty": {"type": "exec", "command": "true"}})
        cli.process_command("/empty")
        cli.console.print.assert_called_once()
        # Normalise first: a Rich Text object has no ``.lower()``.
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "no output" in printed.lower()

    def test_unsupported_type_shows_error(self):
        cli = self._make_cli({"bad": {"type": "prompt", "command": "echo hi"}})
        cli.process_command("/bad")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "unsupported type" in printed.lower()

    def test_missing_command_field_shows_error(self):
        cli = self._make_cli({"oops": {"type": "exec"}})
        cli.process_command("/oops")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "no command defined" in printed.lower()

    def test_quick_command_takes_priority_over_skill_commands(self):
        """Quick commands must be checked before skill slash commands."""
        cli = self._make_cli({"mygif": {"type": "exec", "command": "echo overridden"}})
        with patch("cli._skill_commands", {"/mygif": {"name": "gif-search"}}):
            cli.process_command("/mygif")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert printed == "overridden"

    def test_unknown_command_still_shows_error(self):
        cli = self._make_cli({})
        cli.process_command("/nonexistent")
        cli.console.print.assert_called()
        # Inspect the first print call; later calls are not checked here.
        printed = self._printed_plain(cli.console.print.call_args_list[0][0][0])
        assert "unknown command" in printed.lower()

    def test_timeout_shows_error(self):
        cli = self._make_cli({"slow": {"type": "exec", "command": "sleep 100"}})
        # Simulate the timeout instead of actually sleeping.
        with patch("subprocess.run", side_effect=subprocess.TimeoutExpired("sleep", 30)):
            cli.process_command("/slow")
        cli.console.print.assert_called_once()
        printed = self._printed_plain(cli.console.print.call_args[0][0])
        assert "timed out" in printed.lower()
|
|
|
|
|
|
# ── Gateway tests ──────────────────────────────────────────────────────────
|
|
|
|
class TestGatewayQuickCommands:
    """Test quick command dispatch in GatewayRunner._handle_message."""

    def _make_event(self, command, args=""):
        """Return a mocked message event for ``/<command> <args>``.

        ``get_command`` and ``get_command_args`` return the given values, and
        the source is filled in as a Telegram DM from a fixed test user.
        """
        origin = MagicMock()
        origin.user_id = "test_user"
        origin.user_name = "Test User"
        origin.platform.value = "telegram"
        origin.chat_type = "dm"
        origin.chat_id = "123"

        evt = MagicMock()
        evt.get_command.return_value = command
        evt.get_command_args.return_value = args
        evt.text = f"/{command} {args}".strip()
        evt.source = origin
        return evt

    def _make_runner(self, quick_commands):
        """Return a ``GatewayRunner`` built without ``__init__``.

        Only the attributes assigned below are set: the quick-command config,
        empty agent/pending-message maps, and an authorization check that
        always returns True.
        """
        from gateway.run import GatewayRunner

        gw = GatewayRunner.__new__(GatewayRunner)
        gw.config = {"quick_commands": quick_commands}
        gw._running_agents = {}
        gw._pending_messages = {}
        gw._is_user_authorized = MagicMock(return_value=True)
        return gw

    @pytest.mark.asyncio
    async def test_exec_command_returns_output(self):
        gw = self._make_runner({"limits": {"type": "exec", "command": "echo ok"}})

        reply = await gw._handle_message(self._make_event("limits"))

        assert reply == "ok"

    @pytest.mark.asyncio
    async def test_unsupported_type_returns_error(self):
        gw = self._make_runner({"bad": {"type": "prompt", "command": "echo hi"}})

        reply = await gw._handle_message(self._make_event("bad"))

        assert reply is not None
        assert "unsupported type" in reply.lower()

    @pytest.mark.asyncio
    async def test_timeout_returns_error(self):
        import asyncio

        gw = self._make_runner({"slow": {"type": "exec", "command": "sleep 100"}})
        slow_event = self._make_event("slow")

        # Force the timeout path rather than waiting on a real sleep.
        with patch("asyncio.wait_for", side_effect=asyncio.TimeoutError):
            reply = await gw._handle_message(slow_event)

        assert reply is not None
        assert "timed out" in reply.lower()
|