From 9a21a4b0ff5a652a8cb702b741177c703e856bfd Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Wed, 18 Mar 2026 19:02:12 -0400 Subject: [PATCH] feat: SensoryEvent model + SensoryBus dispatcher (#318) Co-authored-by: Kimi Agent Co-committed-by: Kimi Agent --- src/timmy/event_bus.py | 79 ++++++++++++++++++++++++ src/timmy/events.py | 39 ++++++++++++ tests/timmy/test_event_bus.py | 111 ++++++++++++++++++++++++++++++++++ tests/timmy/test_events.py | 64 ++++++++++++++++++++ 4 files changed, 293 insertions(+) create mode 100644 src/timmy/event_bus.py create mode 100644 src/timmy/events.py create mode 100644 tests/timmy/test_event_bus.py create mode 100644 tests/timmy/test_events.py diff --git a/src/timmy/event_bus.py b/src/timmy/event_bus.py new file mode 100644 index 0000000..fa49b12 --- /dev/null +++ b/src/timmy/event_bus.py @@ -0,0 +1,79 @@ +"""Sensory EventBus — simple pub/sub for SensoryEvents. + +Thin facade over the infrastructure EventBus that speaks in +SensoryEvent objects instead of raw infrastructure Events. +""" + +import asyncio +import logging +from collections.abc import Awaitable, Callable + +from timmy.events import SensoryEvent + +logger = logging.getLogger(__name__) + +# Handler: sync or async callable that receives a SensoryEvent +SensoryHandler = Callable[[SensoryEvent], None | Awaitable[None]] + + +class SensoryBus: + """Pub/sub dispatcher for SensoryEvents.""" + + def __init__(self, max_history: int = 500) -> None: + self._subscribers: dict[str, list[SensoryHandler]] = {} + self._history: list[SensoryEvent] = [] + self._max_history = max_history + + # ── Public API ──────────────────────────────────────────────────────── + + async def emit(self, event: SensoryEvent) -> int: + """Push *event* to all subscribers whose event_type filter matches. + + Returns the number of handlers invoked. + """ + self._history.append(event) + if len(self._history) > self._max_history: + self._history = self._history[-self._max_history :] + + handlers = self._matching_handlers(event.event_type) + for h in handlers: + try: + result = h(event) + if asyncio.iscoroutine(result): + await result + except Exception as exc: + logger.error("SensoryBus handler error for '%s': %s", event.event_type, exc) + + return len(handlers) + + def subscribe(self, event_type: str, callback: SensoryHandler) -> None: + """Register *callback* for events matching *event_type*. + + Use ``"*"`` to subscribe to all event types. + """ + self._subscribers.setdefault(event_type, []).append(callback) + + def recent(self, n: int = 10) -> list[SensoryEvent]: + """Return the last *n* events (most recent last).""" + return self._history[-n:] + + # ── Internals ───────────────────────────────────────────────────────── + + def _matching_handlers(self, event_type: str) -> list[SensoryHandler]: + handlers: list[SensoryHandler] = [] + for pattern, cbs in self._subscribers.items(): + if pattern == "*" or pattern == event_type: + handlers.extend(cbs) + return handlers + + +# ── Module-level singleton ──────────────────────────────────────────────────── +_bus: SensoryBus | None = None + + +def get_sensory_bus() -> SensoryBus: + """Return the module-level SensoryBus singleton.""" + global _bus + if _bus is None: + _bus = SensoryBus() + return _bus diff --git a/src/timmy/events.py b/src/timmy/events.py new file mode 100644 index 0000000..3cd6d71 --- /dev/null +++ b/src/timmy/events.py @@ -0,0 +1,39 @@ +"""SensoryEvent — normalized event model for stream adapters. + +Every adapter (gitea, time, bitcoin, terminal, etc.) emits SensoryEvents +into the EventBus so that Timmy's cognitive layer sees a uniform stream. +""" + +import json +from dataclasses import asdict, dataclass, field +from datetime import UTC, datetime + + +@dataclass +class SensoryEvent: + """A single sensory event from an external stream.""" + + source: str # "gitea", "time", "bitcoin", "terminal" + event_type: str # "push", "issue_opened", "new_block", "morning" + timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) + data: dict = field(default_factory=dict) + actor: str = "" # who caused it (username, "system", etc.) + + def to_dict(self) -> dict: + """Return a JSON-serializable dictionary.""" + d = asdict(self) + d["timestamp"] = self.timestamp.isoformat() + return d + + def to_json(self) -> str: + """Return a JSON string.""" + return json.dumps(self.to_dict()) + + @classmethod + def from_dict(cls, data: dict) -> "SensoryEvent": + """Reconstruct a SensoryEvent from a dictionary.""" + data = dict(data) # shallow copy + ts = data.get("timestamp") + if isinstance(ts, str): + data["timestamp"] = datetime.fromisoformat(ts) + return cls(**data) diff --git a/tests/timmy/test_event_bus.py b/tests/timmy/test_event_bus.py new file mode 100644 index 0000000..ef8f734 --- /dev/null +++ b/tests/timmy/test_event_bus.py @@ -0,0 +1,111 @@ +"""Tests for timmy.event_bus — SensoryBus dispatcher.""" + +import pytest + +from timmy.event_bus import SensoryBus, get_sensory_bus +from timmy.events import SensoryEvent + + +def _make_event(event_type: str = "push", source: str = "gitea") -> SensoryEvent: + return SensoryEvent(source=source, event_type=event_type) + + +class TestSensoryBusEmitReceive: + @pytest.mark.asyncio + async def test_emit_calls_subscriber(self): + bus = SensoryBus() + received = [] + bus.subscribe("push", lambda ev: received.append(ev)) + + ev = _make_event("push") + count = await bus.emit(ev) + + assert count == 1 + assert received == [ev] + + @pytest.mark.asyncio + async def test_emit_async_handler(self): + bus = SensoryBus() + received = [] + + async def handler(ev: SensoryEvent): + received.append(ev.event_type) + + bus.subscribe("morning", handler) + await bus.emit(_make_event("morning", source="time")) + + assert received == ["morning"] + + @pytest.mark.asyncio + async def test_no_match_returns_zero(self): + bus = SensoryBus() + bus.subscribe("push", lambda ev: None) + + count = await bus.emit(_make_event("issue_opened")) + assert count == 0 + + @pytest.mark.asyncio + async def test_wildcard_subscriber(self): + bus = SensoryBus() + received = [] + bus.subscribe("*", lambda ev: received.append(ev.event_type)) + + await bus.emit(_make_event("push")) + await bus.emit(_make_event("morning")) + + assert received == ["push", "morning"] + + @pytest.mark.asyncio + async def test_handler_error_isolated(self): + """A failing handler must not prevent other handlers from running.""" + bus = SensoryBus() + received = [] + + def bad_handler(ev: SensoryEvent): + raise RuntimeError("boom") + + bus.subscribe("push", bad_handler) + bus.subscribe("push", lambda ev: received.append("ok")) + + count = await bus.emit(_make_event("push")) + assert count == 2 + assert received == ["ok"] + + +class TestSensoryBusRecent: + @pytest.mark.asyncio + async def test_recent_returns_last_n(self): + bus = SensoryBus() + for i in range(5): + await bus.emit(_make_event(f"ev_{i}")) + + last_3 = bus.recent(3) + assert len(last_3) == 3 + assert [e.event_type for e in last_3] == ["ev_2", "ev_3", "ev_4"] + + @pytest.mark.asyncio + async def test_recent_default(self): + bus = SensoryBus() + for i in range(3): + await bus.emit(_make_event(f"ev_{i}")) + + assert len(bus.recent()) == 3 + + @pytest.mark.asyncio + async def test_history_capped(self): + bus = SensoryBus(max_history=5) + for i in range(10): + await bus.emit(_make_event(f"ev_{i}")) + + assert len(bus.recent(100)) == 5 + + +class TestGetSensoryBus: + def test_singleton(self): + import timmy.event_bus as mod + + mod._bus = None # reset + a = get_sensory_bus() + b = get_sensory_bus() + assert a is b + mod._bus = None # cleanup diff --git a/tests/timmy/test_events.py b/tests/timmy/test_events.py new file mode 100644 index 0000000..b5ccb59 --- /dev/null +++ b/tests/timmy/test_events.py @@ -0,0 +1,64 @@ +"""Tests for timmy.events — SensoryEvent model.""" + +import json +from datetime import UTC, datetime + +from timmy.events import SensoryEvent + + +class TestSensoryEvent: + def test_defaults(self): + ev = SensoryEvent(source="gitea", event_type="push") + assert ev.source == "gitea" + assert ev.event_type == "push" + assert ev.actor == "" + assert ev.data == {} + assert isinstance(ev.timestamp, datetime) + + def test_custom_fields(self): + ts = datetime(2025, 1, 1, tzinfo=UTC) + ev = SensoryEvent( + source="bitcoin", + event_type="new_block", + timestamp=ts, + data={"height": 900_000}, + actor="network", + ) + assert ev.data["height"] == 900_000 + assert ev.actor == "network" + assert ev.timestamp == ts + + def test_to_dict(self): + ev = SensoryEvent(source="time", event_type="morning") + d = ev.to_dict() + assert d["source"] == "time" + assert d["event_type"] == "morning" + assert isinstance(d["timestamp"], str) + + def test_to_json(self): + ev = SensoryEvent(source="terminal", event_type="command", data={"cmd": "ls"}) + raw = ev.to_json() + parsed = json.loads(raw) + assert parsed["source"] == "terminal" + assert parsed["data"]["cmd"] == "ls" + + def test_from_dict_roundtrip(self): + ev = SensoryEvent( + source="gitea", + event_type="issue_opened", + data={"number": 42}, + actor="alice", + ) + d = ev.to_dict() + restored = SensoryEvent.from_dict(d) + assert restored.source == ev.source + assert restored.event_type == ev.event_type + assert restored.data == ev.data + assert restored.actor == ev.actor + + def test_json_serializable(self): + """SensoryEvent must be JSON-serializable (acceptance criterion).""" + ev = SensoryEvent(source="gitea", event_type="push", data={"ref": "main"}) + raw = ev.to_json() + parsed = json.loads(raw) + assert parsed["source"] == "gitea"