forked from Rockachopa/Timmy-time-dashboard
feat: SensoryEvent model + SensoryBus dispatcher (#318)
Co-authored-by: Kimi Agent <kimi@timmy.local> Co-committed-by: Kimi Agent <kimi@timmy.local>
This commit is contained in:
111
tests/timmy/test_event_bus.py
Normal file
111
tests/timmy/test_event_bus.py
Normal file
@@ -0,0 +1,111 @@
|
||||
"""Tests for timmy.event_bus — SensoryBus dispatcher."""
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.event_bus import SensoryBus, get_sensory_bus
|
||||
from timmy.events import SensoryEvent
|
||||
|
||||
|
||||
def _make_event(event_type: str = "push", source: str = "gitea") -> SensoryEvent:
|
||||
return SensoryEvent(source=source, event_type=event_type)
|
||||
|
||||
|
||||
class TestSensoryBusEmitReceive:
|
||||
@pytest.mark.asyncio
|
||||
async def test_emit_calls_subscriber(self):
|
||||
bus = SensoryBus()
|
||||
received = []
|
||||
bus.subscribe("push", lambda ev: received.append(ev))
|
||||
|
||||
ev = _make_event("push")
|
||||
count = await bus.emit(ev)
|
||||
|
||||
assert count == 1
|
||||
assert received == [ev]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_emit_async_handler(self):
|
||||
bus = SensoryBus()
|
||||
received = []
|
||||
|
||||
async def handler(ev: SensoryEvent):
|
||||
received.append(ev.event_type)
|
||||
|
||||
bus.subscribe("morning", handler)
|
||||
await bus.emit(_make_event("morning", source="time"))
|
||||
|
||||
assert received == ["morning"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_match_returns_zero(self):
|
||||
bus = SensoryBus()
|
||||
bus.subscribe("push", lambda ev: None)
|
||||
|
||||
count = await bus.emit(_make_event("issue_opened"))
|
||||
assert count == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wildcard_subscriber(self):
|
||||
bus = SensoryBus()
|
||||
received = []
|
||||
bus.subscribe("*", lambda ev: received.append(ev.event_type))
|
||||
|
||||
await bus.emit(_make_event("push"))
|
||||
await bus.emit(_make_event("morning"))
|
||||
|
||||
assert received == ["push", "morning"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handler_error_isolated(self):
|
||||
"""A failing handler must not prevent other handlers from running."""
|
||||
bus = SensoryBus()
|
||||
received = []
|
||||
|
||||
def bad_handler(ev: SensoryEvent):
|
||||
raise RuntimeError("boom")
|
||||
|
||||
bus.subscribe("push", bad_handler)
|
||||
bus.subscribe("push", lambda ev: received.append("ok"))
|
||||
|
||||
count = await bus.emit(_make_event("push"))
|
||||
assert count == 2
|
||||
assert received == ["ok"]
|
||||
|
||||
|
||||
class TestSensoryBusRecent:
|
||||
@pytest.mark.asyncio
|
||||
async def test_recent_returns_last_n(self):
|
||||
bus = SensoryBus()
|
||||
for i in range(5):
|
||||
await bus.emit(_make_event(f"ev_{i}"))
|
||||
|
||||
last_3 = bus.recent(3)
|
||||
assert len(last_3) == 3
|
||||
assert [e.event_type for e in last_3] == ["ev_2", "ev_3", "ev_4"]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_recent_default(self):
|
||||
bus = SensoryBus()
|
||||
for i in range(3):
|
||||
await bus.emit(_make_event(f"ev_{i}"))
|
||||
|
||||
assert len(bus.recent()) == 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_history_capped(self):
|
||||
bus = SensoryBus(max_history=5)
|
||||
for i in range(10):
|
||||
await bus.emit(_make_event(f"ev_{i}"))
|
||||
|
||||
assert len(bus.recent(100)) == 5
|
||||
|
||||
|
||||
class TestGetSensoryBus:
|
||||
def test_singleton(self):
|
||||
import timmy.event_bus as mod
|
||||
|
||||
mod._bus = None # reset
|
||||
a = get_sensory_bus()
|
||||
b = get_sensory_bus()
|
||||
assert a is b
|
||||
mod._bus = None # cleanup
|
||||
Reference in New Issue
Block a user