Merge PR #840: background process notification modes + fix spinner line spam
- feat(gateway): configurable background_process_notifications (off/result/error/all) - fix(display): rate-limit spinner flushes to prevent line spam under patch_stdout Background notifications inspired by @PeterFile (PR #593).
This commit is contained in:
198
tests/gateway/test_background_process_notifications.py
Normal file
198
tests/gateway/test_background_process_notifications.py
Normal file
@@ -0,0 +1,198 @@
|
||||
"""Tests for configurable background process notification modes.
|
||||
|
||||
The gateway process watcher pushes status updates to users' chats when
|
||||
background terminal commands run. ``display.background_process_notifications``
|
||||
controls verbosity: off | result | error | all (default).
|
||||
|
||||
Contributed by @PeterFile (PR #593), reimplemented on current main.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform
|
||||
from gateway.run import GatewayRunner
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _FakeRegistry:
|
||||
"""Return pre-canned sessions, then None once exhausted."""
|
||||
|
||||
def __init__(self, sessions):
|
||||
self._sessions = list(sessions)
|
||||
|
||||
def get(self, session_id):
|
||||
if self._sessions:
|
||||
return self._sessions.pop(0)
|
||||
return None
|
||||
|
||||
|
||||
def _build_runner(monkeypatch, tmp_path, mode: str) -> GatewayRunner:
|
||||
"""Create a GatewayRunner with a fake config for the given mode."""
|
||||
(tmp_path / "config.yaml").write_text(
|
||||
f"display:\n background_process_notifications: {mode}\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
import gateway.run as gateway_run
|
||||
|
||||
monkeypatch.setattr(gateway_run, "_hermes_home", tmp_path)
|
||||
|
||||
runner = GatewayRunner(GatewayConfig())
|
||||
adapter = SimpleNamespace(send=AsyncMock())
|
||||
runner.adapters[Platform.TELEGRAM] = adapter
|
||||
return runner
|
||||
|
||||
|
||||
def _watcher_dict(session_id="proc_test"):
|
||||
return {
|
||||
"session_id": session_id,
|
||||
"check_interval": 0,
|
||||
"platform": "telegram",
|
||||
"chat_id": "123",
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _load_background_notifications_mode unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestLoadBackgroundNotificationsMode:
|
||||
|
||||
def test_defaults_to_all(self, monkeypatch, tmp_path):
|
||||
import gateway.run as gw
|
||||
monkeypatch.setattr(gw, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("HERMES_BACKGROUND_NOTIFICATIONS", raising=False)
|
||||
assert GatewayRunner._load_background_notifications_mode() == "all"
|
||||
|
||||
def test_reads_config_yaml(self, monkeypatch, tmp_path):
|
||||
(tmp_path / "config.yaml").write_text(
|
||||
"display:\n background_process_notifications: error\n"
|
||||
)
|
||||
import gateway.run as gw
|
||||
monkeypatch.setattr(gw, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("HERMES_BACKGROUND_NOTIFICATIONS", raising=False)
|
||||
assert GatewayRunner._load_background_notifications_mode() == "error"
|
||||
|
||||
def test_env_var_overrides_config(self, monkeypatch, tmp_path):
|
||||
(tmp_path / "config.yaml").write_text(
|
||||
"display:\n background_process_notifications: error\n"
|
||||
)
|
||||
import gateway.run as gw
|
||||
monkeypatch.setattr(gw, "_hermes_home", tmp_path)
|
||||
monkeypatch.setenv("HERMES_BACKGROUND_NOTIFICATIONS", "off")
|
||||
assert GatewayRunner._load_background_notifications_mode() == "off"
|
||||
|
||||
def test_false_value_maps_to_off(self, monkeypatch, tmp_path):
|
||||
(tmp_path / "config.yaml").write_text(
|
||||
"display:\n background_process_notifications: false\n"
|
||||
)
|
||||
import gateway.run as gw
|
||||
monkeypatch.setattr(gw, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("HERMES_BACKGROUND_NOTIFICATIONS", raising=False)
|
||||
assert GatewayRunner._load_background_notifications_mode() == "off"
|
||||
|
||||
def test_invalid_value_defaults_to_all(self, monkeypatch, tmp_path):
|
||||
(tmp_path / "config.yaml").write_text(
|
||||
"display:\n background_process_notifications: banana\n"
|
||||
)
|
||||
import gateway.run as gw
|
||||
monkeypatch.setattr(gw, "_hermes_home", tmp_path)
|
||||
monkeypatch.delenv("HERMES_BACKGROUND_NOTIFICATIONS", raising=False)
|
||||
assert GatewayRunner._load_background_notifications_mode() == "all"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _run_process_watcher integration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
("mode", "sessions", "expected_calls", "expected_fragment"),
|
||||
[
|
||||
# all mode: running output → sends update
|
||||
(
|
||||
"all",
|
||||
[
|
||||
SimpleNamespace(output_buffer="building...\n", exited=False, exit_code=None),
|
||||
None, # process disappears → watcher exits
|
||||
],
|
||||
1,
|
||||
"is still running",
|
||||
),
|
||||
# result mode: running output → no update
|
||||
(
|
||||
"result",
|
||||
[
|
||||
SimpleNamespace(output_buffer="building...\n", exited=False, exit_code=None),
|
||||
None,
|
||||
],
|
||||
0,
|
||||
None,
|
||||
),
|
||||
# off mode: exited process → no notification
|
||||
(
|
||||
"off",
|
||||
[SimpleNamespace(output_buffer="done\n", exited=True, exit_code=0)],
|
||||
0,
|
||||
None,
|
||||
),
|
||||
# result mode: exited → notifies
|
||||
(
|
||||
"result",
|
||||
[SimpleNamespace(output_buffer="done\n", exited=True, exit_code=0)],
|
||||
1,
|
||||
"finished with exit code 0",
|
||||
),
|
||||
# error mode: exit 0 → no notification
|
||||
(
|
||||
"error",
|
||||
[SimpleNamespace(output_buffer="done\n", exited=True, exit_code=0)],
|
||||
0,
|
||||
None,
|
||||
),
|
||||
# error mode: exit 1 → notifies
|
||||
(
|
||||
"error",
|
||||
[SimpleNamespace(output_buffer="traceback\n", exited=True, exit_code=1)],
|
||||
1,
|
||||
"finished with exit code 1",
|
||||
),
|
||||
# all mode: exited → notifies
|
||||
(
|
||||
"all",
|
||||
[SimpleNamespace(output_buffer="ok\n", exited=True, exit_code=0)],
|
||||
1,
|
||||
"finished with exit code 0",
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_run_process_watcher_respects_notification_mode(
|
||||
monkeypatch, tmp_path, mode, sessions, expected_calls, expected_fragment
|
||||
):
|
||||
import tools.process_registry as pr_module
|
||||
|
||||
monkeypatch.setattr(pr_module, "process_registry", _FakeRegistry(sessions))
|
||||
|
||||
# Patch asyncio.sleep to avoid real delays
|
||||
async def _instant_sleep(*_a, **_kw):
|
||||
pass
|
||||
monkeypatch.setattr(asyncio, "sleep", _instant_sleep)
|
||||
|
||||
runner = _build_runner(monkeypatch, tmp_path, mode)
|
||||
adapter = runner.adapters[Platform.TELEGRAM]
|
||||
|
||||
await runner._run_process_watcher(_watcher_dict())
|
||||
|
||||
assert adapter.send.await_count == expected_calls, (
|
||||
f"mode={mode}: expected {expected_calls} sends, got {adapter.send.await_count}"
|
||||
)
|
||||
if expected_fragment is not None:
|
||||
sent_message = adapter.send.await_args.args[1]
|
||||
assert expected_fragment in sent_message
|
||||
Reference in New Issue
Block a user