This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/infrastructure/events/bus.py

357 lines
12 KiB
Python

"""Async Event Bus for inter-agent communication.
Agents publish and subscribe to events for loose coupling.
Events are typed and carry structured data. Optionally persists
events to SQLite for durability and replay.
"""
import asyncio
import json
import logging
import sqlite3
import uuid
from collections.abc import Callable, Coroutine, Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
@dataclass
class Event:
"""A typed event in the system."""
type: str # e.g., "agent.task.assigned", "tool.execution.completed"
source: str # Agent or component that emitted the event
data: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
id: str = field(default_factory=lambda: f"evt_{datetime.now(UTC).timestamp()}")
# Type alias for event handlers: a callable that takes a single Event and
# returns a coroutine producing None — i.e. the shape of
# ``async def handler(event: Event) -> None``.
EventHandler = Callable[[Event], Coroutine[Any, Any, None]]
# Schema for the unified events table.
# Executed as a script when persistence is initialised; every statement uses
# IF NOT EXISTS, so running it against an existing database is harmless.
# - id:         primary key, taken from Event.id
# - event_type: Event.type
# - task_id / agent_id: copied out of Event.data (empty string when absent)
# - data:       the Event.data dict serialised as JSON text
# - timestamp:  Event.timestamp
# Indexes cover the columns used for filtering and ordering on replay.
_EVENTS_SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
event_type TEXT NOT NULL,
source TEXT DEFAULT '',
task_id TEXT DEFAULT '',
agent_id TEXT DEFAULT '',
data TEXT DEFAULT '{}',
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_source ON events(source);
CREATE INDEX IF NOT EXISTS idx_events_task ON events(task_id);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp);
"""
class EventBus:
    """Async event bus for the publish/subscribe pattern.

    Supports optional SQLite persistence via enable_persistence().
    When enabled, every published event is also written to the database
    and can be read back with replay(). Persistence is best-effort: a
    storage failure is logged and never prevents event delivery.

    Usage:
        bus = EventBus()
        bus.enable_persistence(Path("data/events.db"))

        @bus.subscribe("agent.task.*")
        async def handle_task(event: Event):
            logger.debug(f"Task event: {event.data}")

        await bus.publish(Event(
            type="agent.task.assigned",
            source="default",
            data={"task_id": "123", "agent": "forge"}
        ))

        # Replay persisted events
        events = bus.replay(event_type="task.created")
    """

    def __init__(self) -> None:
        """Create an empty bus with no subscribers and persistence disabled."""
        # Subscription pattern -> handlers registered under that pattern.
        self._subscribers: dict[str, list[EventHandler]] = {}
        # Most recent published events, oldest first, capped at _max_history.
        self._history: list[Event] = []
        self._max_history = 1000
        # None means persistence is disabled.
        self._persistence_db_path: Path | None = None
        logger.info("EventBus initialized")

    # ── Persistence ──────────────────────────────────────────────────────

    def enable_persistence(self, db_path: Path) -> None:
        """Enable SQLite persistence for all published events.

        Args:
            db_path: Path to the SQLite database file.

        Raises:
            Exception: Whatever the schema initialisation raises (e.g.
                OSError, sqlite3.Error). In that case the previous
                persistence setting is restored, so the bus is not left
                pointing at a database that was never initialised.
        """
        previous_path = self._persistence_db_path
        self._persistence_db_path = db_path
        try:
            self._init_persistence_db()
        except Exception:
            self._persistence_db_path = previous_path
            raise
        logger.info("EventBus persistence enabled at %s", db_path)

    def _init_persistence_db(self) -> None:
        """Initialize the persistence database with schema.

        Creates the parent directory if needed, switches the database to
        WAL journaling and runs the schema script. Does nothing when
        persistence is disabled.
        """
        if self._persistence_db_path is None:
            return
        self._persistence_db_path.parent.mkdir(parents=True, exist_ok=True)
        with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
            conn.execute("PRAGMA journal_mode=WAL")
            # Wait up to 5 s for a competing writer instead of failing at once.
            conn.execute("PRAGMA busy_timeout=5000")
            conn.executescript(_EVENTS_SCHEMA)
            conn.commit()

    @contextmanager
    def _get_persistence_conn(self) -> Generator[sqlite3.Connection | None, None, None]:
        """Yield a short-lived connection to the persistence database.

        Yields None when persistence is disabled. Otherwise yields a new
        connection (rows addressable by column name) that is closed when
        the context exits.
        """
        if self._persistence_db_path is None:
            yield None
            return
        with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
            conn.row_factory = sqlite3.Row
            conn.execute("PRAGMA busy_timeout=5000")
            yield conn

    def _persist_event(self, event: Event) -> None:
        """Write an event to the persistence database (best-effort).

        Never raises: any failure — opening the database, serialising the
        payload, or executing the insert — is logged and swallowed so that
        publish() keeps working when storage is unavailable.
        """
        # The connection is opened INSIDE the try block on purpose: connect
        # and PRAGMA failures must be swallowed just like insert failures.
        try:
            with self._get_persistence_conn() as conn:
                if conn is None:
                    return
                task_id = event.data.get("task_id", "")
                agent_id = event.data.get("agent_id", "")
                # OR IGNORE: an id that is already stored is left untouched.
                conn.execute(
                    "INSERT OR IGNORE INTO events "
                    "(id, event_type, source, task_id, agent_id, data, timestamp) "
                    "VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (
                        event.id,
                        event.type,
                        event.source,
                        task_id,
                        agent_id,
                        json.dumps(event.data),
                        event.timestamp,
                    ),
                )
                conn.commit()
        except Exception as exc:
            logger.warning("Failed to persist event: %s", exc)

    # ── Replay ───────────────────────────────────────────────────────────

    def replay(
        self,
        event_type: str | None = None,
        source: str | None = None,
        task_id: str | None = None,
        limit: int = 100,
    ) -> list[Event]:
        """Replay persisted events from SQLite with optional filters.

        Args:
            event_type: Filter by exact event type.
            source: Filter by event source.
            task_id: Filter by task_id.
            limit: Max events to return (most recent first).

        Returns:
            List of Event objects from persistent storage. Empty when
            persistence is disabled or when reading fails (the failure is
            logged, not raised).
        """
        # Falsy filters (None or "") are ignored.
        conditions: list[str] = []
        params: list = []
        if event_type:
            conditions.append("event_type = ?")
            params.append(event_type)
        if source:
            conditions.append("source = ?")
            params.append(source)
        if task_id:
            conditions.append("task_id = ?")
            params.append(task_id)
        # Only fixed column predicates are interpolated; all values are bound.
        where = " AND ".join(conditions) if conditions else "1=1"
        sql = f"SELECT * FROM events WHERE {where} ORDER BY timestamp DESC LIMIT ?"
        params.append(limit)

        # As in _persist_event, the connection is opened inside the try so a
        # broken or missing database yields [] instead of an exception.
        try:
            with self._get_persistence_conn() as conn:
                if conn is None:
                    return []
                rows = conn.execute(sql, params).fetchall()
                return [
                    Event(
                        id=row["id"],
                        type=row["event_type"],
                        source=row["source"],
                        data=json.loads(row["data"]) if row["data"] else {},
                        timestamp=row["timestamp"],
                    )
                    for row in rows
                ]
        except Exception as exc:
            logger.warning("Failed to replay events: %s", exc)
            return []

    # ── Subscribe / Publish ──────────────────────────────────────────────

    def subscribe(self, event_pattern: str) -> Callable[[EventHandler], EventHandler]:
        """Decorator to subscribe to events matching a pattern.

        Patterns support wildcards:
        - "agent.task.assigned" — exact match
        - "agent.task.*" — any task event
        - "agent.*" — any agent event
        - "*" — all events

        The decorated handler is returned unchanged.
        """

        def decorator(handler: EventHandler) -> EventHandler:
            self._subscribers.setdefault(event_pattern, []).append(handler)
            logger.debug("Subscribed handler to '%s'", event_pattern)
            return handler

        return decorator

    def unsubscribe(self, event_pattern: str, handler: EventHandler) -> bool:
        """Remove a handler from a subscription.

        Returns:
            True if the handler was registered under ``event_pattern`` and
            has been removed, False otherwise.
        """
        registered = self._subscribers.get(event_pattern)
        if registered is None or handler not in registered:
            return False
        registered.remove(handler)
        logger.debug("Unsubscribed handler from '%s'", event_pattern)
        return True

    async def publish(self, event: Event) -> int:
        """Publish an event to all matching subscribers.

        If persistence is enabled, the event is also written to SQLite.
        A handler registered under several matching patterns is invoked
        once per matching pattern.

        Returns:
            Number of handlers invoked
        """
        # Persist to SQLite (graceful — never crash on persistence failure)
        self._persist_event(event)

        # Store in in-memory history, keeping only the newest entries.
        self._history.append(event)
        if len(self._history) > self._max_history:
            self._history = self._history[-self._max_history :]

        # Find matching handlers
        handlers: list[EventHandler] = []
        for pattern, pattern_handlers in self._subscribers.items():
            if self._match_pattern(event.type, pattern):
                handlers.extend(pattern_handlers)

        # Invoke handlers concurrently; one failing handler must not stop
        # the others, hence return_exceptions=True.
        if handlers:
            await asyncio.gather(
                *[self._invoke_handler(h, event) for h in handlers],
                return_exceptions=True,
            )
        logger.debug("Published event '%s' to %d handlers", event.type, len(handlers))
        return len(handlers)

    async def _invoke_handler(self, handler: EventHandler, event: Event) -> None:
        """Invoke a handler, logging (not propagating) any exception it raises."""
        try:
            await handler(event)
        except Exception as exc:
            logger.error("Event handler failed for '%s': %s", event.type, exc)

    def _match_pattern(self, event_type: str, pattern: str) -> bool:
        """Check if event type matches a wildcard pattern.

        "*" matches everything; "prefix.*" matches any type starting with
        "prefix."; anything else must be an exact match.
        """
        if pattern == "*":
            return True
        if pattern.endswith(".*"):
            prefix = pattern[:-2]
            return event_type.startswith(prefix + ".")
        return event_type == pattern

    def get_history(
        self,
        event_type: str | None = None,
        source: str | None = None,
        limit: int = 100,
    ) -> list[Event]:
        """Get recent event history with optional filtering.

        Args:
            event_type: Keep only events with exactly this type.
            source: Keep only events from this source.
            limit: Maximum number of events to return.

        Returns:
            Up to ``limit`` of the most recent matching events, oldest
            first. Empty when ``limit`` is zero or negative.
        """
        # Guard explicitly: events[-0:] would be the WHOLE list, and a
        # negative limit would slice from the front instead of the back.
        if limit <= 0:
            return []
        events = self._history
        if event_type:
            events = [e for e in events if e.type == event_type]
        if source:
            events = [e for e in events if e.source == source]
        return events[-limit:]

    def clear_history(self) -> None:
        """Clear in-memory event history."""
        self._history.clear()
# ── Lazy singleton ────────────────────────────────────────────────────────────
# Holds the shared bus once it has been created; None until first requested.
_event_bus: EventBus | None = None


def get_event_bus() -> EventBus:
    """Return the shared module-level EventBus.

    The instance is built on the first call and reused by every later call.
    """
    global _event_bus
    if _event_bus is not None:
        return _event_bus
    _event_bus = EventBus()
    return _event_bus
def init_event_bus_persistence(db_path: Path | None = None) -> None:
    """Turn on persistence for the module-level EventBus singleton.

    Intended to be called during app startup to enable durable event
    storage. When ``db_path`` is omitted, `data/events.db` is used. If the
    singleton already has a persistence path configured, the call is a
    no-op and ``db_path`` is ignored.
    """
    shared_bus = get_event_bus()
    if shared_bus._persistence_db_path is None:
        target = db_path or Path("data/events.db")
        shared_bus.enable_persistence(target)
# Convenience functions
async def emit(event_type: str, source: str, data: dict) -> int:
    """Build an Event and publish it on the shared bus.

    Returns:
        The number of handlers the event was delivered to.
    """
    shared_bus = get_event_bus()
    event = Event(type=event_type, source=source, data=data)
    return await shared_bus.publish(event)
def on(event_pattern: str) -> Callable[[EventHandler], EventHandler]:
    """Shortcut for ``subscribe`` on the shared bus; use as a decorator."""
    shared_bus = get_event_bus()
    return shared_bus.subscribe(event_pattern)
def __getattr__(name: str):
    """Resolve the legacy ``event_bus`` module attribute lazily.

    Accessing ``event_bus`` on this module returns the shared EventBus;
    any other unknown attribute raises AttributeError as usual.
    """
    if name != "event_bus":
        raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
    return get_event_bus()