1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/timmy/events.py
2026-03-18 19:02:12 -04:00

40 lines
1.3 KiB
Python

"""SensoryEvent — normalized event model for stream adapters.
Every adapter (gitea, time, bitcoin, terminal, etc.) emits SensoryEvents
into the EventBus so that Timmy's cognitive layer sees a uniform stream.
"""
import json
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
@dataclass
class SensoryEvent:
"""A single sensory event from an external stream."""
source: str # "gitea", "time", "bitcoin", "terminal"
event_type: str # "push", "issue_opened", "new_block", "morning"
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
data: dict = field(default_factory=dict)
actor: str = "" # who caused it (username, "system", etc.)
def to_dict(self) -> dict:
"""Return a JSON-serializable dictionary."""
d = asdict(self)
d["timestamp"] = self.timestamp.isoformat()
return d
def to_json(self) -> str:
"""Return a JSON string."""
return json.dumps(self.to_dict())
@classmethod
def from_dict(cls, data: dict) -> "SensoryEvent":
"""Reconstruct a SensoryEvent from a dictionary."""
data = dict(data) # shallow copy
ts = data.get("timestamp")
if isinstance(ts, str):
data["timestamp"] = datetime.fromisoformat(ts)
return cls(**data)