[loop-cycle-2] refactor: split world.py into focused submodules (#1360) #1449

Merged
Timmy merged 1 commits from refactor/split-world-routes into main 2026-03-24 18:55:14 +00:00
8 changed files with 1130 additions and 1065 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,124 @@
"""Workshop world state API and WebSocket relay.
Serves Timmy's current presence state to the Workshop 3D renderer.
The primary consumer is the browser on first load — before any
WebSocket events arrive, the client needs a full state snapshot.
The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever
the heartbeat detects a state change. It also accepts ``visitor_message``
frames from the 3D client and responds with ``timmy_speech`` barks.
Source of truth: ``~/.timmy/presence.json`` written by
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
Falls back to a live ``get_state_dict()`` call if the file is stale
or missing.
"""
from fastapi import APIRouter
# Import submodule routers
from .bark import matrix_router as _bark_matrix_router
from .matrix import matrix_router as _matrix_matrix_router
from .state import router as _state_router
from .websocket import router as _ws_router
# ---------------------------------------------------------------------------
# Combine sub-routers into the two top-level routers that app.py expects
# ---------------------------------------------------------------------------
router = APIRouter(prefix="/api/world", tags=["world"])
# Include state routes (GET /state)
for route in _state_router.routes:
router.routes.append(route)
# Include websocket routes (WS /ws)
for route in _ws_router.routes:
router.routes.append(route)
# Combine matrix sub-routers
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
for route in _bark_matrix_router.routes:
matrix_router.routes.append(route)
for route in _matrix_matrix_router.routes:
matrix_router.routes.append(route)
# ---------------------------------------------------------------------------
# Re-export public API for backward compatibility
# ---------------------------------------------------------------------------
# Used by src/dashboard/app.py
from .websocket import broadcast_world_state # noqa: E402, F401
# Used by src/infrastructure/presence.py
from .websocket import _ws_clients # noqa: E402, F401
# Used by tests
from .bark import ( # noqa: E402, F401
BarkRequest,
_BARK_RATE_LIMIT_SECONDS,
_GROUND_TTL,
_MAX_EXCHANGES,
_bark_and_broadcast,
_bark_last_request,
_conversation,
_generate_bark,
_handle_client_message,
_log_bark_failure,
_refresh_ground,
post_matrix_bark,
reset_conversation_ground,
)
from .commitments import ( # noqa: E402, F401
_COMMITMENT_PATTERNS,
_MAX_COMMITMENTS,
_REMIND_AFTER,
_build_commitment_context,
_commitments,
_extract_commitments,
_record_commitments,
_tick_commitments,
close_commitment,
get_commitments,
reset_commitments,
)
from .matrix import ( # noqa: E402, F401
_DEFAULT_MATRIX_CONFIG,
_build_matrix_agents_response,
_build_matrix_health_response,
_build_matrix_memory_response,
_build_matrix_thoughts_response,
_check_capability_bark,
_check_capability_familiar,
_check_capability_lightning,
_check_capability_memory,
_check_capability_thinking,
_load_matrix_config,
_memory_search_last_request,
get_matrix_agents,
get_matrix_config,
get_matrix_health,
get_matrix_memory_search,
get_matrix_thoughts,
)
from .state import ( # noqa: E402, F401
_STALE_THRESHOLD,
_build_world_state,
_get_current_state,
_read_presence_file,
get_world_state,
)
from .utils import ( # noqa: E402, F401
_compute_circular_positions,
_get_agent_color,
_get_agent_shape,
_get_client_ip,
)
from .websocket import ( # noqa: E402, F401
_authenticate_ws,
_broadcast,
_heartbeat,
world_ws,
)

View File

@@ -0,0 +1,212 @@
"""Bark/conversation — visitor chat engine and Matrix bark endpoint."""
import asyncio
import json
import logging
import time
from collections import deque
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from infrastructure.presence import produce_bark
from .commitments import (
_build_commitment_context,
_record_commitments,
_tick_commitments,
)
logger = logging.getLogger(__name__)
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
# Rate limiting: 1 request per 3 seconds per visitor_id
_BARK_RATE_LIMIT_SECONDS = 3
_bark_last_request: dict[str, float] = {}
# Recent conversation buffer — kept in memory for the Workshop overlay.
# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs.
_MAX_EXCHANGES = 3
_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES)
_WORKSHOP_SESSION_ID = "workshop"
# Conversation grounding — anchor to opening topic so Timmy doesn't drift.
_ground_topic: str | None = None
_ground_set_at: float = 0.0
_GROUND_TTL = 300 # seconds of inactivity before the anchor expires
class BarkRequest(BaseModel):
"""Request body for POST /api/matrix/bark."""
text: str
visitor_id: str
@matrix_router.post("/bark")
async def post_matrix_bark(request: BarkRequest) -> JSONResponse:
"""Generate a bark response for a visitor message.
HTTP fallback for when WebSocket isn't available. The Matrix frontend
can POST a message and get Timmy's bark response back as JSON.
Rate limited to 1 request per 3 seconds per visitor_id.
Request body:
- text: The visitor's message text
- visitor_id: Unique identifier for the visitor (used for rate limiting)
Returns:
- 200: Bark message in produce_bark() format
- 429: Rate limit exceeded (try again later)
- 422: Invalid request (missing/invalid fields)
"""
# Validate inputs
text = request.text.strip() if request.text else ""
visitor_id = request.visitor_id.strip() if request.visitor_id else ""
if not text:
return JSONResponse(
status_code=422,
content={"error": "text is required"},
)
if not visitor_id:
return JSONResponse(
status_code=422,
content={"error": "visitor_id is required"},
)
# Rate limiting check
now = time.time()
last_request = _bark_last_request.get(visitor_id, 0)
time_since_last = now - last_request
if time_since_last < _BARK_RATE_LIMIT_SECONDS:
retry_after = _BARK_RATE_LIMIT_SECONDS - time_since_last
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded. Try again later."},
headers={"Retry-After": str(int(retry_after) + 1)},
)
# Record this request
_bark_last_request[visitor_id] = now
# Generate bark response
try:
reply = await _generate_bark(text)
except Exception as exc:
logger.warning("Bark generation failed: %s", exc)
reply = "Hmm, my thoughts are a bit tangled right now."
# Build bark response using produce_bark format
bark = produce_bark(agent_id="timmy", text=reply, style="speech")
return JSONResponse(
content=bark,
headers={"Cache-Control": "no-cache, no-store"},
)
def reset_conversation_ground() -> None:
"""Clear the conversation grounding anchor (e.g. after inactivity)."""
global _ground_topic, _ground_set_at
_ground_topic = None
_ground_set_at = 0.0
def _refresh_ground(visitor_text: str) -> None:
"""Set or refresh the conversation grounding anchor.
The first visitor message in a session (or after the TTL expires)
becomes the anchor topic. Subsequent messages are grounded against it.
"""
global _ground_topic, _ground_set_at
now = time.time()
if _ground_topic is None or (now - _ground_set_at) > _GROUND_TTL:
_ground_topic = visitor_text[:120]
logger.debug("Ground topic set: %s", _ground_topic)
_ground_set_at = now
async def _bark_and_broadcast(visitor_text: str) -> None:
"""Generate a bark response and broadcast it to all Workshop clients."""
from .websocket import _broadcast
await _broadcast(json.dumps({"type": "timmy_thinking"}))
# Notify Pip that a visitor spoke
try:
from timmy.familiar import pip_familiar
pip_familiar.on_event("visitor_spoke")
except Exception:
logger.debug("Pip familiar notification failed (optional)")
_refresh_ground(visitor_text)
_tick_commitments()
reply = await _generate_bark(visitor_text)
_record_commitments(reply)
_conversation.append({"visitor": visitor_text, "timmy": reply})
await _broadcast(
json.dumps(
{
"type": "timmy_speech",
"text": reply,
"recentExchanges": list(_conversation),
}
)
)
async def _generate_bark(visitor_text: str) -> str:
"""Generate a short in-character bark response.
Uses the existing Timmy session with a dedicated workshop session ID.
When a grounding anchor exists, the opening topic is prepended so the
model stays on-topic across long sessions.
Gracefully degrades to a canned response if inference fails.
"""
try:
from timmy import session as _session
grounded = visitor_text
commitment_ctx = _build_commitment_context()
if commitment_ctx:
grounded = f"{commitment_ctx}\n{grounded}"
if _ground_topic and visitor_text != _ground_topic:
grounded = f"[Workshop conversation topic: {_ground_topic}]\n{grounded}"
response = await _session.chat(grounded, session_id=_WORKSHOP_SESSION_ID)
return response
except Exception as exc:
logger.warning("Bark generation failed: %s", exc)
return "Hmm, my thoughts are a bit tangled right now."
def _log_bark_failure(task: asyncio.Task) -> None:
"""Log unhandled exceptions from fire-and-forget bark tasks."""
if task.cancelled():
return
exc = task.exception()
if exc is not None:
logger.error("Bark task failed: %s", exc)
async def _handle_client_message(raw: str) -> None:
"""Dispatch an incoming WebSocket frame from the Workshop client."""
try:
data = json.loads(raw)
except (json.JSONDecodeError, TypeError):
return # ignore non-JSON keep-alive pings
if data.get("type") == "visitor_message":
text = (data.get("text") or "").strip()
if text:
task = asyncio.create_task(_bark_and_broadcast(text))
task.add_done_callback(_log_bark_failure)

View File

@@ -0,0 +1,77 @@
"""Conversation grounding — commitment tracking (rescued from PR #408)."""
import re
import time
# Patterns that indicate Timmy is committing to an action.
_COMMITMENT_PATTERNS: list[re.Pattern[str]] = [
re.compile(r"I'll (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
re.compile(r"I will (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
re.compile(r"[Ll]et me (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
]
# After this many messages without follow-up, surface open commitments.
_REMIND_AFTER = 5
_MAX_COMMITMENTS = 10
# In-memory list of open commitments.
# Each entry: {"text": str, "created_at": float, "messages_since": int}
_commitments: list[dict] = []
def _extract_commitments(text: str) -> list[str]:
"""Pull commitment phrases from Timmy's reply text."""
found: list[str] = []
for pattern in _COMMITMENT_PATTERNS:
for match in pattern.finditer(text):
phrase = match.group(1).strip()
if len(phrase) > 5: # skip trivially short matches
found.append(phrase[:120])
return found
def _record_commitments(reply: str) -> None:
"""Scan a Timmy reply for commitments and store them."""
for phrase in _extract_commitments(reply):
# Avoid near-duplicate commitments
if any(c["text"] == phrase for c in _commitments):
continue
_commitments.append({"text": phrase, "created_at": time.time(), "messages_since": 0})
if len(_commitments) > _MAX_COMMITMENTS:
_commitments.pop(0)
def _tick_commitments() -> None:
"""Increment messages_since for every open commitment."""
for c in _commitments:
c["messages_since"] += 1
def _build_commitment_context() -> str:
"""Return a grounding note if any commitments are overdue for follow-up."""
overdue = [c for c in _commitments if c["messages_since"] >= _REMIND_AFTER]
if not overdue:
return ""
lines = [f"- {c['text']}" for c in overdue]
return (
"[Open commitments Timmy made earlier — "
"weave awareness naturally, don't list robotically]\n" + "\n".join(lines)
)
def close_commitment(index: int) -> bool:
"""Remove a commitment by index. Returns True if removed."""
if 0 <= index < len(_commitments):
_commitments.pop(index)
return True
return False
def get_commitments() -> list[dict]:
"""Return a copy of open commitments (for testing / API)."""
return list(_commitments)
def reset_commitments() -> None:
"""Clear all commitments (for testing / session reset)."""
_commitments.clear()

View File

@@ -0,0 +1,397 @@
"""Matrix API endpoints — config, agents, health, thoughts, memory search."""
import logging
import time
from pathlib import Path
from typing import Any
import yaml
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from config import settings
from timmy.memory_system import search_memories
from .utils import (
_DEFAULT_STATUS,
_compute_circular_positions,
_get_agent_color,
_get_agent_shape,
_get_client_ip,
)
logger = logging.getLogger(__name__)
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
# Default Matrix configuration (fallback when matrix.yaml is missing/corrupt)
_DEFAULT_MATRIX_CONFIG: dict[str, Any] = {
"lighting": {
"ambient_color": "#1a1a2e",
"ambient_intensity": 0.4,
"point_lights": [
{"color": "#FFD700", "intensity": 1.2, "position": {"x": 0, "y": 5, "z": 0}},
{"color": "#3B82F6", "intensity": 0.8, "position": {"x": -5, "y": 3, "z": -5}},
{"color": "#A855F7", "intensity": 0.6, "position": {"x": 5, "y": 3, "z": 5}},
],
},
"environment": {
"rain_enabled": False,
"starfield_enabled": True,
"fog_color": "#0f0f23",
"fog_density": 0.02,
},
"features": {
"chat_enabled": True,
"visitor_avatars": True,
"pip_familiar": True,
"workshop_portal": True,
},
}
def _load_matrix_config() -> dict[str, Any]:
"""Load Matrix world configuration from matrix.yaml with fallback to defaults.
Returns a dict with sections: lighting, environment, features.
If the config file is missing or invalid, returns sensible defaults.
"""
try:
config_path = Path(settings.repo_root) / "config" / "matrix.yaml"
if not config_path.exists():
logger.debug("matrix.yaml not found, using default config")
return _DEFAULT_MATRIX_CONFIG.copy()
raw = config_path.read_text()
config = yaml.safe_load(raw)
if not isinstance(config, dict):
logger.warning("matrix.yaml invalid format, using defaults")
return _DEFAULT_MATRIX_CONFIG.copy()
# Merge with defaults to ensure all required fields exist
result: dict[str, Any] = {
"lighting": {
**_DEFAULT_MATRIX_CONFIG["lighting"],
**config.get("lighting", {}),
},
"environment": {
**_DEFAULT_MATRIX_CONFIG["environment"],
**config.get("environment", {}),
},
"features": {
**_DEFAULT_MATRIX_CONFIG["features"],
**config.get("features", {}),
},
}
# Ensure point_lights is a list
if "point_lights" in config.get("lighting", {}):
result["lighting"]["point_lights"] = config["lighting"]["point_lights"]
else:
result["lighting"]["point_lights"] = _DEFAULT_MATRIX_CONFIG["lighting"]["point_lights"]
return result
except Exception as exc:
logger.warning("Failed to load matrix config: %s, using defaults", exc)
return _DEFAULT_MATRIX_CONFIG.copy()
@matrix_router.get("/config")
async def get_matrix_config() -> JSONResponse:
"""Return Matrix world configuration.
Serves lighting presets, environment settings, and feature flags
to the Matrix frontend so it can be config-driven rather than
hardcoded. Reads from config/matrix.yaml with sensible defaults.
"""
config = _load_matrix_config()
return JSONResponse(
content=config,
headers={"Cache-Control": "no-cache, no-store"},
)
def _build_matrix_agents_response() -> list[dict[str, Any]]:
"""Build the Matrix agent registry response.
Reads from agents.yaml and returns agents with Matrix-compatible
formatting including colors, shapes, and positions.
"""
try:
from timmy.agents.loader import list_agents
agents = list_agents()
if not agents:
return []
positions = _compute_circular_positions(len(agents))
result = []
for i, agent in enumerate(agents):
agent_id = agent.get("id", "")
result.append(
{
"id": agent_id,
"display_name": agent.get("name", agent_id.title()),
"role": agent.get("role", "general"),
"color": _get_agent_color(agent_id),
"position": positions[i],
"shape": _get_agent_shape(agent_id),
"status": agent.get("status", _DEFAULT_STATUS),
}
)
return result
except Exception as exc:
logger.warning("Failed to load agents for Matrix: %s", exc)
return []
@matrix_router.get("/agents")
async def get_matrix_agents() -> JSONResponse:
"""Return the agent registry for Matrix visualization.
Serves agents from agents.yaml with Matrix-compatible formatting.
Returns 200 with empty list if no agents configured.
"""
agents = _build_matrix_agents_response()
return JSONResponse(
content=agents,
headers={"Cache-Control": "no-cache, no-store"},
)
_MAX_THOUGHT_LIMIT = 50 # Maximum thoughts allowed per request
_DEFAULT_THOUGHT_LIMIT = 10 # Default number of thoughts to return
_MAX_THOUGHT_TEXT_LEN = 500 # Max characters for thought text
def _build_matrix_thoughts_response(limit: int = _DEFAULT_THOUGHT_LIMIT) -> list[dict[str, Any]]:
"""Build the Matrix thoughts response from the thinking engine.
Returns recent thoughts formatted for Matrix display:
- id: thought UUID
- text: thought content (truncated to 500 chars)
- created_at: ISO-8601 timestamp
- chain_id: parent thought ID (or null if root thought)
Returns empty list if thinking engine is disabled or fails.
"""
try:
from timmy.thinking import thinking_engine
thoughts = thinking_engine.get_recent_thoughts(limit=limit)
return [
{
"id": t.id,
"text": t.content[:_MAX_THOUGHT_TEXT_LEN],
"created_at": t.created_at,
"chain_id": t.parent_id,
}
for t in thoughts
]
except Exception as exc:
logger.warning("Failed to load thoughts for Matrix: %s", exc)
return []
@matrix_router.get("/thoughts")
async def get_matrix_thoughts(limit: int = _DEFAULT_THOUGHT_LIMIT) -> JSONResponse:
"""Return Timmy's recent thoughts formatted for Matrix display.
Query params:
- limit: Number of thoughts to return (default 10, max 50)
Returns empty array if thinking engine is disabled or fails.
"""
# Clamp limit to valid range
if limit < 1:
limit = 1
elif limit > _MAX_THOUGHT_LIMIT:
limit = _MAX_THOUGHT_LIMIT
thoughts = _build_matrix_thoughts_response(limit=limit)
return JSONResponse(
content=thoughts,
headers={"Cache-Control": "no-cache, no-store"},
)
# Health check cache (5-second TTL for capability checks)
_health_cache: dict | None = None
_health_cache_ts: float = 0.0
_HEALTH_CACHE_TTL = 5.0
def _check_capability_thinking() -> bool:
"""Check if thinking engine is available."""
try:
from timmy.thinking import thinking_engine
# Check if the engine has been initialized (has a db path)
return hasattr(thinking_engine, "_db") and thinking_engine._db is not None
except Exception:
return False
def _check_capability_memory() -> bool:
"""Check if memory system is available."""
try:
from timmy.memory_system import HOT_MEMORY_PATH
return HOT_MEMORY_PATH.exists()
except Exception:
return False
def _check_capability_bark() -> bool:
"""Check if bark production is available."""
try:
from infrastructure.presence import produce_bark
return callable(produce_bark)
except Exception:
return False
def _check_capability_familiar() -> bool:
"""Check if familiar (Pip) is available."""
try:
from timmy.familiar import pip_familiar
return pip_familiar is not None
except Exception:
return False
def _check_capability_lightning() -> bool:
"""Check if Lightning payments are available."""
# Lightning is currently disabled per health.py
# Returns False until properly re-implemented
return False
def _build_matrix_health_response() -> dict[str, Any]:
"""Build the Matrix health response with capability checks.
Performs lightweight checks (<100ms total) to determine which features
are available. Returns 200 even if some capabilities are degraded.
"""
capabilities = {
"thinking": _check_capability_thinking(),
"memory": _check_capability_memory(),
"bark": _check_capability_bark(),
"familiar": _check_capability_familiar(),
"lightning": _check_capability_lightning(),
}
# Status is ok if core capabilities (thinking, memory, bark) are available
core_caps = ["thinking", "memory", "bark"]
core_available = all(capabilities[c] for c in core_caps)
status = "ok" if core_available else "degraded"
return {
"status": status,
"version": "1.0.0",
"capabilities": capabilities,
}
@matrix_router.get("/health")
async def get_matrix_health() -> JSONResponse:
"""Return health status and capability availability for Matrix frontend.
Returns 200 even if some capabilities are degraded.
"""
response = _build_matrix_health_response()
status_code = 200 # Always 200, even if degraded
return JSONResponse(
content=response,
status_code=status_code,
headers={"Cache-Control": "no-cache, no-store"},
)
# Rate limiting: 1 search per 5 seconds per IP
_MEMORY_SEARCH_RATE_LIMIT_SECONDS = 5
_memory_search_last_request: dict[str, float] = {}
_MAX_MEMORY_RESULTS = 5
_MAX_MEMORY_TEXT_LENGTH = 200
def _build_matrix_memory_response(
memories: list,
) -> list[dict[str, Any]]:
"""Build the Matrix memory search response.
Formats memory entries for Matrix display:
- text: truncated to 200 characters
- relevance: 0-1 score from relevance_score
- created_at: ISO-8601 timestamp
- context_type: the memory type
Results are capped at _MAX_MEMORY_RESULTS.
"""
results = []
for mem in memories[:_MAX_MEMORY_RESULTS]:
text = mem.content
if len(text) > _MAX_MEMORY_TEXT_LENGTH:
text = text[:_MAX_MEMORY_TEXT_LENGTH] + "..."
results.append(
{
"text": text,
"relevance": round(mem.relevance_score or 0.0, 4),
"created_at": mem.timestamp,
"context_type": mem.context_type,
}
)
return results
@matrix_router.get("/memory/search")
async def get_matrix_memory_search(request: Request, q: str | None = None) -> JSONResponse:
"""Search Timmy's memory for relevant snippets.
Rate limited to 1 search per 5 seconds per IP.
Returns 200 with results, 400 if missing query, or 429 if rate limited.
"""
# Validate query parameter
query = q.strip() if q else ""
if not query:
return JSONResponse(
status_code=400,
content={"error": "Query parameter 'q' is required"},
)
# Rate limiting check by IP
client_ip = _get_client_ip(request)
now = time.time()
last_request = _memory_search_last_request.get(client_ip, 0)
time_since_last = now - last_request
if time_since_last < _MEMORY_SEARCH_RATE_LIMIT_SECONDS:
retry_after = _MEMORY_SEARCH_RATE_LIMIT_SECONDS - time_since_last
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded. Try again later."},
headers={"Retry-After": str(int(retry_after) + 1)},
)
# Record this request
_memory_search_last_request[client_ip] = now
# Search memories
try:
memories = search_memories(query, limit=_MAX_MEMORY_RESULTS)
results = _build_matrix_memory_response(memories)
except Exception as exc:
logger.warning("Memory search failed: %s", exc)
results = []
return JSONResponse(
content=results,
headers={"Cache-Control": "no-cache, no-store"},
)

View File

@@ -0,0 +1,75 @@
"""World state functions — presence file reading and state API."""
import json
import logging
import time
from datetime import UTC, datetime
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from infrastructure.presence import serialize_presence
from timmy.workshop_state import PRESENCE_FILE
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/world", tags=["world"])
_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild
def _read_presence_file() -> dict | None:
"""Read presence.json if it exists and is fresh enough."""
try:
if not PRESENCE_FILE.exists():
return None
age = time.time() - PRESENCE_FILE.stat().st_mtime
if age > _STALE_THRESHOLD:
logger.debug("presence.json is stale (%.0fs old)", age)
return None
return json.loads(PRESENCE_FILE.read_text())
except (OSError, json.JSONDecodeError) as exc:
logger.warning("Failed to read presence.json: %s", exc)
return None
def _build_world_state(presence: dict) -> dict:
"""Transform presence dict into the world/state API response."""
return serialize_presence(presence)
def _get_current_state() -> dict:
"""Build the current world-state dict from best available source."""
presence = _read_presence_file()
if presence is None:
try:
from timmy.workshop_state import get_state_dict
presence = get_state_dict()
except Exception as exc:
logger.warning("Live state build failed: %s", exc)
presence = {
"version": 1,
"liveness": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
"mood": "calm",
"current_focus": "",
"active_threads": [],
"recent_events": [],
"concerns": [],
}
return _build_world_state(presence)
@router.get("/state")
async def get_world_state() -> JSONResponse:
"""Return Timmy's current world state for Workshop bootstrap.
Reads from ``~/.timmy/presence.json`` if fresh, otherwise
rebuilds live from cognitive state.
"""
return JSONResponse(
content=_get_current_state(),
headers={"Cache-Control": "no-cache, no-store"},
)

View File

@@ -0,0 +1,85 @@
"""Shared utilities for the world route submodules."""
import math
# Agent color mapping — consistent with Matrix visual identity
_AGENT_COLORS: dict[str, str] = {
"timmy": "#FFD700", # Gold
"orchestrator": "#FFD700", # Gold
"perplexity": "#3B82F6", # Blue
"replit": "#F97316", # Orange
"kimi": "#06B6D4", # Cyan
"claude": "#A855F7", # Purple
"researcher": "#10B981", # Emerald
"coder": "#EF4444", # Red
"writer": "#EC4899", # Pink
"memory": "#8B5CF6", # Violet
"experimenter": "#14B8A6", # Teal
"forge": "#EF4444", # Red (coder alias)
"seer": "#10B981", # Emerald (researcher alias)
"quill": "#EC4899", # Pink (writer alias)
"echo": "#8B5CF6", # Violet (memory alias)
"lab": "#14B8A6", # Teal (experimenter alias)
}
# Agent shape mapping for 3D visualization
_AGENT_SHAPES: dict[str, str] = {
"timmy": "sphere",
"orchestrator": "sphere",
"perplexity": "cube",
"replit": "cylinder",
"kimi": "dodecahedron",
"claude": "octahedron",
"researcher": "icosahedron",
"coder": "cube",
"writer": "cone",
"memory": "torus",
"experimenter": "tetrahedron",
"forge": "cube",
"seer": "icosahedron",
"quill": "cone",
"echo": "torus",
"lab": "tetrahedron",
}
# Default fallback values
_DEFAULT_COLOR = "#9CA3AF" # Gray
_DEFAULT_SHAPE = "sphere"
_DEFAULT_STATUS = "available"
def _get_agent_color(agent_id: str) -> str:
"""Get the Matrix color for an agent."""
return _AGENT_COLORS.get(agent_id.lower(), _DEFAULT_COLOR)
def _get_agent_shape(agent_id: str) -> str:
"""Get the Matrix shape for an agent."""
return _AGENT_SHAPES.get(agent_id.lower(), _DEFAULT_SHAPE)
def _compute_circular_positions(count: int, radius: float = 3.0) -> list[dict[str, float]]:
"""Compute circular positions for agents in the Matrix.
Agents are arranged in a circle on the XZ plane at y=0.
"""
positions = []
for i in range(count):
angle = (2 * math.pi * i) / count
x = radius * math.cos(angle)
z = radius * math.sin(angle)
positions.append({"x": round(x, 2), "y": 0.0, "z": round(z, 2)})
return positions
def _get_client_ip(request) -> str:
"""Extract client IP from request, respecting X-Forwarded-For header."""
# Check for forwarded IP (when behind proxy)
forwarded = request.headers.get("X-Forwarded-For")
if forwarded:
# Take the first IP in the chain
return forwarded.split(",")[0].strip()
# Fall back to direct client IP
if request.client:
return request.client.host
return "unknown"

View File

@@ -0,0 +1,160 @@
"""WebSocket relay for live state changes."""
import asyncio
import json
import logging
from fastapi import APIRouter, WebSocket
from config import settings
from .bark import _handle_client_message
from .state import _get_current_state
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/world", tags=["world"])
_ws_clients: list[WebSocket] = []
_HEARTBEAT_INTERVAL = 15 # seconds — ping to detect dead iPad/Safari connections
async def _heartbeat(websocket: WebSocket) -> None:
"""Send periodic pings to detect dead connections (iPad resilience).
Safari suspends background tabs, killing the TCP socket silently.
A 15-second ping ensures we notice within one interval.
Rescued from stale PR #399.
"""
try:
while True:
await asyncio.sleep(_HEARTBEAT_INTERVAL)
await websocket.send_text(json.dumps({"type": "ping"}))
except Exception:
logger.debug("Heartbeat stopped — connection gone")
async def _authenticate_ws(websocket: WebSocket) -> bool:
"""Authenticate WebSocket connection using matrix_ws_token.
Checks for token in query param ?token=<token>. If no query param,
accepts the connection and waits for first message with
{"type": "auth", "token": "<token>"}.
Returns True if authenticated (or if auth is disabled).
Returns False and closes connection with code 4001 if invalid.
"""
token_setting = settings.matrix_ws_token
# Auth disabled in dev mode (empty/unset token)
if not token_setting:
return True
# Check query param first (can validate before accept)
query_token = websocket.query_params.get("token", "")
if query_token:
if query_token == token_setting:
return True
# Invalid token in query param - we need to accept to close properly
await websocket.accept()
await websocket.close(code=4001, reason="Invalid token")
return False
# No query token - accept and wait for auth message
await websocket.accept()
# Wait for auth message as first message
try:
raw = await websocket.receive_text()
data = json.loads(raw)
if data.get("type") == "auth" and data.get("token") == token_setting:
return True
# Invalid auth message
await websocket.close(code=4001, reason="Invalid token")
return False
except (json.JSONDecodeError, TypeError):
# Non-JSON first message without valid token
await websocket.close(code=4001, reason="Authentication required")
return False
@router.websocket("/ws")
async def world_ws(websocket: WebSocket) -> None:
"""Accept a Workshop client and keep it alive for state broadcasts.
Sends a full ``world_state`` snapshot immediately on connect so the
client never starts from a blank slate. Incoming frames are parsed
as JSON — ``visitor_message`` triggers a bark response. A background
heartbeat ping runs every 15 s to detect dead connections early.
Authentication:
- If matrix_ws_token is configured, clients must provide it via
?token=<token> param or in the first message as
{"type": "auth", "token": "<token>"}.
- Invalid token results in close code 4001.
- Valid token receives a connection_ack message.
"""
# Authenticate (may accept connection internally)
is_authed = await _authenticate_ws(websocket)
if not is_authed:
logger.info("World WS connection rejected — invalid token")
return
# Auth passed - accept if not already accepted
if websocket.client_state.name != "CONNECTED":
await websocket.accept()
# Send connection_ack if auth was required
if settings.matrix_ws_token:
await websocket.send_text(json.dumps({"type": "connection_ack"}))
_ws_clients.append(websocket)
logger.info("World WS connected — %d clients", len(_ws_clients))
# Send full world-state snapshot so client bootstraps instantly
try:
snapshot = _get_current_state()
await websocket.send_text(json.dumps({"type": "world_state", **snapshot}))
except Exception as exc:
logger.warning("Failed to send WS snapshot: %s", exc)
ping_task = asyncio.create_task(_heartbeat(websocket))
try:
while True:
raw = await websocket.receive_text()
await _handle_client_message(raw)
except Exception:
logger.debug("WebSocket receive loop ended")
finally:
ping_task.cancel()
if websocket in _ws_clients:
_ws_clients.remove(websocket)
logger.info("World WS disconnected — %d clients", len(_ws_clients))
async def _broadcast(message: str) -> None:
"""Send *message* to every connected Workshop client, pruning dead ones."""
dead: list[WebSocket] = []
for ws in _ws_clients:
try:
await ws.send_text(message)
except Exception:
logger.debug("Pruning dead WebSocket client")
dead.append(ws)
for ws in dead:
if ws in _ws_clients:
_ws_clients.remove(ws)
async def broadcast_world_state(presence: dict) -> None:
"""Broadcast a ``timmy_state`` message to all connected Workshop clients.
Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its
``on_change`` callback.
"""
from .state import _build_world_state
state = _build_world_state(presence)
await _broadcast(json.dumps({"type": "timmy_state", **state["timmyState"]}))