"""Shared WebSocket message protocol for the Matrix frontend. Defines all WebSocket message types as an enum and typed dataclasses with ``to_json()`` / ``from_json()`` helpers so every producer and the gateway speak the same language. Message wire format ------------------- .. code-block:: json {"type": "agent_state", "agent_id": "timmy", "data": {...}, "ts": 1234567890} """ import json import logging import time from dataclasses import asdict, dataclass, field from enum import StrEnum from typing import Any logger = logging.getLogger(__name__) class MessageType(StrEnum): """All WebSocket message types defined by the Matrix PROTOCOL.md.""" AGENT_STATE = "agent_state" VISITOR_STATE = "visitor_state" BARK = "bark" THOUGHT = "thought" SYSTEM_STATUS = "system_status" CONNECTION_ACK = "connection_ack" ERROR = "error" TASK_UPDATE = "task_update" MEMORY_FLASH = "memory_flash" # --------------------------------------------------------------------------- # Base message # --------------------------------------------------------------------------- @dataclass class WSMessage: """Base WebSocket message with common envelope fields.""" type: str ts: float = field(default_factory=time.time) def to_json(self) -> str: """Serialise the message to a JSON string.""" return json.dumps(asdict(self)) @classmethod def from_json(cls, raw: str) -> "WSMessage": """Deserialise a JSON string into the correct message subclass. Falls back to the base ``WSMessage`` when the ``type`` field is unrecognised. """ data = json.loads(raw) msg_type = data.get("type") sub = _REGISTRY.get(msg_type) if sub is not None: return sub.from_json(raw) return cls(**data) # --------------------------------------------------------------------------- # Concrete message types # --------------------------------------------------------------------------- @dataclass class AgentStateMessage(WSMessage): """State update for a single agent.""" type: str = field(default=MessageType.AGENT_STATE) agent_id: str = "" data: dict[str, Any] = field(default_factory=dict) @classmethod def from_json(cls, raw: str) -> "AgentStateMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.AGENT_STATE), ts=payload.get("ts", time.time()), agent_id=payload.get("agent_id", ""), data=payload.get("data", {}), ) @dataclass class VisitorStateMessage(WSMessage): """State update for a visitor / user session.""" type: str = field(default=MessageType.VISITOR_STATE) visitor_id: str = "" data: dict[str, Any] = field(default_factory=dict) @classmethod def from_json(cls, raw: str) -> "VisitorStateMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.VISITOR_STATE), ts=payload.get("ts", time.time()), visitor_id=payload.get("visitor_id", ""), data=payload.get("data", {}), ) @dataclass class BarkMessage(WSMessage): """A bark (chat-like utterance) from an agent.""" type: str = field(default=MessageType.BARK) agent_id: str = "" content: str = "" @classmethod def from_json(cls, raw: str) -> "BarkMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.BARK), ts=payload.get("ts", time.time()), agent_id=payload.get("agent_id", ""), content=payload.get("content", ""), ) @dataclass class ThoughtMessage(WSMessage): """An inner thought from an agent.""" type: str = field(default=MessageType.THOUGHT) agent_id: str = "" content: str = "" @classmethod def from_json(cls, raw: str) -> "ThoughtMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.THOUGHT), ts=payload.get("ts", time.time()), agent_id=payload.get("agent_id", ""), content=payload.get("content", ""), ) @dataclass class SystemStatusMessage(WSMessage): """System-wide status broadcast.""" type: str = field(default=MessageType.SYSTEM_STATUS) status: str = "" data: dict[str, Any] = field(default_factory=dict) @classmethod def from_json(cls, raw: str) -> "SystemStatusMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.SYSTEM_STATUS), ts=payload.get("ts", time.time()), status=payload.get("status", ""), data=payload.get("data", {}), ) @dataclass class ConnectionAckMessage(WSMessage): """Acknowledgement sent when a client connects.""" type: str = field(default=MessageType.CONNECTION_ACK) client_id: str = "" @classmethod def from_json(cls, raw: str) -> "ConnectionAckMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.CONNECTION_ACK), ts=payload.get("ts", time.time()), client_id=payload.get("client_id", ""), ) @dataclass class ErrorMessage(WSMessage): """Error message sent to a client.""" type: str = field(default=MessageType.ERROR) code: str = "" message: str = "" @classmethod def from_json(cls, raw: str) -> "ErrorMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.ERROR), ts=payload.get("ts", time.time()), code=payload.get("code", ""), message=payload.get("message", ""), ) @dataclass class TaskUpdateMessage(WSMessage): """Update about a task (created, assigned, completed, etc.).""" type: str = field(default=MessageType.TASK_UPDATE) task_id: str = "" status: str = "" data: dict[str, Any] = field(default_factory=dict) @classmethod def from_json(cls, raw: str) -> "TaskUpdateMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.TASK_UPDATE), ts=payload.get("ts", time.time()), task_id=payload.get("task_id", ""), status=payload.get("status", ""), data=payload.get("data", {}), ) @dataclass class MemoryFlashMessage(WSMessage): """A flash of memory — a recalled or stored memory event.""" type: str = field(default=MessageType.MEMORY_FLASH) agent_id: str = "" memory_key: str = "" content: str = "" @classmethod def from_json(cls, raw: str) -> "MemoryFlashMessage": payload = json.loads(raw) return cls( type=payload.get("type", MessageType.MEMORY_FLASH), ts=payload.get("ts", time.time()), agent_id=payload.get("agent_id", ""), memory_key=payload.get("memory_key", ""), content=payload.get("content", ""), ) # --------------------------------------------------------------------------- # Registry for from_json dispatch # --------------------------------------------------------------------------- _REGISTRY: dict[str, type[WSMessage]] = { MessageType.AGENT_STATE: AgentStateMessage, MessageType.VISITOR_STATE: VisitorStateMessage, MessageType.BARK: BarkMessage, MessageType.THOUGHT: ThoughtMessage, MessageType.SYSTEM_STATUS: SystemStatusMessage, MessageType.CONNECTION_ACK: ConnectionAckMessage, MessageType.ERROR: ErrorMessage, MessageType.TASK_UPDATE: TaskUpdateMessage, MessageType.MEMORY_FLASH: MemoryFlashMessage, }