forked from Rockachopa/Timmy-time-dashboard
committed by GitHub
parent 11ba21418a
commit 82fb2417e3
@@ -1,14 +1,18 @@
"""Async Event Bus for inter-agent communication.

Agents publish and subscribe to events for loose coupling.
-Events are typed and carry structured data.
+Events are typed and carry structured data. Optionally persists
+events to SQLite for durability and replay.
"""

import asyncio
+import json
import logging
+import sqlite3
from dataclasses import dataclass, field
from datetime import datetime, timezone
-from typing import Any, Callable, Coroutine
+from pathlib import Path
+from typing import Any, Callable, Coroutine, Optional

logger = logging.getLogger(__name__)
@@ -27,32 +31,181 @@ class Event:

# Type alias for event handlers
EventHandler = Callable[[Event], Coroutine[Any, Any, None]]


# Schema for the unified events table
_EVENTS_SCHEMA = """
CREATE TABLE IF NOT EXISTS events (
    id TEXT PRIMARY KEY,
    event_type TEXT NOT NULL,
    source TEXT DEFAULT '',
    task_id TEXT DEFAULT '',
    agent_id TEXT DEFAULT '',
    data TEXT DEFAULT '{}',
    timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(event_type);
CREATE INDEX IF NOT EXISTS idx_events_source ON events(source);
CREATE INDEX IF NOT EXISTS idx_events_task ON events(task_id);
CREATE INDEX IF NOT EXISTS idx_events_ts ON events(timestamp);
"""


class EventBus:
    """Async event bus for publish/subscribe pattern.

    Supports optional SQLite persistence via enable_persistence().
    When enabled, all published events are durably stored and can be
    replayed via the replay() method.

    Usage:
        bus = EventBus()
        bus.enable_persistence(Path("data/events.db"))

        # Subscribe to events
        @bus.subscribe("agent.task.*")
        async def handle_task(event: Event):
            print(f"Task event: {event.data}")

        # Publish events
        await bus.publish(Event(
            type="agent.task.assigned",
            source="default",
            data={"task_id": "123", "agent": "forge"}
        ))

        # Replay persisted events
        events = bus.replay(event_type="task.created")
    """

    def __init__(self) -> None:
        self._subscribers: dict[str, list[EventHandler]] = {}
        self._history: list[Event] = []
        self._max_history = 1000
        self._persistence_db_path: Optional[Path] = None
        logger.info("EventBus initialized")

    # ── Persistence ──────────────────────────────────────────────────────

    def enable_persistence(self, db_path: Path) -> None:
        """Enable SQLite persistence for all published events.

        Args:
            db_path: Path to the SQLite database file.
        """
        self._persistence_db_path = db_path
        self._init_persistence_db()
        logger.info("EventBus persistence enabled at %s", db_path)

    def _init_persistence_db(self) -> None:
        """Initialize the persistence database with schema."""
        if self._persistence_db_path is None:
            return
        self._persistence_db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(self._persistence_db_path))
        try:
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA busy_timeout=5000")
            conn.executescript(_EVENTS_SCHEMA)
            conn.commit()
        finally:
            conn.close()

    def _get_persistence_conn(self) -> Optional[sqlite3.Connection]:
        """Get a connection to the persistence database."""
        if self._persistence_db_path is None:
            return None
        conn = sqlite3.connect(str(self._persistence_db_path))
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA busy_timeout=5000")
        return conn

    def _persist_event(self, event: Event) -> None:
        """Write an event to the persistence database."""
        conn = self._get_persistence_conn()
        if conn is None:
            return
        try:
            task_id = event.data.get("task_id", "")
            agent_id = event.data.get("agent_id", "")
            conn.execute(
                "INSERT OR IGNORE INTO events "
                "(id, event_type, source, task_id, agent_id, data, timestamp) "
                "VALUES (?, ?, ?, ?, ?, ?, ?)",
                (
                    event.id,
                    event.type,
                    event.source,
                    task_id,
                    agent_id,
                    json.dumps(event.data),
                    event.timestamp,
                ),
            )
            conn.commit()
        except Exception as exc:
            logger.debug("Failed to persist event: %s", exc)
        finally:
            conn.close()

    # ── Replay ───────────────────────────────────────────────────────────

    def replay(
        self,
        event_type: Optional[str] = None,
        source: Optional[str] = None,
        task_id: Optional[str] = None,
        limit: int = 100,
    ) -> list[Event]:
        """Replay persisted events from SQLite with optional filters.

        Args:
            event_type: Filter by exact event type.
            source: Filter by event source.
            task_id: Filter by task_id.
            limit: Max events to return (most recent first).

        Returns:
            List of Event objects from persistent storage.
        """
        conn = self._get_persistence_conn()
        if conn is None:
            return []

        try:
            conditions = []
            params: list = []

            if event_type:
                conditions.append("event_type = ?")
                params.append(event_type)
            if source:
                conditions.append("source = ?")
                params.append(source)
            if task_id:
                conditions.append("task_id = ?")
                params.append(task_id)

            where = " AND ".join(conditions) if conditions else "1=1"
            sql = f"SELECT * FROM events WHERE {where} ORDER BY timestamp DESC LIMIT ?"
            params.append(limit)

            rows = conn.execute(sql, params).fetchall()

            return [
                Event(
                    id=row["id"],
                    type=row["event_type"],
                    source=row["source"],
                    data=json.loads(row["data"]) if row["data"] else {},
                    timestamp=row["timestamp"],
                )
                for row in rows
            ]
        except Exception as exc:
            logger.debug("Failed to replay events: %s", exc)
            return []
        finally:
            conn.close()
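
    # Illustrative usage sketch (not part of this commit): once persistence is enabled,
    # replay() can rebuild a view of recent activity after a restart. The event type
    # below mirrors the class docstring example; any combination of the filters in the
    # signature above works the same way.
    #
    #     bus = EventBus()
    #     bus.enable_persistence(Path("data/events.db"))
    #     for ev in bus.replay(event_type="agent.task.assigned", limit=50):
    #         print(ev.timestamp, ev.type, ev.data)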

    # ── Subscribe / Publish ──────────────────────────────────────────────

    def subscribe(self, event_pattern: str) -> Callable[[EventHandler], EventHandler]:
        """Decorator to subscribe to events matching a pattern.

@@ -87,10 +240,15 @@ class EventBus:
    async def publish(self, event: Event) -> int:
        """Publish an event to all matching subscribers.

+        If persistence is enabled, the event is also written to SQLite.
+
        Returns:
            Number of handlers invoked
        """
-        # Store in history
+        # Persist to SQLite (graceful — never crash on persistence failure)
+        self._persist_event(event)
+
+        # Store in in-memory history
        self._history.append(event)
        if len(self._history) > self._max_history:
            self._history = self._history[-self._max_history :]
@@ -105,7 +263,8 @@ class EventBus:
        # Invoke handlers concurrently
        if handlers:
            await asyncio.gather(
-                *[self._invoke_handler(h, event) for h in handlers], return_exceptions=True
+                *[self._invoke_handler(h, event) for h in handlers],
+                return_exceptions=True,
            )

        logger.debug("Published event '%s' to %d handlers", event.type, len(handlers))
@@ -147,18 +306,39 @@ class EventBus:
        return events[-limit:]

    def clear_history(self) -> None:
-        """Clear event history."""
+        """Clear in-memory event history."""
        self._history.clear()


-# Module-level singleton
-event_bus = EventBus()
+# ── Lazy singleton ────────────────────────────────────────────────────────────
+_event_bus: EventBus | None = None
+
+
+def get_event_bus() -> EventBus:
+    """Return the module-level EventBus, creating it on first access."""
+    global _event_bus
+    if _event_bus is None:
+        _event_bus = EventBus()
+    return _event_bus
+
+
+def init_event_bus_persistence(db_path: Optional[Path] = None) -> None:
+    """Enable persistence on the module-level EventBus singleton.
+
+    Call this during app startup to enable durable event storage.
+    If db_path is not provided, uses `data/events.db`.
+    """
+    bus = get_event_bus()
+    if bus._persistence_db_path is not None:
+        return  # already initialized
+    path = db_path or Path("data/events.db")
+    bus.enable_persistence(path)
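
# Illustrative startup sketch (not part of this commit): enabling durable storage on the
# shared bus during application startup, as the docstring above suggests. The explicit
# path in the second call is just an example value.
#
#     init_event_bus_persistence()                          # uses data/events.db
#     # or, with an explicit location:
#     init_event_bus_persistence(Path("/var/lib/app/events.db"))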

# Convenience functions
async def emit(event_type: str, source: str, data: dict) -> int:
    """Quick emit an event."""
-    return await event_bus.publish(
+    return await get_event_bus().publish(
        Event(
            type=event_type,
            source=source,
@@ -169,4 +349,11 @@ async def emit(event_type: str, source: str, data: dict) -> int:

def on(event_pattern: str) -> Callable[[EventHandler], EventHandler]:
    """Quick subscribe decorator."""
-    return event_bus.subscribe(event_pattern)
+    return get_event_bus().subscribe(event_pattern)
+
+
+def __getattr__(name: str):
+    """Module-level __getattr__ for lazy backward-compatible access to event_bus."""
+    if name == "event_bus":
+        return get_event_bus()
+    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
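
# Illustrative usage sketch (not part of this commit): the emit()/on() helpers and the
# lazy singleton working together inside an async context. The event names mirror the
# class docstring example; the handler itself is hypothetical glue code.
#
#     @on("agent.task.*")
#     async def log_task(event: Event) -> None:
#         print("task event:", event.type, event.data)
#
#     await emit("agent.task.assigned", "default", {"task_id": "123", "agent": "forge"})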