From 8aa72f8a21f631343dc5be4f6b9070503abc8f94 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Tue, 24 Mar 2026 14:54:21 -0400 Subject: [PATCH] refactor: split world.py into focused submodules (#1360) --- src/dashboard/routes/world.py | 1065 --------------------- src/dashboard/routes/world/__init__.py | 124 +++ src/dashboard/routes/world/bark.py | 212 ++++ src/dashboard/routes/world/commitments.py | 77 ++ src/dashboard/routes/world/matrix.py | 397 ++++++++ src/dashboard/routes/world/state.py | 75 ++ src/dashboard/routes/world/utils.py | 85 ++ src/dashboard/routes/world/websocket.py | 160 ++++ 8 files changed, 1130 insertions(+), 1065 deletions(-) delete mode 100644 src/dashboard/routes/world.py create mode 100644 src/dashboard/routes/world/__init__.py create mode 100644 src/dashboard/routes/world/bark.py create mode 100644 src/dashboard/routes/world/commitments.py create mode 100644 src/dashboard/routes/world/matrix.py create mode 100644 src/dashboard/routes/world/state.py create mode 100644 src/dashboard/routes/world/utils.py create mode 100644 src/dashboard/routes/world/websocket.py diff --git a/src/dashboard/routes/world.py b/src/dashboard/routes/world.py deleted file mode 100644 index d043eb3e..00000000 --- a/src/dashboard/routes/world.py +++ /dev/null @@ -1,1065 +0,0 @@ -"""Workshop world state API and WebSocket relay. - -Serves Timmy's current presence state to the Workshop 3D renderer. -The primary consumer is the browser on first load — before any -WebSocket events arrive, the client needs a full state snapshot. - -The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever -the heartbeat detects a state change. It also accepts ``visitor_message`` -frames from the 3D client and responds with ``timmy_speech`` barks. - -Source of truth: ``~/.timmy/presence.json`` written by -:class:`~timmy.workshop_state.WorkshopHeartbeat`. -Falls back to a live ``get_state_dict()`` call if the file is stale -or missing. -""" - -import asyncio -import json -import logging -import math -import re -import time -from collections import deque -from datetime import UTC, datetime -from pathlib import Path -from typing import Any - -import yaml -from fastapi import APIRouter, Request, WebSocket -from fastapi.responses import JSONResponse -from pydantic import BaseModel - -from config import settings -from infrastructure.presence import produce_bark, serialize_presence -from timmy.memory_system import search_memories -from timmy.workshop_state import PRESENCE_FILE - -logger = logging.getLogger(__name__) - -router = APIRouter(prefix="/api/world", tags=["world"]) -matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"]) - -# --------------------------------------------------------------------------- -# Matrix Bark Endpoint — HTTP fallback for bark messages -# --------------------------------------------------------------------------- - -# Rate limiting: 1 request per 3 seconds per visitor_id -_BARK_RATE_LIMIT_SECONDS = 3 -_bark_last_request: dict[str, float] = {} - - -class BarkRequest(BaseModel): - """Request body for POST /api/matrix/bark.""" - - text: str - visitor_id: str - - -@matrix_router.post("/bark") -async def post_matrix_bark(request: BarkRequest) -> JSONResponse: - """Generate a bark response for a visitor message. - - HTTP fallback for when WebSocket isn't available. The Matrix frontend - can POST a message and get Timmy's bark response back as JSON. - - Rate limited to 1 request per 3 seconds per visitor_id. - - Request body: - - text: The visitor's message text - - visitor_id: Unique identifier for the visitor (used for rate limiting) - - Returns: - - 200: Bark message in produce_bark() format - - 429: Rate limit exceeded (try again later) - - 422: Invalid request (missing/invalid fields) - """ - # Validate inputs - text = request.text.strip() if request.text else "" - visitor_id = request.visitor_id.strip() if request.visitor_id else "" - - if not text: - return JSONResponse( - status_code=422, - content={"error": "text is required"}, - ) - - if not visitor_id: - return JSONResponse( - status_code=422, - content={"error": "visitor_id is required"}, - ) - - # Rate limiting check - now = time.time() - last_request = _bark_last_request.get(visitor_id, 0) - time_since_last = now - last_request - - if time_since_last < _BARK_RATE_LIMIT_SECONDS: - retry_after = _BARK_RATE_LIMIT_SECONDS - time_since_last - return JSONResponse( - status_code=429, - content={"error": "Rate limit exceeded. Try again later."}, - headers={"Retry-After": str(int(retry_after) + 1)}, - ) - - # Record this request - _bark_last_request[visitor_id] = now - - # Generate bark response - try: - reply = await _generate_bark(text) - except Exception as exc: - logger.warning("Bark generation failed: %s", exc) - reply = "Hmm, my thoughts are a bit tangled right now." - - # Build bark response using produce_bark format - bark = produce_bark(agent_id="timmy", text=reply, style="speech") - - return JSONResponse( - content=bark, - headers={"Cache-Control": "no-cache, no-store"}, - ) - - -# --------------------------------------------------------------------------- -# Matrix Agent Registry — serves agents to the Matrix visualization -# --------------------------------------------------------------------------- - -# Agent color mapping — consistent with Matrix visual identity -_AGENT_COLORS: dict[str, str] = { - "timmy": "#FFD700", # Gold - "orchestrator": "#FFD700", # Gold - "perplexity": "#3B82F6", # Blue - "replit": "#F97316", # Orange - "kimi": "#06B6D4", # Cyan - "claude": "#A855F7", # Purple - "researcher": "#10B981", # Emerald - "coder": "#EF4444", # Red - "writer": "#EC4899", # Pink - "memory": "#8B5CF6", # Violet - "experimenter": "#14B8A6", # Teal - "forge": "#EF4444", # Red (coder alias) - "seer": "#10B981", # Emerald (researcher alias) - "quill": "#EC4899", # Pink (writer alias) - "echo": "#8B5CF6", # Violet (memory alias) - "lab": "#14B8A6", # Teal (experimenter alias) -} - -# Agent shape mapping for 3D visualization -_AGENT_SHAPES: dict[str, str] = { - "timmy": "sphere", - "orchestrator": "sphere", - "perplexity": "cube", - "replit": "cylinder", - "kimi": "dodecahedron", - "claude": "octahedron", - "researcher": "icosahedron", - "coder": "cube", - "writer": "cone", - "memory": "torus", - "experimenter": "tetrahedron", - "forge": "cube", - "seer": "icosahedron", - "quill": "cone", - "echo": "torus", - "lab": "tetrahedron", -} - -# Default fallback values -_DEFAULT_COLOR = "#9CA3AF" # Gray -_DEFAULT_SHAPE = "sphere" -_DEFAULT_STATUS = "available" - - -def _get_agent_color(agent_id: str) -> str: - """Get the Matrix color for an agent.""" - return _AGENT_COLORS.get(agent_id.lower(), _DEFAULT_COLOR) - - -def _get_agent_shape(agent_id: str) -> str: - """Get the Matrix shape for an agent.""" - return _AGENT_SHAPES.get(agent_id.lower(), _DEFAULT_SHAPE) - - -def _compute_circular_positions(count: int, radius: float = 3.0) -> list[dict[str, float]]: - """Compute circular positions for agents in the Matrix. - - Agents are arranged in a circle on the XZ plane at y=0. - """ - positions = [] - for i in range(count): - angle = (2 * math.pi * i) / count - x = radius * math.cos(angle) - z = radius * math.sin(angle) - positions.append({"x": round(x, 2), "y": 0.0, "z": round(z, 2)}) - return positions - - -def _build_matrix_agents_response() -> list[dict[str, Any]]: - """Build the Matrix agent registry response. - - Reads from agents.yaml and returns agents with Matrix-compatible - formatting including colors, shapes, and positions. - """ - try: - from timmy.agents.loader import list_agents - - agents = list_agents() - if not agents: - return [] - - positions = _compute_circular_positions(len(agents)) - - result = [] - for i, agent in enumerate(agents): - agent_id = agent.get("id", "") - result.append( - { - "id": agent_id, - "display_name": agent.get("name", agent_id.title()), - "role": agent.get("role", "general"), - "color": _get_agent_color(agent_id), - "position": positions[i], - "shape": _get_agent_shape(agent_id), - "status": agent.get("status", _DEFAULT_STATUS), - } - ) - - return result - except Exception as exc: - logger.warning("Failed to load agents for Matrix: %s", exc) - return [] - - -logger = logging.getLogger(__name__) - -router = APIRouter(prefix="/api/world", tags=["world"]) - -# --------------------------------------------------------------------------- -# WebSocket relay for live state changes -# --------------------------------------------------------------------------- - -_ws_clients: list[WebSocket] = [] - -_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild - -# Recent conversation buffer — kept in memory for the Workshop overlay. -# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs. -_MAX_EXCHANGES = 3 -_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES) - -_WORKSHOP_SESSION_ID = "workshop" - -_HEARTBEAT_INTERVAL = 15 # seconds — ping to detect dead iPad/Safari connections - -# --------------------------------------------------------------------------- -# Conversation grounding — commitment tracking (rescued from PR #408) -# --------------------------------------------------------------------------- - -# Patterns that indicate Timmy is committing to an action. -_COMMITMENT_PATTERNS: list[re.Pattern[str]] = [ - re.compile(r"I'll (.+?)(?:\.|!|\?|$)", re.IGNORECASE), - re.compile(r"I will (.+?)(?:\.|!|\?|$)", re.IGNORECASE), - re.compile(r"[Ll]et me (.+?)(?:\.|!|\?|$)", re.IGNORECASE), -] - -# After this many messages without follow-up, surface open commitments. -_REMIND_AFTER = 5 -_MAX_COMMITMENTS = 10 - -# In-memory list of open commitments. -# Each entry: {"text": str, "created_at": float, "messages_since": int} -_commitments: list[dict] = [] - - -def _extract_commitments(text: str) -> list[str]: - """Pull commitment phrases from Timmy's reply text.""" - found: list[str] = [] - for pattern in _COMMITMENT_PATTERNS: - for match in pattern.finditer(text): - phrase = match.group(1).strip() - if len(phrase) > 5: # skip trivially short matches - found.append(phrase[:120]) - return found - - -def _record_commitments(reply: str) -> None: - """Scan a Timmy reply for commitments and store them.""" - for phrase in _extract_commitments(reply): - # Avoid near-duplicate commitments - if any(c["text"] == phrase for c in _commitments): - continue - _commitments.append({"text": phrase, "created_at": time.time(), "messages_since": 0}) - if len(_commitments) > _MAX_COMMITMENTS: - _commitments.pop(0) - - -def _tick_commitments() -> None: - """Increment messages_since for every open commitment.""" - for c in _commitments: - c["messages_since"] += 1 - - -def _build_commitment_context() -> str: - """Return a grounding note if any commitments are overdue for follow-up.""" - overdue = [c for c in _commitments if c["messages_since"] >= _REMIND_AFTER] - if not overdue: - return "" - lines = [f"- {c['text']}" for c in overdue] - return ( - "[Open commitments Timmy made earlier — " - "weave awareness naturally, don't list robotically]\n" + "\n".join(lines) - ) - - -def close_commitment(index: int) -> bool: - """Remove a commitment by index. Returns True if removed.""" - if 0 <= index < len(_commitments): - _commitments.pop(index) - return True - return False - - -def get_commitments() -> list[dict]: - """Return a copy of open commitments (for testing / API).""" - return list(_commitments) - - -def reset_commitments() -> None: - """Clear all commitments (for testing / session reset).""" - _commitments.clear() - - -# Conversation grounding — anchor to opening topic so Timmy doesn't drift. -_ground_topic: str | None = None -_ground_set_at: float = 0.0 -_GROUND_TTL = 300 # seconds of inactivity before the anchor expires - - -def _read_presence_file() -> dict | None: - """Read presence.json if it exists and is fresh enough.""" - try: - if not PRESENCE_FILE.exists(): - return None - age = time.time() - PRESENCE_FILE.stat().st_mtime - if age > _STALE_THRESHOLD: - logger.debug("presence.json is stale (%.0fs old)", age) - return None - return json.loads(PRESENCE_FILE.read_text()) - except (OSError, json.JSONDecodeError) as exc: - logger.warning("Failed to read presence.json: %s", exc) - return None - - -def _build_world_state(presence: dict) -> dict: - """Transform presence dict into the world/state API response.""" - return serialize_presence(presence) - - -def _get_current_state() -> dict: - """Build the current world-state dict from best available source.""" - presence = _read_presence_file() - - if presence is None: - try: - from timmy.workshop_state import get_state_dict - - presence = get_state_dict() - except Exception as exc: - logger.warning("Live state build failed: %s", exc) - presence = { - "version": 1, - "liveness": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"), - "mood": "calm", - "current_focus": "", - "active_threads": [], - "recent_events": [], - "concerns": [], - } - - return _build_world_state(presence) - - -@router.get("/state") -async def get_world_state() -> JSONResponse: - """Return Timmy's current world state for Workshop bootstrap. - - Reads from ``~/.timmy/presence.json`` if fresh, otherwise - rebuilds live from cognitive state. - """ - return JSONResponse( - content=_get_current_state(), - headers={"Cache-Control": "no-cache, no-store"}, - ) - - -# --------------------------------------------------------------------------- -# WebSocket endpoint — streams timmy_state changes to Workshop clients -# --------------------------------------------------------------------------- - - -async def _heartbeat(websocket: WebSocket) -> None: - """Send periodic pings to detect dead connections (iPad resilience). - - Safari suspends background tabs, killing the TCP socket silently. - A 15-second ping ensures we notice within one interval. - - Rescued from stale PR #399. - """ - try: - while True: - await asyncio.sleep(_HEARTBEAT_INTERVAL) - await websocket.send_text(json.dumps({"type": "ping"})) - except Exception: - logger.debug("Heartbeat stopped — connection gone") - - -async def _authenticate_ws(websocket: WebSocket) -> bool: - """Authenticate WebSocket connection using matrix_ws_token. - - Checks for token in query param ?token= first. If no query param, - accepts the connection and waits for first message with - {"type": "auth", "token": "..."}. - - Returns True if authenticated (or if auth is disabled). - Returns False and closes connection with code 4001 if invalid. - """ - token_setting = settings.matrix_ws_token - - # Auth disabled in dev mode (empty/unset token) - if not token_setting: - return True - - # Check query param first (can validate before accept) - query_token = websocket.query_params.get("token", "") - if query_token: - if query_token == token_setting: - return True - # Invalid token in query param - we need to accept to close properly - await websocket.accept() - await websocket.close(code=4001, reason="Invalid token") - return False - - # No query token - accept and wait for auth message - await websocket.accept() - - # Wait for auth message as first message - try: - raw = await websocket.receive_text() - data = json.loads(raw) - if data.get("type") == "auth" and data.get("token") == token_setting: - return True - # Invalid auth message - await websocket.close(code=4001, reason="Invalid token") - return False - except (json.JSONDecodeError, TypeError): - # Non-JSON first message without valid token - await websocket.close(code=4001, reason="Authentication required") - return False - - -@router.websocket("/ws") -async def world_ws(websocket: WebSocket) -> None: - """Accept a Workshop client and keep it alive for state broadcasts. - - Sends a full ``world_state`` snapshot immediately on connect so the - client never starts from a blank slate. Incoming frames are parsed - as JSON — ``visitor_message`` triggers a bark response. A background - heartbeat ping runs every 15 s to detect dead connections early. - - Authentication: - - If matrix_ws_token is configured, clients must provide it via - ?token= query param or in the first message as - {"type": "auth", "token": "..."}. - - Invalid token results in close code 4001. - - Valid token receives a connection_ack message. - """ - # Authenticate (may accept connection internally) - is_authed = await _authenticate_ws(websocket) - if not is_authed: - logger.info("World WS connection rejected — invalid token") - return - - # Auth passed - accept if not already accepted - if websocket.client_state.name != "CONNECTED": - await websocket.accept() - - # Send connection_ack if auth was required - if settings.matrix_ws_token: - await websocket.send_text(json.dumps({"type": "connection_ack"})) - - _ws_clients.append(websocket) - logger.info("World WS connected — %d clients", len(_ws_clients)) - - # Send full world-state snapshot so client bootstraps instantly - try: - snapshot = _get_current_state() - await websocket.send_text(json.dumps({"type": "world_state", **snapshot})) - except Exception as exc: - logger.warning("Failed to send WS snapshot: %s", exc) - - ping_task = asyncio.create_task(_heartbeat(websocket)) - try: - while True: - raw = await websocket.receive_text() - await _handle_client_message(raw) - except Exception: - logger.debug("WebSocket receive loop ended") - finally: - ping_task.cancel() - if websocket in _ws_clients: - _ws_clients.remove(websocket) - logger.info("World WS disconnected — %d clients", len(_ws_clients)) - - -async def _broadcast(message: str) -> None: - """Send *message* to every connected Workshop client, pruning dead ones.""" - dead: list[WebSocket] = [] - for ws in _ws_clients: - try: - await ws.send_text(message) - except Exception: - logger.debug("Pruning dead WebSocket client") - dead.append(ws) - for ws in dead: - if ws in _ws_clients: - _ws_clients.remove(ws) - - -async def broadcast_world_state(presence: dict) -> None: - """Broadcast a ``timmy_state`` message to all connected Workshop clients. - - Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its - ``on_change`` callback. - """ - state = _build_world_state(presence) - await _broadcast(json.dumps({"type": "timmy_state", **state["timmyState"]})) - - -# --------------------------------------------------------------------------- -# Visitor chat — bark engine -# --------------------------------------------------------------------------- - - -async def _handle_client_message(raw: str) -> None: - """Dispatch an incoming WebSocket frame from the Workshop client.""" - try: - data = json.loads(raw) - except (json.JSONDecodeError, TypeError): - return # ignore non-JSON keep-alive pings - - if data.get("type") == "visitor_message": - text = (data.get("text") or "").strip() - if text: - task = asyncio.create_task(_bark_and_broadcast(text)) - task.add_done_callback(_log_bark_failure) - - -def _log_bark_failure(task: asyncio.Task) -> None: - """Log unhandled exceptions from fire-and-forget bark tasks.""" - if task.cancelled(): - return - exc = task.exception() - if exc is not None: - logger.error("Bark task failed: %s", exc) - - -def reset_conversation_ground() -> None: - """Clear the conversation grounding anchor (e.g. after inactivity).""" - global _ground_topic, _ground_set_at - _ground_topic = None - _ground_set_at = 0.0 - - -def _refresh_ground(visitor_text: str) -> None: - """Set or refresh the conversation grounding anchor. - - The first visitor message in a session (or after the TTL expires) - becomes the anchor topic. Subsequent messages are grounded against it. - """ - global _ground_topic, _ground_set_at - now = time.time() - if _ground_topic is None or (now - _ground_set_at) > _GROUND_TTL: - _ground_topic = visitor_text[:120] - logger.debug("Ground topic set: %s", _ground_topic) - _ground_set_at = now - - -async def _bark_and_broadcast(visitor_text: str) -> None: - """Generate a bark response and broadcast it to all Workshop clients.""" - await _broadcast(json.dumps({"type": "timmy_thinking"})) - - # Notify Pip that a visitor spoke - try: - from timmy.familiar import pip_familiar - - pip_familiar.on_event("visitor_spoke") - except Exception: - logger.debug("Pip familiar notification failed (optional)") - - _refresh_ground(visitor_text) - _tick_commitments() - reply = await _generate_bark(visitor_text) - _record_commitments(reply) - - _conversation.append({"visitor": visitor_text, "timmy": reply}) - - await _broadcast( - json.dumps( - { - "type": "timmy_speech", - "text": reply, - "recentExchanges": list(_conversation), - } - ) - ) - - -async def _generate_bark(visitor_text: str) -> str: - """Generate a short in-character bark response. - - Uses the existing Timmy session with a dedicated workshop session ID. - When a grounding anchor exists, the opening topic is prepended so the - model stays on-topic across long sessions. - Gracefully degrades to a canned response if inference fails. - """ - try: - from timmy import session as _session - - grounded = visitor_text - commitment_ctx = _build_commitment_context() - if commitment_ctx: - grounded = f"{commitment_ctx}\n{grounded}" - if _ground_topic and visitor_text != _ground_topic: - grounded = f"[Workshop conversation topic: {_ground_topic}]\n{grounded}" - response = await _session.chat(grounded, session_id=_WORKSHOP_SESSION_ID) - return response - except Exception as exc: - logger.warning("Bark generation failed: %s", exc) - return "Hmm, my thoughts are a bit tangled right now." - - -# --------------------------------------------------------------------------- -# Matrix Configuration Endpoint -# --------------------------------------------------------------------------- - -# Default Matrix configuration (fallback when matrix.yaml is missing/corrupt) -_DEFAULT_MATRIX_CONFIG: dict[str, Any] = { - "lighting": { - "ambient_color": "#1a1a2e", - "ambient_intensity": 0.4, - "point_lights": [ - {"color": "#FFD700", "intensity": 1.2, "position": {"x": 0, "y": 5, "z": 0}}, - {"color": "#3B82F6", "intensity": 0.8, "position": {"x": -5, "y": 3, "z": -5}}, - {"color": "#A855F7", "intensity": 0.6, "position": {"x": 5, "y": 3, "z": 5}}, - ], - }, - "environment": { - "rain_enabled": False, - "starfield_enabled": True, - "fog_color": "#0f0f23", - "fog_density": 0.02, - }, - "features": { - "chat_enabled": True, - "visitor_avatars": True, - "pip_familiar": True, - "workshop_portal": True, - }, -} - - -def _load_matrix_config() -> dict[str, Any]: - """Load Matrix world configuration from matrix.yaml with fallback to defaults. - - Returns a dict with sections: lighting, environment, features. - If the config file is missing or invalid, returns sensible defaults. - """ - try: - config_path = Path(settings.repo_root) / "config" / "matrix.yaml" - if not config_path.exists(): - logger.debug("matrix.yaml not found, using default config") - return _DEFAULT_MATRIX_CONFIG.copy() - - raw = config_path.read_text() - config = yaml.safe_load(raw) - if not isinstance(config, dict): - logger.warning("matrix.yaml invalid format, using defaults") - return _DEFAULT_MATRIX_CONFIG.copy() - - # Merge with defaults to ensure all required fields exist - result: dict[str, Any] = { - "lighting": { - **_DEFAULT_MATRIX_CONFIG["lighting"], - **config.get("lighting", {}), - }, - "environment": { - **_DEFAULT_MATRIX_CONFIG["environment"], - **config.get("environment", {}), - }, - "features": { - **_DEFAULT_MATRIX_CONFIG["features"], - **config.get("features", {}), - }, - } - - # Ensure point_lights is a list - if "point_lights" in config.get("lighting", {}): - result["lighting"]["point_lights"] = config["lighting"]["point_lights"] - else: - result["lighting"]["point_lights"] = _DEFAULT_MATRIX_CONFIG["lighting"]["point_lights"] - - return result - except Exception as exc: - logger.warning("Failed to load matrix config: %s, using defaults", exc) - return _DEFAULT_MATRIX_CONFIG.copy() - - -@matrix_router.get("/config") -async def get_matrix_config() -> JSONResponse: - """Return Matrix world configuration. - - Serves lighting presets, environment settings, and feature flags - to the Matrix frontend so it can be config-driven rather than - hardcoded. Reads from config/matrix.yaml with sensible defaults. - - Response structure: - - lighting: ambient_color, ambient_intensity, point_lights[] - - environment: rain_enabled, starfield_enabled, fog_color, fog_density - - features: chat_enabled, visitor_avatars, pip_familiar, workshop_portal - """ - config = _load_matrix_config() - return JSONResponse( - content=config, - headers={"Cache-Control": "no-cache, no-store"}, - ) - - -# --------------------------------------------------------------------------- -# Matrix Agent Registry Endpoint -# --------------------------------------------------------------------------- - - -@matrix_router.get("/agents") -async def get_matrix_agents() -> JSONResponse: - """Return the agent registry for Matrix visualization. - - Serves agents from agents.yaml with Matrix-compatible formatting: - - id: agent identifier - - display_name: human-readable name - - role: functional role - - color: hex color code for visualization - - position: {x, y, z} coordinates in 3D space - - shape: 3D shape type - - status: availability status - - Agents are arranged in a circular layout by default. - Returns 200 with empty list if no agents configured. - """ - agents = _build_matrix_agents_response() - return JSONResponse( - content=agents, - headers={"Cache-Control": "no-cache, no-store"}, - ) - - -# --------------------------------------------------------------------------- -# Matrix Thoughts Endpoint — Timmy's recent thought stream for Matrix display -# --------------------------------------------------------------------------- - -_MAX_THOUGHT_LIMIT = 50 # Maximum thoughts allowed per request -_DEFAULT_THOUGHT_LIMIT = 10 # Default number of thoughts to return -_MAX_THOUGHT_TEXT_LEN = 500 # Max characters for thought text - - -def _build_matrix_thoughts_response(limit: int = _DEFAULT_THOUGHT_LIMIT) -> list[dict[str, Any]]: - """Build the Matrix thoughts response from the thinking engine. - - Returns recent thoughts formatted for Matrix display: - - id: thought UUID - - text: thought content (truncated to 500 chars) - - created_at: ISO-8601 timestamp - - chain_id: parent thought ID (or null if root thought) - - Returns empty list if thinking engine is disabled or fails. - """ - try: - from timmy.thinking import thinking_engine - - thoughts = thinking_engine.get_recent_thoughts(limit=limit) - return [ - { - "id": t.id, - "text": t.content[:_MAX_THOUGHT_TEXT_LEN], - "created_at": t.created_at, - "chain_id": t.parent_id, - } - for t in thoughts - ] - except Exception as exc: - logger.warning("Failed to load thoughts for Matrix: %s", exc) - return [] - - -@matrix_router.get("/thoughts") -async def get_matrix_thoughts(limit: int = _DEFAULT_THOUGHT_LIMIT) -> JSONResponse: - """Return Timmy's recent thoughts formatted for Matrix display. - - This is the REST companion to the thought WebSocket messages, - allowing the Matrix frontend to display what Timmy is actually - thinking about rather than canned contextual lines. - - Query params: - - limit: Number of thoughts to return (default 10, max 50) - - Response: JSON array of thought objects: - - id: thought UUID - - text: thought content (truncated to 500 chars) - - created_at: ISO-8601 timestamp - - chain_id: parent thought ID (null if root thought) - - Returns empty array if thinking engine is disabled or fails. - """ - # Clamp limit to valid range - if limit < 1: - limit = 1 - elif limit > _MAX_THOUGHT_LIMIT: - limit = _MAX_THOUGHT_LIMIT - - thoughts = _build_matrix_thoughts_response(limit=limit) - return JSONResponse( - content=thoughts, - headers={"Cache-Control": "no-cache, no-store"}, - ) - - -# --------------------------------------------------------------------------- -# Matrix Health Endpoint — backend capability discovery -# --------------------------------------------------------------------------- - -# Health check cache (5-second TTL for capability checks) -_health_cache: dict | None = None -_health_cache_ts: float = 0.0 -_HEALTH_CACHE_TTL = 5.0 - - -def _check_capability_thinking() -> bool: - """Check if thinking engine is available.""" - try: - from timmy.thinking import thinking_engine - - # Check if the engine has been initialized (has a db path) - return hasattr(thinking_engine, "_db") and thinking_engine._db is not None - except Exception: - return False - - -def _check_capability_memory() -> bool: - """Check if memory system is available.""" - try: - from timmy.memory_system import HOT_MEMORY_PATH - - return HOT_MEMORY_PATH.exists() - except Exception: - return False - - -def _check_capability_bark() -> bool: - """Check if bark production is available.""" - try: - from infrastructure.presence import produce_bark - - return callable(produce_bark) - except Exception: - return False - - -def _check_capability_familiar() -> bool: - """Check if familiar (Pip) is available.""" - try: - from timmy.familiar import pip_familiar - - return pip_familiar is not None - except Exception: - return False - - -def _check_capability_lightning() -> bool: - """Check if Lightning payments are available.""" - # Lightning is currently disabled per health.py - # Returns False until properly re-implemented - return False - - -def _build_matrix_health_response() -> dict[str, Any]: - """Build the Matrix health response with capability checks. - - Performs lightweight checks (<100ms total) to determine which features - are available. Returns 200 even if some capabilities are degraded. - """ - capabilities = { - "thinking": _check_capability_thinking(), - "memory": _check_capability_memory(), - "bark": _check_capability_bark(), - "familiar": _check_capability_familiar(), - "lightning": _check_capability_lightning(), - } - - # Status is ok if core capabilities (thinking, memory, bark) are available - core_caps = ["thinking", "memory", "bark"] - core_available = all(capabilities[c] for c in core_caps) - status = "ok" if core_available else "degraded" - - return { - "status": status, - "version": "1.0.0", - "capabilities": capabilities, - } - - -@matrix_router.get("/health") -async def get_matrix_health() -> JSONResponse: - """Return health status and capability availability for Matrix frontend. - - This endpoint allows the Matrix frontend to discover what backend - capabilities are available so it can show/hide UI elements: - - thinking: Show thought bubbles if enabled - - memory: Show crystal ball memory search if available - - bark: Enable visitor chat responses - - familiar: Show Pip the familiar - - lightning: Enable payment features - - Response time is <100ms (no heavy checks). Returns 200 even if - some capabilities are degraded. - - Response: - - status: "ok" or "degraded" - - version: API version string - - capabilities: dict of feature:bool - """ - response = _build_matrix_health_response() - status_code = 200 # Always 200, even if degraded - - return JSONResponse( - content=response, - status_code=status_code, - headers={"Cache-Control": "no-cache, no-store"}, - ) - - -# --------------------------------------------------------------------------- -# Matrix Memory Search Endpoint — visitors query Timmy's memory -# --------------------------------------------------------------------------- - -# Rate limiting: 1 search per 5 seconds per IP -_MEMORY_SEARCH_RATE_LIMIT_SECONDS = 5 -_memory_search_last_request: dict[str, float] = {} -_MAX_MEMORY_RESULTS = 5 -_MAX_MEMORY_TEXT_LENGTH = 200 - - -def _get_client_ip(request) -> str: - """Extract client IP from request, respecting X-Forwarded-For header.""" - # Check for forwarded IP (when behind proxy) - forwarded = request.headers.get("X-Forwarded-For") - if forwarded: - # Take the first IP in the chain - return forwarded.split(",")[0].strip() - # Fall back to direct client IP - if request.client: - return request.client.host - return "unknown" - - -def _build_matrix_memory_response( - memories: list, -) -> list[dict[str, Any]]: - """Build the Matrix memory search response. - - Formats memory entries for Matrix display: - - text: truncated to 200 characters - - relevance: 0-1 score from relevance_score - - created_at: ISO-8601 timestamp - - context_type: the memory type - - Results are capped at _MAX_MEMORY_RESULTS. - """ - results = [] - for mem in memories[:_MAX_MEMORY_RESULTS]: - text = mem.content - if len(text) > _MAX_MEMORY_TEXT_LENGTH: - text = text[:_MAX_MEMORY_TEXT_LENGTH] + "..." - - results.append( - { - "text": text, - "relevance": round(mem.relevance_score or 0.0, 4), - "created_at": mem.timestamp, - "context_type": mem.context_type, - } - ) - return results - - -@matrix_router.get("/memory/search") -async def get_matrix_memory_search(request: Request, q: str | None = None) -> JSONResponse: - """Search Timmy's memory for relevant snippets. - - Allows Matrix visitors to query Timmy's memory ("what do you remember - about sovereignty?"). Results appear as floating crystal-ball text - in the Workshop room. - - Query params: - - q: Search query text (required) - - Response: JSON array of memory objects: - - text: Memory content (truncated to 200 chars) - - relevance: Similarity score 0-1 - - created_at: ISO-8601 timestamp - - context_type: Memory type (conversation, fact, etc.) - - Rate limited to 1 search per 5 seconds per IP. - - Returns: - - 200: JSON array of memory results (max 5) - - 400: Missing or empty query parameter - - 429: Rate limit exceeded - """ - # Validate query parameter - query = q.strip() if q else "" - if not query: - return JSONResponse( - status_code=400, - content={"error": "Query parameter 'q' is required"}, - ) - - # Rate limiting check by IP - client_ip = _get_client_ip(request) - now = time.time() - last_request = _memory_search_last_request.get(client_ip, 0) - time_since_last = now - last_request - - if time_since_last < _MEMORY_SEARCH_RATE_LIMIT_SECONDS: - retry_after = _MEMORY_SEARCH_RATE_LIMIT_SECONDS - time_since_last - return JSONResponse( - status_code=429, - content={"error": "Rate limit exceeded. Try again later."}, - headers={"Retry-After": str(int(retry_after) + 1)}, - ) - - # Record this request - _memory_search_last_request[client_ip] = now - - # Search memories - try: - memories = search_memories(query, limit=_MAX_MEMORY_RESULTS) - results = _build_matrix_memory_response(memories) - except Exception as exc: - logger.warning("Memory search failed: %s", exc) - results = [] - - return JSONResponse( - content=results, - headers={"Cache-Control": "no-cache, no-store"}, - ) diff --git a/src/dashboard/routes/world/__init__.py b/src/dashboard/routes/world/__init__.py new file mode 100644 index 00000000..914c8203 --- /dev/null +++ b/src/dashboard/routes/world/__init__.py @@ -0,0 +1,124 @@ +"""Workshop world state API and WebSocket relay. + +Serves Timmy's current presence state to the Workshop 3D renderer. +The primary consumer is the browser on first load — before any +WebSocket events arrive, the client needs a full state snapshot. + +The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever +the heartbeat detects a state change. It also accepts ``visitor_message`` +frames from the 3D client and responds with ``timmy_speech`` barks. + +Source of truth: ``~/.timmy/presence.json`` written by +:class:`~timmy.workshop_state.WorkshopHeartbeat`. +Falls back to a live ``get_state_dict()`` call if the file is stale +or missing. +""" + +from fastapi import APIRouter + +# Import submodule routers +from .bark import matrix_router as _bark_matrix_router +from .matrix import matrix_router as _matrix_matrix_router +from .state import router as _state_router +from .websocket import router as _ws_router + +# --------------------------------------------------------------------------- +# Combine sub-routers into the two top-level routers that app.py expects +# --------------------------------------------------------------------------- + +router = APIRouter(prefix="/api/world", tags=["world"]) + +# Include state routes (GET /state) +for route in _state_router.routes: + router.routes.append(route) + +# Include websocket routes (WS /ws) +for route in _ws_router.routes: + router.routes.append(route) + +# Combine matrix sub-routers +matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"]) + +for route in _bark_matrix_router.routes: + matrix_router.routes.append(route) + +for route in _matrix_matrix_router.routes: + matrix_router.routes.append(route) + +# --------------------------------------------------------------------------- +# Re-export public API for backward compatibility +# --------------------------------------------------------------------------- + +# Used by src/dashboard/app.py +from .websocket import broadcast_world_state # noqa: E402, F401 + +# Used by src/infrastructure/presence.py +from .websocket import _ws_clients # noqa: E402, F401 + +# Used by tests +from .bark import ( # noqa: E402, F401 + BarkRequest, + _BARK_RATE_LIMIT_SECONDS, + _GROUND_TTL, + _MAX_EXCHANGES, + _bark_and_broadcast, + _bark_last_request, + _conversation, + _generate_bark, + _handle_client_message, + _log_bark_failure, + _refresh_ground, + post_matrix_bark, + reset_conversation_ground, +) +from .commitments import ( # noqa: E402, F401 + _COMMITMENT_PATTERNS, + _MAX_COMMITMENTS, + _REMIND_AFTER, + _build_commitment_context, + _commitments, + _extract_commitments, + _record_commitments, + _tick_commitments, + close_commitment, + get_commitments, + reset_commitments, +) +from .matrix import ( # noqa: E402, F401 + _DEFAULT_MATRIX_CONFIG, + _build_matrix_agents_response, + _build_matrix_health_response, + _build_matrix_memory_response, + _build_matrix_thoughts_response, + _check_capability_bark, + _check_capability_familiar, + _check_capability_lightning, + _check_capability_memory, + _check_capability_thinking, + _load_matrix_config, + _memory_search_last_request, + get_matrix_agents, + get_matrix_config, + get_matrix_health, + get_matrix_memory_search, + get_matrix_thoughts, +) +from .state import ( # noqa: E402, F401 + _STALE_THRESHOLD, + _build_world_state, + _get_current_state, + _read_presence_file, + get_world_state, +) +from .utils import ( # noqa: E402, F401 + _compute_circular_positions, + _get_agent_color, + _get_agent_shape, + _get_client_ip, +) +from .websocket import ( # noqa: E402, F401 + _authenticate_ws, + _broadcast, + _heartbeat, + world_ws, +) diff --git a/src/dashboard/routes/world/bark.py b/src/dashboard/routes/world/bark.py new file mode 100644 index 00000000..dc1f2ef9 --- /dev/null +++ b/src/dashboard/routes/world/bark.py @@ -0,0 +1,212 @@ +"""Bark/conversation — visitor chat engine and Matrix bark endpoint.""" + +import asyncio +import json +import logging +import time +from collections import deque + +from fastapi import APIRouter +from fastapi.responses import JSONResponse +from pydantic import BaseModel + +from infrastructure.presence import produce_bark + +from .commitments import ( + _build_commitment_context, + _record_commitments, + _tick_commitments, +) + +logger = logging.getLogger(__name__) + +matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"]) + +# Rate limiting: 1 request per 3 seconds per visitor_id +_BARK_RATE_LIMIT_SECONDS = 3 +_bark_last_request: dict[str, float] = {} + +# Recent conversation buffer — kept in memory for the Workshop overlay. +# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs. +_MAX_EXCHANGES = 3 +_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES) + +_WORKSHOP_SESSION_ID = "workshop" + +# Conversation grounding — anchor to opening topic so Timmy doesn't drift. +_ground_topic: str | None = None +_ground_set_at: float = 0.0 +_GROUND_TTL = 300 # seconds of inactivity before the anchor expires + + +class BarkRequest(BaseModel): + """Request body for POST /api/matrix/bark.""" + + text: str + visitor_id: str + + +@matrix_router.post("/bark") +async def post_matrix_bark(request: BarkRequest) -> JSONResponse: + """Generate a bark response for a visitor message. + + HTTP fallback for when WebSocket isn't available. The Matrix frontend + can POST a message and get Timmy's bark response back as JSON. + + Rate limited to 1 request per 3 seconds per visitor_id. + + Request body: + - text: The visitor's message text + - visitor_id: Unique identifier for the visitor (used for rate limiting) + + Returns: + - 200: Bark message in produce_bark() format + - 429: Rate limit exceeded (try again later) + - 422: Invalid request (missing/invalid fields) + """ + # Validate inputs + text = request.text.strip() if request.text else "" + visitor_id = request.visitor_id.strip() if request.visitor_id else "" + + if not text: + return JSONResponse( + status_code=422, + content={"error": "text is required"}, + ) + + if not visitor_id: + return JSONResponse( + status_code=422, + content={"error": "visitor_id is required"}, + ) + + # Rate limiting check + now = time.time() + last_request = _bark_last_request.get(visitor_id, 0) + time_since_last = now - last_request + + if time_since_last < _BARK_RATE_LIMIT_SECONDS: + retry_after = _BARK_RATE_LIMIT_SECONDS - time_since_last + return JSONResponse( + status_code=429, + content={"error": "Rate limit exceeded. Try again later."}, + headers={"Retry-After": str(int(retry_after) + 1)}, + ) + + # Record this request + _bark_last_request[visitor_id] = now + + # Generate bark response + try: + reply = await _generate_bark(text) + except Exception as exc: + logger.warning("Bark generation failed: %s", exc) + reply = "Hmm, my thoughts are a bit tangled right now." + + # Build bark response using produce_bark format + bark = produce_bark(agent_id="timmy", text=reply, style="speech") + + return JSONResponse( + content=bark, + headers={"Cache-Control": "no-cache, no-store"}, + ) + + +def reset_conversation_ground() -> None: + """Clear the conversation grounding anchor (e.g. after inactivity).""" + global _ground_topic, _ground_set_at + _ground_topic = None + _ground_set_at = 0.0 + + +def _refresh_ground(visitor_text: str) -> None: + """Set or refresh the conversation grounding anchor. + + The first visitor message in a session (or after the TTL expires) + becomes the anchor topic. Subsequent messages are grounded against it. + """ + global _ground_topic, _ground_set_at + now = time.time() + if _ground_topic is None or (now - _ground_set_at) > _GROUND_TTL: + _ground_topic = visitor_text[:120] + logger.debug("Ground topic set: %s", _ground_topic) + _ground_set_at = now + + +async def _bark_and_broadcast(visitor_text: str) -> None: + """Generate a bark response and broadcast it to all Workshop clients.""" + from .websocket import _broadcast + + await _broadcast(json.dumps({"type": "timmy_thinking"})) + + # Notify Pip that a visitor spoke + try: + from timmy.familiar import pip_familiar + + pip_familiar.on_event("visitor_spoke") + except Exception: + logger.debug("Pip familiar notification failed (optional)") + + _refresh_ground(visitor_text) + _tick_commitments() + reply = await _generate_bark(visitor_text) + _record_commitments(reply) + + _conversation.append({"visitor": visitor_text, "timmy": reply}) + + await _broadcast( + json.dumps( + { + "type": "timmy_speech", + "text": reply, + "recentExchanges": list(_conversation), + } + ) + ) + + +async def _generate_bark(visitor_text: str) -> str: + """Generate a short in-character bark response. + + Uses the existing Timmy session with a dedicated workshop session ID. + When a grounding anchor exists, the opening topic is prepended so the + model stays on-topic across long sessions. + Gracefully degrades to a canned response if inference fails. + """ + try: + from timmy import session as _session + + grounded = visitor_text + commitment_ctx = _build_commitment_context() + if commitment_ctx: + grounded = f"{commitment_ctx}\n{grounded}" + if _ground_topic and visitor_text != _ground_topic: + grounded = f"[Workshop conversation topic: {_ground_topic}]\n{grounded}" + response = await _session.chat(grounded, session_id=_WORKSHOP_SESSION_ID) + return response + except Exception as exc: + logger.warning("Bark generation failed: %s", exc) + return "Hmm, my thoughts are a bit tangled right now." + + +def _log_bark_failure(task: asyncio.Task) -> None: + """Log unhandled exceptions from fire-and-forget bark tasks.""" + if task.cancelled(): + return + exc = task.exception() + if exc is not None: + logger.error("Bark task failed: %s", exc) + + +async def _handle_client_message(raw: str) -> None: + """Dispatch an incoming WebSocket frame from the Workshop client.""" + try: + data = json.loads(raw) + except (json.JSONDecodeError, TypeError): + return # ignore non-JSON keep-alive pings + + if data.get("type") == "visitor_message": + text = (data.get("text") or "").strip() + if text: + task = asyncio.create_task(_bark_and_broadcast(text)) + task.add_done_callback(_log_bark_failure) diff --git a/src/dashboard/routes/world/commitments.py b/src/dashboard/routes/world/commitments.py new file mode 100644 index 00000000..20c0c227 --- /dev/null +++ b/src/dashboard/routes/world/commitments.py @@ -0,0 +1,77 @@ +"""Conversation grounding — commitment tracking (rescued from PR #408).""" + +import re +import time + +# Patterns that indicate Timmy is committing to an action. +_COMMITMENT_PATTERNS: list[re.Pattern[str]] = [ + re.compile(r"I'll (.+?)(?:\.|!|\?|$)", re.IGNORECASE), + re.compile(r"I will (.+?)(?:\.|!|\?|$)", re.IGNORECASE), + re.compile(r"[Ll]et me (.+?)(?:\.|!|\?|$)", re.IGNORECASE), +] + +# After this many messages without follow-up, surface open commitments. +_REMIND_AFTER = 5 +_MAX_COMMITMENTS = 10 + +# In-memory list of open commitments. +# Each entry: {"text": str, "created_at": float, "messages_since": int} +_commitments: list[dict] = [] + + +def _extract_commitments(text: str) -> list[str]: + """Pull commitment phrases from Timmy's reply text.""" + found: list[str] = [] + for pattern in _COMMITMENT_PATTERNS: + for match in pattern.finditer(text): + phrase = match.group(1).strip() + if len(phrase) > 5: # skip trivially short matches + found.append(phrase[:120]) + return found + + +def _record_commitments(reply: str) -> None: + """Scan a Timmy reply for commitments and store them.""" + for phrase in _extract_commitments(reply): + # Avoid near-duplicate commitments + if any(c["text"] == phrase for c in _commitments): + continue + _commitments.append({"text": phrase, "created_at": time.time(), "messages_since": 0}) + if len(_commitments) > _MAX_COMMITMENTS: + _commitments.pop(0) + + +def _tick_commitments() -> None: + """Increment messages_since for every open commitment.""" + for c in _commitments: + c["messages_since"] += 1 + + +def _build_commitment_context() -> str: + """Return a grounding note if any commitments are overdue for follow-up.""" + overdue = [c for c in _commitments if c["messages_since"] >= _REMIND_AFTER] + if not overdue: + return "" + lines = [f"- {c['text']}" for c in overdue] + return ( + "[Open commitments Timmy made earlier — " + "weave awareness naturally, don't list robotically]\n" + "\n".join(lines) + ) + + +def close_commitment(index: int) -> bool: + """Remove a commitment by index. Returns True if removed.""" + if 0 <= index < len(_commitments): + _commitments.pop(index) + return True + return False + + +def get_commitments() -> list[dict]: + """Return a copy of open commitments (for testing / API).""" + return list(_commitments) + + +def reset_commitments() -> None: + """Clear all commitments (for testing / session reset).""" + _commitments.clear() diff --git a/src/dashboard/routes/world/matrix.py b/src/dashboard/routes/world/matrix.py new file mode 100644 index 00000000..6024ab90 --- /dev/null +++ b/src/dashboard/routes/world/matrix.py @@ -0,0 +1,397 @@ +"""Matrix API endpoints — config, agents, health, thoughts, memory search.""" + +import logging +import time +from pathlib import Path +from typing import Any + +import yaml +from fastapi import APIRouter, Request +from fastapi.responses import JSONResponse + +from config import settings +from timmy.memory_system import search_memories + +from .utils import ( + _DEFAULT_STATUS, + _compute_circular_positions, + _get_agent_color, + _get_agent_shape, + _get_client_ip, +) + +logger = logging.getLogger(__name__) + +matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"]) + +# Default Matrix configuration (fallback when matrix.yaml is missing/corrupt) +_DEFAULT_MATRIX_CONFIG: dict[str, Any] = { + "lighting": { + "ambient_color": "#1a1a2e", + "ambient_intensity": 0.4, + "point_lights": [ + {"color": "#FFD700", "intensity": 1.2, "position": {"x": 0, "y": 5, "z": 0}}, + {"color": "#3B82F6", "intensity": 0.8, "position": {"x": -5, "y": 3, "z": -5}}, + {"color": "#A855F7", "intensity": 0.6, "position": {"x": 5, "y": 3, "z": 5}}, + ], + }, + "environment": { + "rain_enabled": False, + "starfield_enabled": True, + "fog_color": "#0f0f23", + "fog_density": 0.02, + }, + "features": { + "chat_enabled": True, + "visitor_avatars": True, + "pip_familiar": True, + "workshop_portal": True, + }, +} + + +def _load_matrix_config() -> dict[str, Any]: + """Load Matrix world configuration from matrix.yaml with fallback to defaults. + + Returns a dict with sections: lighting, environment, features. + If the config file is missing or invalid, returns sensible defaults. + """ + try: + config_path = Path(settings.repo_root) / "config" / "matrix.yaml" + if not config_path.exists(): + logger.debug("matrix.yaml not found, using default config") + return _DEFAULT_MATRIX_CONFIG.copy() + + raw = config_path.read_text() + config = yaml.safe_load(raw) + if not isinstance(config, dict): + logger.warning("matrix.yaml invalid format, using defaults") + return _DEFAULT_MATRIX_CONFIG.copy() + + # Merge with defaults to ensure all required fields exist + result: dict[str, Any] = { + "lighting": { + **_DEFAULT_MATRIX_CONFIG["lighting"], + **config.get("lighting", {}), + }, + "environment": { + **_DEFAULT_MATRIX_CONFIG["environment"], + **config.get("environment", {}), + }, + "features": { + **_DEFAULT_MATRIX_CONFIG["features"], + **config.get("features", {}), + }, + } + + # Ensure point_lights is a list + if "point_lights" in config.get("lighting", {}): + result["lighting"]["point_lights"] = config["lighting"]["point_lights"] + else: + result["lighting"]["point_lights"] = _DEFAULT_MATRIX_CONFIG["lighting"]["point_lights"] + + return result + except Exception as exc: + logger.warning("Failed to load matrix config: %s, using defaults", exc) + return _DEFAULT_MATRIX_CONFIG.copy() + + +@matrix_router.get("/config") +async def get_matrix_config() -> JSONResponse: + """Return Matrix world configuration. + + Serves lighting presets, environment settings, and feature flags + to the Matrix frontend so it can be config-driven rather than + hardcoded. Reads from config/matrix.yaml with sensible defaults. + + """ + config = _load_matrix_config() + return JSONResponse( + content=config, + headers={"Cache-Control": "no-cache, no-store"}, + ) + + +def _build_matrix_agents_response() -> list[dict[str, Any]]: + """Build the Matrix agent registry response. + + Reads from agents.yaml and returns agents with Matrix-compatible + formatting including colors, shapes, and positions. + """ + try: + from timmy.agents.loader import list_agents + + agents = list_agents() + if not agents: + return [] + + positions = _compute_circular_positions(len(agents)) + + result = [] + for i, agent in enumerate(agents): + agent_id = agent.get("id", "") + result.append( + { + "id": agent_id, + "display_name": agent.get("name", agent_id.title()), + "role": agent.get("role", "general"), + "color": _get_agent_color(agent_id), + "position": positions[i], + "shape": _get_agent_shape(agent_id), + "status": agent.get("status", _DEFAULT_STATUS), + } + ) + + return result + except Exception as exc: + logger.warning("Failed to load agents for Matrix: %s", exc) + return [] + + +@matrix_router.get("/agents") +async def get_matrix_agents() -> JSONResponse: + """Return the agent registry for Matrix visualization. + + Serves agents from agents.yaml with Matrix-compatible formatting. + Returns 200 with empty list if no agents configured. + """ + agents = _build_matrix_agents_response() + return JSONResponse( + content=agents, + headers={"Cache-Control": "no-cache, no-store"}, + ) + + +_MAX_THOUGHT_LIMIT = 50 # Maximum thoughts allowed per request +_DEFAULT_THOUGHT_LIMIT = 10 # Default number of thoughts to return +_MAX_THOUGHT_TEXT_LEN = 500 # Max characters for thought text + + +def _build_matrix_thoughts_response(limit: int = _DEFAULT_THOUGHT_LIMIT) -> list[dict[str, Any]]: + """Build the Matrix thoughts response from the thinking engine. + + Returns recent thoughts formatted for Matrix display: + - id: thought UUID + - text: thought content (truncated to 500 chars) + - created_at: ISO-8601 timestamp + - chain_id: parent thought ID (or null if root thought) + + Returns empty list if thinking engine is disabled or fails. + """ + try: + from timmy.thinking import thinking_engine + + thoughts = thinking_engine.get_recent_thoughts(limit=limit) + return [ + { + "id": t.id, + "text": t.content[:_MAX_THOUGHT_TEXT_LEN], + "created_at": t.created_at, + "chain_id": t.parent_id, + } + for t in thoughts + ] + except Exception as exc: + logger.warning("Failed to load thoughts for Matrix: %s", exc) + return [] + + +@matrix_router.get("/thoughts") +async def get_matrix_thoughts(limit: int = _DEFAULT_THOUGHT_LIMIT) -> JSONResponse: + """Return Timmy's recent thoughts formatted for Matrix display. + + Query params: + - limit: Number of thoughts to return (default 10, max 50) + + Returns empty array if thinking engine is disabled or fails. + """ + # Clamp limit to valid range + if limit < 1: + limit = 1 + elif limit > _MAX_THOUGHT_LIMIT: + limit = _MAX_THOUGHT_LIMIT + + thoughts = _build_matrix_thoughts_response(limit=limit) + return JSONResponse( + content=thoughts, + headers={"Cache-Control": "no-cache, no-store"}, + ) + + +# Health check cache (5-second TTL for capability checks) +_health_cache: dict | None = None +_health_cache_ts: float = 0.0 +_HEALTH_CACHE_TTL = 5.0 + + +def _check_capability_thinking() -> bool: + """Check if thinking engine is available.""" + try: + from timmy.thinking import thinking_engine + + # Check if the engine has been initialized (has a db path) + return hasattr(thinking_engine, "_db") and thinking_engine._db is not None + except Exception: + return False + + +def _check_capability_memory() -> bool: + """Check if memory system is available.""" + try: + from timmy.memory_system import HOT_MEMORY_PATH + + return HOT_MEMORY_PATH.exists() + except Exception: + return False + + +def _check_capability_bark() -> bool: + """Check if bark production is available.""" + try: + from infrastructure.presence import produce_bark + + return callable(produce_bark) + except Exception: + return False + + +def _check_capability_familiar() -> bool: + """Check if familiar (Pip) is available.""" + try: + from timmy.familiar import pip_familiar + + return pip_familiar is not None + except Exception: + return False + + +def _check_capability_lightning() -> bool: + """Check if Lightning payments are available.""" + # Lightning is currently disabled per health.py + # Returns False until properly re-implemented + return False + + +def _build_matrix_health_response() -> dict[str, Any]: + """Build the Matrix health response with capability checks. + + Performs lightweight checks (<100ms total) to determine which features + are available. Returns 200 even if some capabilities are degraded. + """ + capabilities = { + "thinking": _check_capability_thinking(), + "memory": _check_capability_memory(), + "bark": _check_capability_bark(), + "familiar": _check_capability_familiar(), + "lightning": _check_capability_lightning(), + } + + # Status is ok if core capabilities (thinking, memory, bark) are available + core_caps = ["thinking", "memory", "bark"] + core_available = all(capabilities[c] for c in core_caps) + status = "ok" if core_available else "degraded" + + return { + "status": status, + "version": "1.0.0", + "capabilities": capabilities, + } + + +@matrix_router.get("/health") +async def get_matrix_health() -> JSONResponse: + """Return health status and capability availability for Matrix frontend. + + Returns 200 even if some capabilities are degraded. + """ + response = _build_matrix_health_response() + status_code = 200 # Always 200, even if degraded + + return JSONResponse( + content=response, + status_code=status_code, + headers={"Cache-Control": "no-cache, no-store"}, + ) + + +# Rate limiting: 1 search per 5 seconds per IP +_MEMORY_SEARCH_RATE_LIMIT_SECONDS = 5 +_memory_search_last_request: dict[str, float] = {} +_MAX_MEMORY_RESULTS = 5 +_MAX_MEMORY_TEXT_LENGTH = 200 + + +def _build_matrix_memory_response( + memories: list, +) -> list[dict[str, Any]]: + """Build the Matrix memory search response. + + Formats memory entries for Matrix display: + - text: truncated to 200 characters + - relevance: 0-1 score from relevance_score + - created_at: ISO-8601 timestamp + - context_type: the memory type + + Results are capped at _MAX_MEMORY_RESULTS. + """ + results = [] + for mem in memories[:_MAX_MEMORY_RESULTS]: + text = mem.content + if len(text) > _MAX_MEMORY_TEXT_LENGTH: + text = text[:_MAX_MEMORY_TEXT_LENGTH] + "..." + + results.append( + { + "text": text, + "relevance": round(mem.relevance_score or 0.0, 4), + "created_at": mem.timestamp, + "context_type": mem.context_type, + } + ) + return results + + +@matrix_router.get("/memory/search") +async def get_matrix_memory_search(request: Request, q: str | None = None) -> JSONResponse: + """Search Timmy's memory for relevant snippets. + + Rate limited to 1 search per 5 seconds per IP. + Returns 200 with results, 400 if missing query, or 429 if rate limited. + """ + # Validate query parameter + query = q.strip() if q else "" + if not query: + return JSONResponse( + status_code=400, + content={"error": "Query parameter 'q' is required"}, + ) + + # Rate limiting check by IP + client_ip = _get_client_ip(request) + now = time.time() + last_request = _memory_search_last_request.get(client_ip, 0) + time_since_last = now - last_request + + if time_since_last < _MEMORY_SEARCH_RATE_LIMIT_SECONDS: + retry_after = _MEMORY_SEARCH_RATE_LIMIT_SECONDS - time_since_last + return JSONResponse( + status_code=429, + content={"error": "Rate limit exceeded. Try again later."}, + headers={"Retry-After": str(int(retry_after) + 1)}, + ) + + # Record this request + _memory_search_last_request[client_ip] = now + + # Search memories + try: + memories = search_memories(query, limit=_MAX_MEMORY_RESULTS) + results = _build_matrix_memory_response(memories) + except Exception as exc: + logger.warning("Memory search failed: %s", exc) + results = [] + + return JSONResponse( + content=results, + headers={"Cache-Control": "no-cache, no-store"}, + ) diff --git a/src/dashboard/routes/world/state.py b/src/dashboard/routes/world/state.py new file mode 100644 index 00000000..a5ab52c5 --- /dev/null +++ b/src/dashboard/routes/world/state.py @@ -0,0 +1,75 @@ +"""World state functions — presence file reading and state API.""" + +import json +import logging +import time +from datetime import UTC, datetime + +from fastapi import APIRouter +from fastapi.responses import JSONResponse + +from infrastructure.presence import serialize_presence +from timmy.workshop_state import PRESENCE_FILE + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/world", tags=["world"]) + +_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild + + +def _read_presence_file() -> dict | None: + """Read presence.json if it exists and is fresh enough.""" + try: + if not PRESENCE_FILE.exists(): + return None + age = time.time() - PRESENCE_FILE.stat().st_mtime + if age > _STALE_THRESHOLD: + logger.debug("presence.json is stale (%.0fs old)", age) + return None + return json.loads(PRESENCE_FILE.read_text()) + except (OSError, json.JSONDecodeError) as exc: + logger.warning("Failed to read presence.json: %s", exc) + return None + + +def _build_world_state(presence: dict) -> dict: + """Transform presence dict into the world/state API response.""" + return serialize_presence(presence) + + +def _get_current_state() -> dict: + """Build the current world-state dict from best available source.""" + presence = _read_presence_file() + + if presence is None: + try: + from timmy.workshop_state import get_state_dict + + presence = get_state_dict() + except Exception as exc: + logger.warning("Live state build failed: %s", exc) + presence = { + "version": 1, + "liveness": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"), + "mood": "calm", + "current_focus": "", + "active_threads": [], + "recent_events": [], + "concerns": [], + } + + return _build_world_state(presence) + + +@router.get("/state") +async def get_world_state() -> JSONResponse: + """Return Timmy's current world state for Workshop bootstrap. + + Reads from ``~/.timmy/presence.json`` if fresh, otherwise + rebuilds live from cognitive state. + """ + return JSONResponse( + content=_get_current_state(), + headers={"Cache-Control": "no-cache, no-store"}, + ) diff --git a/src/dashboard/routes/world/utils.py b/src/dashboard/routes/world/utils.py new file mode 100644 index 00000000..2710cc54 --- /dev/null +++ b/src/dashboard/routes/world/utils.py @@ -0,0 +1,85 @@ +"""Shared utilities for the world route submodules.""" + +import math + +# Agent color mapping — consistent with Matrix visual identity +_AGENT_COLORS: dict[str, str] = { + "timmy": "#FFD700", # Gold + "orchestrator": "#FFD700", # Gold + "perplexity": "#3B82F6", # Blue + "replit": "#F97316", # Orange + "kimi": "#06B6D4", # Cyan + "claude": "#A855F7", # Purple + "researcher": "#10B981", # Emerald + "coder": "#EF4444", # Red + "writer": "#EC4899", # Pink + "memory": "#8B5CF6", # Violet + "experimenter": "#14B8A6", # Teal + "forge": "#EF4444", # Red (coder alias) + "seer": "#10B981", # Emerald (researcher alias) + "quill": "#EC4899", # Pink (writer alias) + "echo": "#8B5CF6", # Violet (memory alias) + "lab": "#14B8A6", # Teal (experimenter alias) +} + +# Agent shape mapping for 3D visualization +_AGENT_SHAPES: dict[str, str] = { + "timmy": "sphere", + "orchestrator": "sphere", + "perplexity": "cube", + "replit": "cylinder", + "kimi": "dodecahedron", + "claude": "octahedron", + "researcher": "icosahedron", + "coder": "cube", + "writer": "cone", + "memory": "torus", + "experimenter": "tetrahedron", + "forge": "cube", + "seer": "icosahedron", + "quill": "cone", + "echo": "torus", + "lab": "tetrahedron", +} + +# Default fallback values +_DEFAULT_COLOR = "#9CA3AF" # Gray +_DEFAULT_SHAPE = "sphere" +_DEFAULT_STATUS = "available" + + +def _get_agent_color(agent_id: str) -> str: + """Get the Matrix color for an agent.""" + return _AGENT_COLORS.get(agent_id.lower(), _DEFAULT_COLOR) + + +def _get_agent_shape(agent_id: str) -> str: + """Get the Matrix shape for an agent.""" + return _AGENT_SHAPES.get(agent_id.lower(), _DEFAULT_SHAPE) + + +def _compute_circular_positions(count: int, radius: float = 3.0) -> list[dict[str, float]]: + """Compute circular positions for agents in the Matrix. + + Agents are arranged in a circle on the XZ plane at y=0. + """ + positions = [] + for i in range(count): + angle = (2 * math.pi * i) / count + x = radius * math.cos(angle) + z = radius * math.sin(angle) + positions.append({"x": round(x, 2), "y": 0.0, "z": round(z, 2)}) + return positions + + +def _get_client_ip(request) -> str: + """Extract client IP from request, respecting X-Forwarded-For header.""" + # Check for forwarded IP (when behind proxy) + forwarded = request.headers.get("X-Forwarded-For") + if forwarded: + # Take the first IP in the chain + return forwarded.split(",")[0].strip() + # Fall back to direct client IP + if request.client: + return request.client.host + return "unknown" diff --git a/src/dashboard/routes/world/websocket.py b/src/dashboard/routes/world/websocket.py new file mode 100644 index 00000000..31e61b4e --- /dev/null +++ b/src/dashboard/routes/world/websocket.py @@ -0,0 +1,160 @@ +"""WebSocket relay for live state changes.""" + +import asyncio +import json +import logging + +from fastapi import APIRouter, WebSocket + +from config import settings + +from .bark import _handle_client_message +from .state import _get_current_state + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/world", tags=["world"]) + +_ws_clients: list[WebSocket] = [] + +_HEARTBEAT_INTERVAL = 15 # seconds — ping to detect dead iPad/Safari connections + + +async def _heartbeat(websocket: WebSocket) -> None: + """Send periodic pings to detect dead connections (iPad resilience). + + Safari suspends background tabs, killing the TCP socket silently. + A 15-second ping ensures we notice within one interval. + + Rescued from stale PR #399. + """ + try: + while True: + await asyncio.sleep(_HEARTBEAT_INTERVAL) + await websocket.send_text(json.dumps({"type": "ping"})) + except Exception: + logger.debug("Heartbeat stopped — connection gone") + + +async def _authenticate_ws(websocket: WebSocket) -> bool: + """Authenticate WebSocket connection using matrix_ws_token. + + Checks for token in query param ?token=. If no query param, + accepts the connection and waits for first message with + {"type": "auth", "token": ""}. + + Returns True if authenticated (or if auth is disabled). + Returns False and closes connection with code 4001 if invalid. + """ + token_setting = settings.matrix_ws_token + + # Auth disabled in dev mode (empty/unset token) + if not token_setting: + return True + + # Check query param first (can validate before accept) + query_token = websocket.query_params.get("token", "") + if query_token: + if query_token == token_setting: + return True + # Invalid token in query param - we need to accept to close properly + await websocket.accept() + await websocket.close(code=4001, reason="Invalid token") + return False + + # No query token - accept and wait for auth message + await websocket.accept() + + # Wait for auth message as first message + try: + raw = await websocket.receive_text() + data = json.loads(raw) + if data.get("type") == "auth" and data.get("token") == token_setting: + return True + # Invalid auth message + await websocket.close(code=4001, reason="Invalid token") + return False + except (json.JSONDecodeError, TypeError): + # Non-JSON first message without valid token + await websocket.close(code=4001, reason="Authentication required") + return False + + +@router.websocket("/ws") +async def world_ws(websocket: WebSocket) -> None: + """Accept a Workshop client and keep it alive for state broadcasts. + + Sends a full ``world_state`` snapshot immediately on connect so the + client never starts from a blank slate. Incoming frames are parsed + as JSON — ``visitor_message`` triggers a bark response. A background + heartbeat ping runs every 15 s to detect dead connections early. + + Authentication: + - If matrix_ws_token is configured, clients must provide it via + ?token= param or in the first message as + {"type": "auth", "token": ""}. + - Invalid token results in close code 4001. + - Valid token receives a connection_ack message. + """ + # Authenticate (may accept connection internally) + is_authed = await _authenticate_ws(websocket) + if not is_authed: + logger.info("World WS connection rejected — invalid token") + return + + # Auth passed - accept if not already accepted + if websocket.client_state.name != "CONNECTED": + await websocket.accept() + + # Send connection_ack if auth was required + if settings.matrix_ws_token: + await websocket.send_text(json.dumps({"type": "connection_ack"})) + + _ws_clients.append(websocket) + logger.info("World WS connected — %d clients", len(_ws_clients)) + + # Send full world-state snapshot so client bootstraps instantly + try: + snapshot = _get_current_state() + await websocket.send_text(json.dumps({"type": "world_state", **snapshot})) + except Exception as exc: + logger.warning("Failed to send WS snapshot: %s", exc) + + ping_task = asyncio.create_task(_heartbeat(websocket)) + try: + while True: + raw = await websocket.receive_text() + await _handle_client_message(raw) + except Exception: + logger.debug("WebSocket receive loop ended") + finally: + ping_task.cancel() + if websocket in _ws_clients: + _ws_clients.remove(websocket) + logger.info("World WS disconnected — %d clients", len(_ws_clients)) + + +async def _broadcast(message: str) -> None: + """Send *message* to every connected Workshop client, pruning dead ones.""" + dead: list[WebSocket] = [] + for ws in _ws_clients: + try: + await ws.send_text(message) + except Exception: + logger.debug("Pruning dead WebSocket client") + dead.append(ws) + for ws in dead: + if ws in _ws_clients: + _ws_clients.remove(ws) + + +async def broadcast_world_state(presence: dict) -> None: + """Broadcast a ``timmy_state`` message to all connected Workshop clients. + + Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its + ``on_change`` callback. + """ + from .state import _build_world_state + + state = _build_world_state(presence) + await _broadcast(json.dumps({"type": "timmy_state", **state["timmyState"]})) -- 2.43.0