[loop-cycle-2] refactor: split world.py into focused submodules (#1360) #1449
File diff suppressed because it is too large
Load Diff
124
src/dashboard/routes/world/__init__.py
Normal file
124
src/dashboard/routes/world/__init__.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Workshop world state API and WebSocket relay.
|
||||
|
||||
Serves Timmy's current presence state to the Workshop 3D renderer.
|
||||
The primary consumer is the browser on first load — before any
|
||||
WebSocket events arrive, the client needs a full state snapshot.
|
||||
|
||||
The ``/ws/world`` endpoint streams ``timmy_state`` messages whenever
|
||||
the heartbeat detects a state change. It also accepts ``visitor_message``
|
||||
frames from the 3D client and responds with ``timmy_speech`` barks.
|
||||
|
||||
Source of truth: ``~/.timmy/presence.json`` written by
|
||||
:class:`~timmy.workshop_state.WorkshopHeartbeat`.
|
||||
Falls back to a live ``get_state_dict()`` call if the file is stale
|
||||
or missing.
|
||||
"""
|
||||
|
||||
from fastapi import APIRouter
|
||||
|
||||
# Import submodule routers
|
||||
from .bark import matrix_router as _bark_matrix_router
|
||||
from .matrix import matrix_router as _matrix_matrix_router
|
||||
from .state import router as _state_router
|
||||
from .websocket import router as _ws_router
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Combine sub-routers into the two top-level routers that app.py expects
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
router = APIRouter(prefix="/api/world", tags=["world"])
|
||||
|
||||
# Include state routes (GET /state)
|
||||
for route in _state_router.routes:
|
||||
router.routes.append(route)
|
||||
|
||||
# Include websocket routes (WS /ws)
|
||||
for route in _ws_router.routes:
|
||||
router.routes.append(route)
|
||||
|
||||
# Combine matrix sub-routers
|
||||
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
|
||||
|
||||
for route in _bark_matrix_router.routes:
|
||||
matrix_router.routes.append(route)
|
||||
|
||||
for route in _matrix_matrix_router.routes:
|
||||
matrix_router.routes.append(route)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Re-export public API for backward compatibility
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Used by src/dashboard/app.py
|
||||
from .websocket import broadcast_world_state # noqa: E402, F401
|
||||
|
||||
# Used by src/infrastructure/presence.py
|
||||
from .websocket import _ws_clients # noqa: E402, F401
|
||||
|
||||
# Used by tests
|
||||
from .bark import ( # noqa: E402, F401
|
||||
BarkRequest,
|
||||
_BARK_RATE_LIMIT_SECONDS,
|
||||
_GROUND_TTL,
|
||||
_MAX_EXCHANGES,
|
||||
_bark_and_broadcast,
|
||||
_bark_last_request,
|
||||
_conversation,
|
||||
_generate_bark,
|
||||
_handle_client_message,
|
||||
_log_bark_failure,
|
||||
_refresh_ground,
|
||||
post_matrix_bark,
|
||||
reset_conversation_ground,
|
||||
)
|
||||
from .commitments import ( # noqa: E402, F401
|
||||
_COMMITMENT_PATTERNS,
|
||||
_MAX_COMMITMENTS,
|
||||
_REMIND_AFTER,
|
||||
_build_commitment_context,
|
||||
_commitments,
|
||||
_extract_commitments,
|
||||
_record_commitments,
|
||||
_tick_commitments,
|
||||
close_commitment,
|
||||
get_commitments,
|
||||
reset_commitments,
|
||||
)
|
||||
from .matrix import ( # noqa: E402, F401
|
||||
_DEFAULT_MATRIX_CONFIG,
|
||||
_build_matrix_agents_response,
|
||||
_build_matrix_health_response,
|
||||
_build_matrix_memory_response,
|
||||
_build_matrix_thoughts_response,
|
||||
_check_capability_bark,
|
||||
_check_capability_familiar,
|
||||
_check_capability_lightning,
|
||||
_check_capability_memory,
|
||||
_check_capability_thinking,
|
||||
_load_matrix_config,
|
||||
_memory_search_last_request,
|
||||
get_matrix_agents,
|
||||
get_matrix_config,
|
||||
get_matrix_health,
|
||||
get_matrix_memory_search,
|
||||
get_matrix_thoughts,
|
||||
)
|
||||
from .state import ( # noqa: E402, F401
|
||||
_STALE_THRESHOLD,
|
||||
_build_world_state,
|
||||
_get_current_state,
|
||||
_read_presence_file,
|
||||
get_world_state,
|
||||
)
|
||||
from .utils import ( # noqa: E402, F401
|
||||
_compute_circular_positions,
|
||||
_get_agent_color,
|
||||
_get_agent_shape,
|
||||
_get_client_ip,
|
||||
)
|
||||
from .websocket import ( # noqa: E402, F401
|
||||
_authenticate_ws,
|
||||
_broadcast,
|
||||
_heartbeat,
|
||||
world_ws,
|
||||
)
|
||||
212
src/dashboard/routes/world/bark.py
Normal file
212
src/dashboard/routes/world/bark.py
Normal file
@@ -0,0 +1,212 @@
|
||||
"""Bark/conversation — visitor chat engine and Matrix bark endpoint."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from collections import deque
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import JSONResponse
|
||||
from pydantic import BaseModel
|
||||
|
||||
from infrastructure.presence import produce_bark
|
||||
|
||||
from .commitments import (
|
||||
_build_commitment_context,
|
||||
_record_commitments,
|
||||
_tick_commitments,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
|
||||
|
||||
# Rate limiting: 1 request per 3 seconds per visitor_id
|
||||
_BARK_RATE_LIMIT_SECONDS = 3
|
||||
_bark_last_request: dict[str, float] = {}
|
||||
|
||||
# Recent conversation buffer — kept in memory for the Workshop overlay.
|
||||
# Stores the last _MAX_EXCHANGES (visitor_text, timmy_text) pairs.
|
||||
_MAX_EXCHANGES = 3
|
||||
_conversation: deque[dict] = deque(maxlen=_MAX_EXCHANGES)
|
||||
|
||||
_WORKSHOP_SESSION_ID = "workshop"
|
||||
|
||||
# Conversation grounding — anchor to opening topic so Timmy doesn't drift.
|
||||
_ground_topic: str | None = None
|
||||
_ground_set_at: float = 0.0
|
||||
_GROUND_TTL = 300 # seconds of inactivity before the anchor expires
|
||||
|
||||
|
||||
class BarkRequest(BaseModel):
|
||||
"""Request body for POST /api/matrix/bark."""
|
||||
|
||||
text: str
|
||||
visitor_id: str
|
||||
|
||||
|
||||
@matrix_router.post("/bark")
|
||||
async def post_matrix_bark(request: BarkRequest) -> JSONResponse:
|
||||
"""Generate a bark response for a visitor message.
|
||||
|
||||
HTTP fallback for when WebSocket isn't available. The Matrix frontend
|
||||
can POST a message and get Timmy's bark response back as JSON.
|
||||
|
||||
Rate limited to 1 request per 3 seconds per visitor_id.
|
||||
|
||||
Request body:
|
||||
- text: The visitor's message text
|
||||
- visitor_id: Unique identifier for the visitor (used for rate limiting)
|
||||
|
||||
Returns:
|
||||
- 200: Bark message in produce_bark() format
|
||||
- 429: Rate limit exceeded (try again later)
|
||||
- 422: Invalid request (missing/invalid fields)
|
||||
"""
|
||||
# Validate inputs
|
||||
text = request.text.strip() if request.text else ""
|
||||
visitor_id = request.visitor_id.strip() if request.visitor_id else ""
|
||||
|
||||
if not text:
|
||||
return JSONResponse(
|
||||
status_code=422,
|
||||
content={"error": "text is required"},
|
||||
)
|
||||
|
||||
if not visitor_id:
|
||||
return JSONResponse(
|
||||
status_code=422,
|
||||
content={"error": "visitor_id is required"},
|
||||
)
|
||||
|
||||
# Rate limiting check
|
||||
now = time.time()
|
||||
last_request = _bark_last_request.get(visitor_id, 0)
|
||||
time_since_last = now - last_request
|
||||
|
||||
if time_since_last < _BARK_RATE_LIMIT_SECONDS:
|
||||
retry_after = _BARK_RATE_LIMIT_SECONDS - time_since_last
|
||||
return JSONResponse(
|
||||
status_code=429,
|
||||
content={"error": "Rate limit exceeded. Try again later."},
|
||||
headers={"Retry-After": str(int(retry_after) + 1)},
|
||||
)
|
||||
|
||||
# Record this request
|
||||
_bark_last_request[visitor_id] = now
|
||||
|
||||
# Generate bark response
|
||||
try:
|
||||
reply = await _generate_bark(text)
|
||||
except Exception as exc:
|
||||
logger.warning("Bark generation failed: %s", exc)
|
||||
reply = "Hmm, my thoughts are a bit tangled right now."
|
||||
|
||||
# Build bark response using produce_bark format
|
||||
bark = produce_bark(agent_id="timmy", text=reply, style="speech")
|
||||
|
||||
return JSONResponse(
|
||||
content=bark,
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
|
||||
|
||||
def reset_conversation_ground() -> None:
|
||||
"""Clear the conversation grounding anchor (e.g. after inactivity)."""
|
||||
global _ground_topic, _ground_set_at
|
||||
_ground_topic = None
|
||||
_ground_set_at = 0.0
|
||||
|
||||
|
||||
def _refresh_ground(visitor_text: str) -> None:
|
||||
"""Set or refresh the conversation grounding anchor.
|
||||
|
||||
The first visitor message in a session (or after the TTL expires)
|
||||
becomes the anchor topic. Subsequent messages are grounded against it.
|
||||
"""
|
||||
global _ground_topic, _ground_set_at
|
||||
now = time.time()
|
||||
if _ground_topic is None or (now - _ground_set_at) > _GROUND_TTL:
|
||||
_ground_topic = visitor_text[:120]
|
||||
logger.debug("Ground topic set: %s", _ground_topic)
|
||||
_ground_set_at = now
|
||||
|
||||
|
||||
async def _bark_and_broadcast(visitor_text: str) -> None:
|
||||
"""Generate a bark response and broadcast it to all Workshop clients."""
|
||||
from .websocket import _broadcast
|
||||
|
||||
await _broadcast(json.dumps({"type": "timmy_thinking"}))
|
||||
|
||||
# Notify Pip that a visitor spoke
|
||||
try:
|
||||
from timmy.familiar import pip_familiar
|
||||
|
||||
pip_familiar.on_event("visitor_spoke")
|
||||
except Exception:
|
||||
logger.debug("Pip familiar notification failed (optional)")
|
||||
|
||||
_refresh_ground(visitor_text)
|
||||
_tick_commitments()
|
||||
reply = await _generate_bark(visitor_text)
|
||||
_record_commitments(reply)
|
||||
|
||||
_conversation.append({"visitor": visitor_text, "timmy": reply})
|
||||
|
||||
await _broadcast(
|
||||
json.dumps(
|
||||
{
|
||||
"type": "timmy_speech",
|
||||
"text": reply,
|
||||
"recentExchanges": list(_conversation),
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
async def _generate_bark(visitor_text: str) -> str:
|
||||
"""Generate a short in-character bark response.
|
||||
|
||||
Uses the existing Timmy session with a dedicated workshop session ID.
|
||||
When a grounding anchor exists, the opening topic is prepended so the
|
||||
model stays on-topic across long sessions.
|
||||
Gracefully degrades to a canned response if inference fails.
|
||||
"""
|
||||
try:
|
||||
from timmy import session as _session
|
||||
|
||||
grounded = visitor_text
|
||||
commitment_ctx = _build_commitment_context()
|
||||
if commitment_ctx:
|
||||
grounded = f"{commitment_ctx}\n{grounded}"
|
||||
if _ground_topic and visitor_text != _ground_topic:
|
||||
grounded = f"[Workshop conversation topic: {_ground_topic}]\n{grounded}"
|
||||
response = await _session.chat(grounded, session_id=_WORKSHOP_SESSION_ID)
|
||||
return response
|
||||
except Exception as exc:
|
||||
logger.warning("Bark generation failed: %s", exc)
|
||||
return "Hmm, my thoughts are a bit tangled right now."
|
||||
|
||||
|
||||
def _log_bark_failure(task: asyncio.Task) -> None:
|
||||
"""Log unhandled exceptions from fire-and-forget bark tasks."""
|
||||
if task.cancelled():
|
||||
return
|
||||
exc = task.exception()
|
||||
if exc is not None:
|
||||
logger.error("Bark task failed: %s", exc)
|
||||
|
||||
|
||||
async def _handle_client_message(raw: str) -> None:
|
||||
"""Dispatch an incoming WebSocket frame from the Workshop client."""
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
return # ignore non-JSON keep-alive pings
|
||||
|
||||
if data.get("type") == "visitor_message":
|
||||
text = (data.get("text") or "").strip()
|
||||
if text:
|
||||
task = asyncio.create_task(_bark_and_broadcast(text))
|
||||
task.add_done_callback(_log_bark_failure)
|
||||
77
src/dashboard/routes/world/commitments.py
Normal file
77
src/dashboard/routes/world/commitments.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""Conversation grounding — commitment tracking (rescued from PR #408)."""
|
||||
|
||||
import re
|
||||
import time
|
||||
|
||||
# Patterns that indicate Timmy is committing to an action.
|
||||
_COMMITMENT_PATTERNS: list[re.Pattern[str]] = [
|
||||
re.compile(r"I'll (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
|
||||
re.compile(r"I will (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
|
||||
re.compile(r"[Ll]et me (.+?)(?:\.|!|\?|$)", re.IGNORECASE),
|
||||
]
|
||||
|
||||
# After this many messages without follow-up, surface open commitments.
|
||||
_REMIND_AFTER = 5
|
||||
_MAX_COMMITMENTS = 10
|
||||
|
||||
# In-memory list of open commitments.
|
||||
# Each entry: {"text": str, "created_at": float, "messages_since": int}
|
||||
_commitments: list[dict] = []
|
||||
|
||||
|
||||
def _extract_commitments(text: str) -> list[str]:
|
||||
"""Pull commitment phrases from Timmy's reply text."""
|
||||
found: list[str] = []
|
||||
for pattern in _COMMITMENT_PATTERNS:
|
||||
for match in pattern.finditer(text):
|
||||
phrase = match.group(1).strip()
|
||||
if len(phrase) > 5: # skip trivially short matches
|
||||
found.append(phrase[:120])
|
||||
return found
|
||||
|
||||
|
||||
def _record_commitments(reply: str) -> None:
|
||||
"""Scan a Timmy reply for commitments and store them."""
|
||||
for phrase in _extract_commitments(reply):
|
||||
# Avoid near-duplicate commitments
|
||||
if any(c["text"] == phrase for c in _commitments):
|
||||
continue
|
||||
_commitments.append({"text": phrase, "created_at": time.time(), "messages_since": 0})
|
||||
if len(_commitments) > _MAX_COMMITMENTS:
|
||||
_commitments.pop(0)
|
||||
|
||||
|
||||
def _tick_commitments() -> None:
|
||||
"""Increment messages_since for every open commitment."""
|
||||
for c in _commitments:
|
||||
c["messages_since"] += 1
|
||||
|
||||
|
||||
def _build_commitment_context() -> str:
|
||||
"""Return a grounding note if any commitments are overdue for follow-up."""
|
||||
overdue = [c for c in _commitments if c["messages_since"] >= _REMIND_AFTER]
|
||||
if not overdue:
|
||||
return ""
|
||||
lines = [f"- {c['text']}" for c in overdue]
|
||||
return (
|
||||
"[Open commitments Timmy made earlier — "
|
||||
"weave awareness naturally, don't list robotically]\n" + "\n".join(lines)
|
||||
)
|
||||
|
||||
|
||||
def close_commitment(index: int) -> bool:
|
||||
"""Remove a commitment by index. Returns True if removed."""
|
||||
if 0 <= index < len(_commitments):
|
||||
_commitments.pop(index)
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def get_commitments() -> list[dict]:
|
||||
"""Return a copy of open commitments (for testing / API)."""
|
||||
return list(_commitments)
|
||||
|
||||
|
||||
def reset_commitments() -> None:
|
||||
"""Clear all commitments (for testing / session reset)."""
|
||||
_commitments.clear()
|
||||
397
src/dashboard/routes/world/matrix.py
Normal file
397
src/dashboard/routes/world/matrix.py
Normal file
@@ -0,0 +1,397 @@
|
||||
"""Matrix API endpoints — config, agents, health, thoughts, memory search."""
|
||||
|
||||
import logging
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
from fastapi import APIRouter, Request
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from config import settings
|
||||
from timmy.memory_system import search_memories
|
||||
|
||||
from .utils import (
|
||||
_DEFAULT_STATUS,
|
||||
_compute_circular_positions,
|
||||
_get_agent_color,
|
||||
_get_agent_shape,
|
||||
_get_client_ip,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
matrix_router = APIRouter(prefix="/api/matrix", tags=["matrix"])
|
||||
|
||||
# Default Matrix configuration (fallback when matrix.yaml is missing/corrupt)
|
||||
_DEFAULT_MATRIX_CONFIG: dict[str, Any] = {
|
||||
"lighting": {
|
||||
"ambient_color": "#1a1a2e",
|
||||
"ambient_intensity": 0.4,
|
||||
"point_lights": [
|
||||
{"color": "#FFD700", "intensity": 1.2, "position": {"x": 0, "y": 5, "z": 0}},
|
||||
{"color": "#3B82F6", "intensity": 0.8, "position": {"x": -5, "y": 3, "z": -5}},
|
||||
{"color": "#A855F7", "intensity": 0.6, "position": {"x": 5, "y": 3, "z": 5}},
|
||||
],
|
||||
},
|
||||
"environment": {
|
||||
"rain_enabled": False,
|
||||
"starfield_enabled": True,
|
||||
"fog_color": "#0f0f23",
|
||||
"fog_density": 0.02,
|
||||
},
|
||||
"features": {
|
||||
"chat_enabled": True,
|
||||
"visitor_avatars": True,
|
||||
"pip_familiar": True,
|
||||
"workshop_portal": True,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _load_matrix_config() -> dict[str, Any]:
|
||||
"""Load Matrix world configuration from matrix.yaml with fallback to defaults.
|
||||
|
||||
Returns a dict with sections: lighting, environment, features.
|
||||
If the config file is missing or invalid, returns sensible defaults.
|
||||
"""
|
||||
try:
|
||||
config_path = Path(settings.repo_root) / "config" / "matrix.yaml"
|
||||
if not config_path.exists():
|
||||
logger.debug("matrix.yaml not found, using default config")
|
||||
return _DEFAULT_MATRIX_CONFIG.copy()
|
||||
|
||||
raw = config_path.read_text()
|
||||
config = yaml.safe_load(raw)
|
||||
if not isinstance(config, dict):
|
||||
logger.warning("matrix.yaml invalid format, using defaults")
|
||||
return _DEFAULT_MATRIX_CONFIG.copy()
|
||||
|
||||
# Merge with defaults to ensure all required fields exist
|
||||
result: dict[str, Any] = {
|
||||
"lighting": {
|
||||
**_DEFAULT_MATRIX_CONFIG["lighting"],
|
||||
**config.get("lighting", {}),
|
||||
},
|
||||
"environment": {
|
||||
**_DEFAULT_MATRIX_CONFIG["environment"],
|
||||
**config.get("environment", {}),
|
||||
},
|
||||
"features": {
|
||||
**_DEFAULT_MATRIX_CONFIG["features"],
|
||||
**config.get("features", {}),
|
||||
},
|
||||
}
|
||||
|
||||
# Ensure point_lights is a list
|
||||
if "point_lights" in config.get("lighting", {}):
|
||||
result["lighting"]["point_lights"] = config["lighting"]["point_lights"]
|
||||
else:
|
||||
result["lighting"]["point_lights"] = _DEFAULT_MATRIX_CONFIG["lighting"]["point_lights"]
|
||||
|
||||
return result
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load matrix config: %s, using defaults", exc)
|
||||
return _DEFAULT_MATRIX_CONFIG.copy()
|
||||
|
||||
|
||||
@matrix_router.get("/config")
|
||||
async def get_matrix_config() -> JSONResponse:
|
||||
"""Return Matrix world configuration.
|
||||
|
||||
Serves lighting presets, environment settings, and feature flags
|
||||
to the Matrix frontend so it can be config-driven rather than
|
||||
hardcoded. Reads from config/matrix.yaml with sensible defaults.
|
||||
|
||||
"""
|
||||
config = _load_matrix_config()
|
||||
return JSONResponse(
|
||||
content=config,
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
|
||||
|
||||
def _build_matrix_agents_response() -> list[dict[str, Any]]:
|
||||
"""Build the Matrix agent registry response.
|
||||
|
||||
Reads from agents.yaml and returns agents with Matrix-compatible
|
||||
formatting including colors, shapes, and positions.
|
||||
"""
|
||||
try:
|
||||
from timmy.agents.loader import list_agents
|
||||
|
||||
agents = list_agents()
|
||||
if not agents:
|
||||
return []
|
||||
|
||||
positions = _compute_circular_positions(len(agents))
|
||||
|
||||
result = []
|
||||
for i, agent in enumerate(agents):
|
||||
agent_id = agent.get("id", "")
|
||||
result.append(
|
||||
{
|
||||
"id": agent_id,
|
||||
"display_name": agent.get("name", agent_id.title()),
|
||||
"role": agent.get("role", "general"),
|
||||
"color": _get_agent_color(agent_id),
|
||||
"position": positions[i],
|
||||
"shape": _get_agent_shape(agent_id),
|
||||
"status": agent.get("status", _DEFAULT_STATUS),
|
||||
}
|
||||
)
|
||||
|
||||
return result
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load agents for Matrix: %s", exc)
|
||||
return []
|
||||
|
||||
|
||||
@matrix_router.get("/agents")
|
||||
async def get_matrix_agents() -> JSONResponse:
|
||||
"""Return the agent registry for Matrix visualization.
|
||||
|
||||
Serves agents from agents.yaml with Matrix-compatible formatting.
|
||||
Returns 200 with empty list if no agents configured.
|
||||
"""
|
||||
agents = _build_matrix_agents_response()
|
||||
return JSONResponse(
|
||||
content=agents,
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
|
||||
|
||||
_MAX_THOUGHT_LIMIT = 50 # Maximum thoughts allowed per request
|
||||
_DEFAULT_THOUGHT_LIMIT = 10 # Default number of thoughts to return
|
||||
_MAX_THOUGHT_TEXT_LEN = 500 # Max characters for thought text
|
||||
|
||||
|
||||
def _build_matrix_thoughts_response(limit: int = _DEFAULT_THOUGHT_LIMIT) -> list[dict[str, Any]]:
|
||||
"""Build the Matrix thoughts response from the thinking engine.
|
||||
|
||||
Returns recent thoughts formatted for Matrix display:
|
||||
- id: thought UUID
|
||||
- text: thought content (truncated to 500 chars)
|
||||
- created_at: ISO-8601 timestamp
|
||||
- chain_id: parent thought ID (or null if root thought)
|
||||
|
||||
Returns empty list if thinking engine is disabled or fails.
|
||||
"""
|
||||
try:
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
thoughts = thinking_engine.get_recent_thoughts(limit=limit)
|
||||
return [
|
||||
{
|
||||
"id": t.id,
|
||||
"text": t.content[:_MAX_THOUGHT_TEXT_LEN],
|
||||
"created_at": t.created_at,
|
||||
"chain_id": t.parent_id,
|
||||
}
|
||||
for t in thoughts
|
||||
]
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load thoughts for Matrix: %s", exc)
|
||||
return []
|
||||
|
||||
|
||||
@matrix_router.get("/thoughts")
|
||||
async def get_matrix_thoughts(limit: int = _DEFAULT_THOUGHT_LIMIT) -> JSONResponse:
|
||||
"""Return Timmy's recent thoughts formatted for Matrix display.
|
||||
|
||||
Query params:
|
||||
- limit: Number of thoughts to return (default 10, max 50)
|
||||
|
||||
Returns empty array if thinking engine is disabled or fails.
|
||||
"""
|
||||
# Clamp limit to valid range
|
||||
if limit < 1:
|
||||
limit = 1
|
||||
elif limit > _MAX_THOUGHT_LIMIT:
|
||||
limit = _MAX_THOUGHT_LIMIT
|
||||
|
||||
thoughts = _build_matrix_thoughts_response(limit=limit)
|
||||
return JSONResponse(
|
||||
content=thoughts,
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
|
||||
|
||||
# Health check cache (5-second TTL for capability checks)
|
||||
_health_cache: dict | None = None
|
||||
_health_cache_ts: float = 0.0
|
||||
_HEALTH_CACHE_TTL = 5.0
|
||||
|
||||
|
||||
def _check_capability_thinking() -> bool:
|
||||
"""Check if thinking engine is available."""
|
||||
try:
|
||||
from timmy.thinking import thinking_engine
|
||||
|
||||
# Check if the engine has been initialized (has a db path)
|
||||
return hasattr(thinking_engine, "_db") and thinking_engine._db is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _check_capability_memory() -> bool:
|
||||
"""Check if memory system is available."""
|
||||
try:
|
||||
from timmy.memory_system import HOT_MEMORY_PATH
|
||||
|
||||
return HOT_MEMORY_PATH.exists()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _check_capability_bark() -> bool:
|
||||
"""Check if bark production is available."""
|
||||
try:
|
||||
from infrastructure.presence import produce_bark
|
||||
|
||||
return callable(produce_bark)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _check_capability_familiar() -> bool:
|
||||
"""Check if familiar (Pip) is available."""
|
||||
try:
|
||||
from timmy.familiar import pip_familiar
|
||||
|
||||
return pip_familiar is not None
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _check_capability_lightning() -> bool:
|
||||
"""Check if Lightning payments are available."""
|
||||
# Lightning is currently disabled per health.py
|
||||
# Returns False until properly re-implemented
|
||||
return False
|
||||
|
||||
|
||||
def _build_matrix_health_response() -> dict[str, Any]:
|
||||
"""Build the Matrix health response with capability checks.
|
||||
|
||||
Performs lightweight checks (<100ms total) to determine which features
|
||||
are available. Returns 200 even if some capabilities are degraded.
|
||||
"""
|
||||
capabilities = {
|
||||
"thinking": _check_capability_thinking(),
|
||||
"memory": _check_capability_memory(),
|
||||
"bark": _check_capability_bark(),
|
||||
"familiar": _check_capability_familiar(),
|
||||
"lightning": _check_capability_lightning(),
|
||||
}
|
||||
|
||||
# Status is ok if core capabilities (thinking, memory, bark) are available
|
||||
core_caps = ["thinking", "memory", "bark"]
|
||||
core_available = all(capabilities[c] for c in core_caps)
|
||||
status = "ok" if core_available else "degraded"
|
||||
|
||||
return {
|
||||
"status": status,
|
||||
"version": "1.0.0",
|
||||
"capabilities": capabilities,
|
||||
}
|
||||
|
||||
|
||||
@matrix_router.get("/health")
|
||||
async def get_matrix_health() -> JSONResponse:
|
||||
"""Return health status and capability availability for Matrix frontend.
|
||||
|
||||
Returns 200 even if some capabilities are degraded.
|
||||
"""
|
||||
response = _build_matrix_health_response()
|
||||
status_code = 200 # Always 200, even if degraded
|
||||
|
||||
return JSONResponse(
|
||||
content=response,
|
||||
status_code=status_code,
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
|
||||
|
||||
# Rate limiting: 1 search per 5 seconds per IP
|
||||
_MEMORY_SEARCH_RATE_LIMIT_SECONDS = 5
|
||||
_memory_search_last_request: dict[str, float] = {}
|
||||
_MAX_MEMORY_RESULTS = 5
|
||||
_MAX_MEMORY_TEXT_LENGTH = 200
|
||||
|
||||
|
||||
def _build_matrix_memory_response(
|
||||
memories: list,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Build the Matrix memory search response.
|
||||
|
||||
Formats memory entries for Matrix display:
|
||||
- text: truncated to 200 characters
|
||||
- relevance: 0-1 score from relevance_score
|
||||
- created_at: ISO-8601 timestamp
|
||||
- context_type: the memory type
|
||||
|
||||
Results are capped at _MAX_MEMORY_RESULTS.
|
||||
"""
|
||||
results = []
|
||||
for mem in memories[:_MAX_MEMORY_RESULTS]:
|
||||
text = mem.content
|
||||
if len(text) > _MAX_MEMORY_TEXT_LENGTH:
|
||||
text = text[:_MAX_MEMORY_TEXT_LENGTH] + "..."
|
||||
|
||||
results.append(
|
||||
{
|
||||
"text": text,
|
||||
"relevance": round(mem.relevance_score or 0.0, 4),
|
||||
"created_at": mem.timestamp,
|
||||
"context_type": mem.context_type,
|
||||
}
|
||||
)
|
||||
return results
|
||||
|
||||
|
||||
@matrix_router.get("/memory/search")
|
||||
async def get_matrix_memory_search(request: Request, q: str | None = None) -> JSONResponse:
|
||||
"""Search Timmy's memory for relevant snippets.
|
||||
|
||||
Rate limited to 1 search per 5 seconds per IP.
|
||||
Returns 200 with results, 400 if missing query, or 429 if rate limited.
|
||||
"""
|
||||
# Validate query parameter
|
||||
query = q.strip() if q else ""
|
||||
if not query:
|
||||
return JSONResponse(
|
||||
status_code=400,
|
||||
content={"error": "Query parameter 'q' is required"},
|
||||
)
|
||||
|
||||
# Rate limiting check by IP
|
||||
client_ip = _get_client_ip(request)
|
||||
now = time.time()
|
||||
last_request = _memory_search_last_request.get(client_ip, 0)
|
||||
time_since_last = now - last_request
|
||||
|
||||
if time_since_last < _MEMORY_SEARCH_RATE_LIMIT_SECONDS:
|
||||
retry_after = _MEMORY_SEARCH_RATE_LIMIT_SECONDS - time_since_last
|
||||
return JSONResponse(
|
||||
status_code=429,
|
||||
content={"error": "Rate limit exceeded. Try again later."},
|
||||
headers={"Retry-After": str(int(retry_after) + 1)},
|
||||
)
|
||||
|
||||
# Record this request
|
||||
_memory_search_last_request[client_ip] = now
|
||||
|
||||
# Search memories
|
||||
try:
|
||||
memories = search_memories(query, limit=_MAX_MEMORY_RESULTS)
|
||||
results = _build_matrix_memory_response(memories)
|
||||
except Exception as exc:
|
||||
logger.warning("Memory search failed: %s", exc)
|
||||
results = []
|
||||
|
||||
return JSONResponse(
|
||||
content=results,
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
75
src/dashboard/routes/world/state.py
Normal file
75
src/dashboard/routes/world/state.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""World state functions — presence file reading and state API."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
from datetime import UTC, datetime
|
||||
|
||||
from fastapi import APIRouter
|
||||
from fastapi.responses import JSONResponse
|
||||
|
||||
from infrastructure.presence import serialize_presence
|
||||
from timmy.workshop_state import PRESENCE_FILE
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/world", tags=["world"])
|
||||
|
||||
_STALE_THRESHOLD = 90 # seconds — file older than this triggers live rebuild
|
||||
|
||||
|
||||
def _read_presence_file() -> dict | None:
|
||||
"""Read presence.json if it exists and is fresh enough."""
|
||||
try:
|
||||
if not PRESENCE_FILE.exists():
|
||||
return None
|
||||
age = time.time() - PRESENCE_FILE.stat().st_mtime
|
||||
if age > _STALE_THRESHOLD:
|
||||
logger.debug("presence.json is stale (%.0fs old)", age)
|
||||
return None
|
||||
return json.loads(PRESENCE_FILE.read_text())
|
||||
except (OSError, json.JSONDecodeError) as exc:
|
||||
logger.warning("Failed to read presence.json: %s", exc)
|
||||
return None
|
||||
|
||||
|
||||
def _build_world_state(presence: dict) -> dict:
|
||||
"""Transform presence dict into the world/state API response."""
|
||||
return serialize_presence(presence)
|
||||
|
||||
|
||||
def _get_current_state() -> dict:
|
||||
"""Build the current world-state dict from best available source."""
|
||||
presence = _read_presence_file()
|
||||
|
||||
if presence is None:
|
||||
try:
|
||||
from timmy.workshop_state import get_state_dict
|
||||
|
||||
presence = get_state_dict()
|
||||
except Exception as exc:
|
||||
logger.warning("Live state build failed: %s", exc)
|
||||
presence = {
|
||||
"version": 1,
|
||||
"liveness": datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
|
||||
"mood": "calm",
|
||||
"current_focus": "",
|
||||
"active_threads": [],
|
||||
"recent_events": [],
|
||||
"concerns": [],
|
||||
}
|
||||
|
||||
return _build_world_state(presence)
|
||||
|
||||
|
||||
@router.get("/state")
|
||||
async def get_world_state() -> JSONResponse:
|
||||
"""Return Timmy's current world state for Workshop bootstrap.
|
||||
|
||||
Reads from ``~/.timmy/presence.json`` if fresh, otherwise
|
||||
rebuilds live from cognitive state.
|
||||
"""
|
||||
return JSONResponse(
|
||||
content=_get_current_state(),
|
||||
headers={"Cache-Control": "no-cache, no-store"},
|
||||
)
|
||||
85
src/dashboard/routes/world/utils.py
Normal file
85
src/dashboard/routes/world/utils.py
Normal file
@@ -0,0 +1,85 @@
|
||||
"""Shared utilities for the world route submodules."""
|
||||
|
||||
import math
|
||||
|
||||
# Agent color mapping — consistent with Matrix visual identity
|
||||
_AGENT_COLORS: dict[str, str] = {
|
||||
"timmy": "#FFD700", # Gold
|
||||
"orchestrator": "#FFD700", # Gold
|
||||
"perplexity": "#3B82F6", # Blue
|
||||
"replit": "#F97316", # Orange
|
||||
"kimi": "#06B6D4", # Cyan
|
||||
"claude": "#A855F7", # Purple
|
||||
"researcher": "#10B981", # Emerald
|
||||
"coder": "#EF4444", # Red
|
||||
"writer": "#EC4899", # Pink
|
||||
"memory": "#8B5CF6", # Violet
|
||||
"experimenter": "#14B8A6", # Teal
|
||||
"forge": "#EF4444", # Red (coder alias)
|
||||
"seer": "#10B981", # Emerald (researcher alias)
|
||||
"quill": "#EC4899", # Pink (writer alias)
|
||||
"echo": "#8B5CF6", # Violet (memory alias)
|
||||
"lab": "#14B8A6", # Teal (experimenter alias)
|
||||
}
|
||||
|
||||
# Agent shape mapping for 3D visualization
|
||||
_AGENT_SHAPES: dict[str, str] = {
|
||||
"timmy": "sphere",
|
||||
"orchestrator": "sphere",
|
||||
"perplexity": "cube",
|
||||
"replit": "cylinder",
|
||||
"kimi": "dodecahedron",
|
||||
"claude": "octahedron",
|
||||
"researcher": "icosahedron",
|
||||
"coder": "cube",
|
||||
"writer": "cone",
|
||||
"memory": "torus",
|
||||
"experimenter": "tetrahedron",
|
||||
"forge": "cube",
|
||||
"seer": "icosahedron",
|
||||
"quill": "cone",
|
||||
"echo": "torus",
|
||||
"lab": "tetrahedron",
|
||||
}
|
||||
|
||||
# Default fallback values
|
||||
_DEFAULT_COLOR = "#9CA3AF" # Gray
|
||||
_DEFAULT_SHAPE = "sphere"
|
||||
_DEFAULT_STATUS = "available"
|
||||
|
||||
|
||||
def _get_agent_color(agent_id: str) -> str:
|
||||
"""Get the Matrix color for an agent."""
|
||||
return _AGENT_COLORS.get(agent_id.lower(), _DEFAULT_COLOR)
|
||||
|
||||
|
||||
def _get_agent_shape(agent_id: str) -> str:
|
||||
"""Get the Matrix shape for an agent."""
|
||||
return _AGENT_SHAPES.get(agent_id.lower(), _DEFAULT_SHAPE)
|
||||
|
||||
|
||||
def _compute_circular_positions(count: int, radius: float = 3.0) -> list[dict[str, float]]:
|
||||
"""Compute circular positions for agents in the Matrix.
|
||||
|
||||
Agents are arranged in a circle on the XZ plane at y=0.
|
||||
"""
|
||||
positions = []
|
||||
for i in range(count):
|
||||
angle = (2 * math.pi * i) / count
|
||||
x = radius * math.cos(angle)
|
||||
z = radius * math.sin(angle)
|
||||
positions.append({"x": round(x, 2), "y": 0.0, "z": round(z, 2)})
|
||||
return positions
|
||||
|
||||
|
||||
def _get_client_ip(request) -> str:
|
||||
"""Extract client IP from request, respecting X-Forwarded-For header."""
|
||||
# Check for forwarded IP (when behind proxy)
|
||||
forwarded = request.headers.get("X-Forwarded-For")
|
||||
if forwarded:
|
||||
# Take the first IP in the chain
|
||||
return forwarded.split(",")[0].strip()
|
||||
# Fall back to direct client IP
|
||||
if request.client:
|
||||
return request.client.host
|
||||
return "unknown"
|
||||
160
src/dashboard/routes/world/websocket.py
Normal file
160
src/dashboard/routes/world/websocket.py
Normal file
@@ -0,0 +1,160 @@
|
||||
"""WebSocket relay for live state changes."""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
|
||||
from fastapi import APIRouter, WebSocket
|
||||
|
||||
from config import settings
|
||||
|
||||
from .bark import _handle_client_message
|
||||
from .state import _get_current_state
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/api/world", tags=["world"])
|
||||
|
||||
_ws_clients: list[WebSocket] = []
|
||||
|
||||
_HEARTBEAT_INTERVAL = 15 # seconds — ping to detect dead iPad/Safari connections
|
||||
|
||||
|
||||
async def _heartbeat(websocket: WebSocket) -> None:
|
||||
"""Send periodic pings to detect dead connections (iPad resilience).
|
||||
|
||||
Safari suspends background tabs, killing the TCP socket silently.
|
||||
A 15-second ping ensures we notice within one interval.
|
||||
|
||||
Rescued from stale PR #399.
|
||||
"""
|
||||
try:
|
||||
while True:
|
||||
await asyncio.sleep(_HEARTBEAT_INTERVAL)
|
||||
await websocket.send_text(json.dumps({"type": "ping"}))
|
||||
except Exception:
|
||||
logger.debug("Heartbeat stopped — connection gone")
|
||||
|
||||
|
||||
async def _authenticate_ws(websocket: WebSocket) -> bool:
|
||||
"""Authenticate WebSocket connection using matrix_ws_token.
|
||||
|
||||
Checks for token in query param ?token=<token>. If no query param,
|
||||
accepts the connection and waits for first message with
|
||||
{"type": "auth", "token": "<token>"}.
|
||||
|
||||
Returns True if authenticated (or if auth is disabled).
|
||||
Returns False and closes connection with code 4001 if invalid.
|
||||
"""
|
||||
token_setting = settings.matrix_ws_token
|
||||
|
||||
# Auth disabled in dev mode (empty/unset token)
|
||||
if not token_setting:
|
||||
return True
|
||||
|
||||
# Check query param first (can validate before accept)
|
||||
query_token = websocket.query_params.get("token", "")
|
||||
if query_token:
|
||||
if query_token == token_setting:
|
||||
return True
|
||||
# Invalid token in query param - we need to accept to close properly
|
||||
await websocket.accept()
|
||||
await websocket.close(code=4001, reason="Invalid token")
|
||||
return False
|
||||
|
||||
# No query token - accept and wait for auth message
|
||||
await websocket.accept()
|
||||
|
||||
# Wait for auth message as first message
|
||||
try:
|
||||
raw = await websocket.receive_text()
|
||||
data = json.loads(raw)
|
||||
if data.get("type") == "auth" and data.get("token") == token_setting:
|
||||
return True
|
||||
# Invalid auth message
|
||||
await websocket.close(code=4001, reason="Invalid token")
|
||||
return False
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# Non-JSON first message without valid token
|
||||
await websocket.close(code=4001, reason="Authentication required")
|
||||
return False
|
||||
|
||||
|
||||
@router.websocket("/ws")
|
||||
async def world_ws(websocket: WebSocket) -> None:
|
||||
"""Accept a Workshop client and keep it alive for state broadcasts.
|
||||
|
||||
Sends a full ``world_state`` snapshot immediately on connect so the
|
||||
client never starts from a blank slate. Incoming frames are parsed
|
||||
as JSON — ``visitor_message`` triggers a bark response. A background
|
||||
heartbeat ping runs every 15 s to detect dead connections early.
|
||||
|
||||
Authentication:
|
||||
- If matrix_ws_token is configured, clients must provide it via
|
||||
?token=<token> param or in the first message as
|
||||
{"type": "auth", "token": "<token>"}.
|
||||
- Invalid token results in close code 4001.
|
||||
- Valid token receives a connection_ack message.
|
||||
"""
|
||||
# Authenticate (may accept connection internally)
|
||||
is_authed = await _authenticate_ws(websocket)
|
||||
if not is_authed:
|
||||
logger.info("World WS connection rejected — invalid token")
|
||||
return
|
||||
|
||||
# Auth passed - accept if not already accepted
|
||||
if websocket.client_state.name != "CONNECTED":
|
||||
await websocket.accept()
|
||||
|
||||
# Send connection_ack if auth was required
|
||||
if settings.matrix_ws_token:
|
||||
await websocket.send_text(json.dumps({"type": "connection_ack"}))
|
||||
|
||||
_ws_clients.append(websocket)
|
||||
logger.info("World WS connected — %d clients", len(_ws_clients))
|
||||
|
||||
# Send full world-state snapshot so client bootstraps instantly
|
||||
try:
|
||||
snapshot = _get_current_state()
|
||||
await websocket.send_text(json.dumps({"type": "world_state", **snapshot}))
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to send WS snapshot: %s", exc)
|
||||
|
||||
ping_task = asyncio.create_task(_heartbeat(websocket))
|
||||
try:
|
||||
while True:
|
||||
raw = await websocket.receive_text()
|
||||
await _handle_client_message(raw)
|
||||
except Exception:
|
||||
logger.debug("WebSocket receive loop ended")
|
||||
finally:
|
||||
ping_task.cancel()
|
||||
if websocket in _ws_clients:
|
||||
_ws_clients.remove(websocket)
|
||||
logger.info("World WS disconnected — %d clients", len(_ws_clients))
|
||||
|
||||
|
||||
async def _broadcast(message: str) -> None:
|
||||
"""Send *message* to every connected Workshop client, pruning dead ones."""
|
||||
dead: list[WebSocket] = []
|
||||
for ws in _ws_clients:
|
||||
try:
|
||||
await ws.send_text(message)
|
||||
except Exception:
|
||||
logger.debug("Pruning dead WebSocket client")
|
||||
dead.append(ws)
|
||||
for ws in dead:
|
||||
if ws in _ws_clients:
|
||||
_ws_clients.remove(ws)
|
||||
|
||||
|
||||
async def broadcast_world_state(presence: dict) -> None:
|
||||
"""Broadcast a ``timmy_state`` message to all connected Workshop clients.
|
||||
|
||||
Called by :class:`~timmy.workshop_state.WorkshopHeartbeat` via its
|
||||
``on_change`` callback.
|
||||
"""
|
||||
from .state import _build_world_state
|
||||
|
||||
state = _build_world_state(presence)
|
||||
await _broadcast(json.dumps({"type": "timmy_state", **state["timmyState"]}))
|
||||
Reference in New Issue
Block a user