/** * /api/ws — WebSocket bridge from the internal EventBus to connected Workshop clients. * * Protocol (server → client): * { type: "world_state", timmyState, agentStates, recentEvents, updatedAt } * { type: "agent_state", agentId: string, state: "idle"|"active"|"working"|"thinking" } * { type: "job_started", jobId: string, agentId: string } * { type: "job_completed", jobId: string, agentId: string } * { type: "chat", agentId: string, text: string } * { type: "visitor_count", count: number } * { type: "ping" } * * Protocol (client → server): * { type: "visitor_enter", visitorId: string, visitorName?: string } * { type: "visitor_leave", visitorId: string } * { type: "visitor_message", visitorId: string, text: string } * { type: "pong" } * * Agent mapping (Workshop): * alpha — orchestrator (overall job lifecycle) * beta — eval (Haiku judge) * gamma — work (Sonnet executor) * delta — lightning (payment monitor / invoice watcher) */ import { randomUUID } from "crypto"; import type { IncomingMessage } from "http"; import type { WebSocket } from "ws"; import { WebSocketServer } from "ws"; import type { Server } from "http"; import { eventBus, type BusEvent } from "../lib/event-bus.js"; import { makeLogger } from "../lib/logger.js"; import { getWorldState, setAgentStateInWorld } from "../lib/world-state.js"; import { agentService } from "../lib/agent.js"; import { db, worldEvents } from "@workspace/db"; const logger = makeLogger("ws-events"); const PING_INTERVAL_MS = 30_000; // ── Per-visitor rate limit (3 replies/minute) ───────────────────────────────── const CHAT_RATE_LIMIT = 3; const CHAT_RATE_WINDOW_MS = 60_000; interface RateLimitEntry { count: number; resetAt: number; } const visitorRateLimits = new Map(); function checkChatRateLimit(visitorId: string): boolean { const now = Date.now(); const entry = visitorRateLimits.get(visitorId); if (!entry || now > entry.resetAt) { visitorRateLimits.set(visitorId, { count: 1, resetAt: now + CHAT_RATE_WINDOW_MS }); return true; } if (entry.count >= CHAT_RATE_LIMIT) return false; entry.count++; return true; } function broadcastToAll(wss: WebSocketServer, payload: object): void { const str = JSON.stringify(payload); wss.clients.forEach((c) => { if (c.readyState === 1) c.send(str); }); } function updateAgentWorld(agentId: string, state: string): void { try { setAgentStateInWorld(agentId, state); } catch { /* non-fatal */ } } async function logWorldEvent( type: string, summary: string, agentId?: string, jobId?: string, ): Promise { try { await db.insert(worldEvents).values({ id: randomUUID(), type, summary, agentId: agentId ?? null, jobId: jobId ?? null, }); } catch { /* non-fatal — don't crash WS on DB error */ } } function translateEvent(ev: BusEvent): object | null { switch (ev.type) { // ── Mode 1 job lifecycle ───────────────────────────────────────────────── case "job:state": { if (ev.state === "evaluating") { updateAgentWorld("alpha", "active"); updateAgentWorld("beta", "thinking"); void logWorldEvent("job:evaluating", `Evaluating job ${ev.jobId.slice(0, 8)}`, "beta", ev.jobId); return [ { type: "agent_state", agentId: "alpha", state: "active" }, { type: "agent_state", agentId: "beta", state: "thinking" }, { type: "job_started", jobId: ev.jobId, agentId: "beta" }, ]; } if (ev.state === "awaiting_eval_payment") { updateAgentWorld("alpha", "active"); return { type: "agent_state", agentId: "alpha", state: "active" }; } if (ev.state === "awaiting_work_payment") { updateAgentWorld("beta", "idle"); updateAgentWorld("delta", "active"); return [ { type: "agent_state", agentId: "beta", state: "idle" }, { type: "agent_state", agentId: "delta", state: "active" }, ]; } if (ev.state === "executing") { updateAgentWorld("delta", "idle"); updateAgentWorld("gamma", "working"); void logWorldEvent("job:executing", `Working on job ${ev.jobId.slice(0, 8)}`, "gamma", ev.jobId); return [ { type: "agent_state", agentId: "delta", state: "idle" }, { type: "agent_state", agentId: "gamma", state: "working" }, ]; } if (ev.state === "complete") { updateAgentWorld("gamma", "idle"); updateAgentWorld("alpha", "idle"); void logWorldEvent("job:complete", `Completed job ${ev.jobId.slice(0, 8)}`, "alpha", ev.jobId); return [ { type: "agent_state", agentId: "gamma", state: "idle" }, { type: "agent_state", agentId: "alpha", state: "idle" }, { type: "job_completed", jobId: ev.jobId, agentId: "gamma" }, ]; } if (ev.state === "rejected" || ev.state === "failed") { ["alpha", "beta", "gamma", "delta"].forEach(a => updateAgentWorld(a, "idle")); void logWorldEvent(`job:${ev.state}`, `Job ${ev.jobId.slice(0, 8)} ${ev.state}`, "alpha", ev.jobId); return [ { type: "agent_state", agentId: "beta", state: "idle" }, { type: "agent_state", agentId: "gamma", state: "idle" }, { type: "agent_state", agentId: "alpha", state: "idle" }, { type: "agent_state", agentId: "delta", state: "idle" }, ]; } return null; } case "job:completed": updateAgentWorld("gamma", "idle"); updateAgentWorld("alpha", "idle"); return [ { type: "agent_state", agentId: "gamma", state: "idle" }, { type: "agent_state", agentId: "alpha", state: "idle" }, { type: "chat", agentId: "gamma", text: `Job ${ev.jobId.slice(0, 8)} complete` }, ]; case "job:failed": ["alpha", "beta", "gamma", "delta"].forEach(a => updateAgentWorld(a, "idle")); return [ { type: "agent_state", agentId: "alpha", state: "idle" }, { type: "agent_state", agentId: "beta", state: "idle" }, { type: "agent_state", agentId: "gamma", state: "idle" }, { type: "agent_state", agentId: "delta", state: "idle" }, { type: "chat", agentId: "alpha", text: `Job ${ev.jobId.slice(0, 8)} failed: ${ev.reason}` }, ]; case "job:paid": if (ev.invoiceType === "eval") { updateAgentWorld("delta", "idle"); updateAgentWorld("beta", "thinking"); void logWorldEvent("payment:eval", "Eval payment confirmed", "delta"); return [ { type: "agent_state", agentId: "delta", state: "idle" }, { type: "agent_state", agentId: "beta", state: "thinking" }, { type: "chat", agentId: "delta", text: "⚡ Eval payment confirmed" }, ]; } if (ev.invoiceType === "work") { updateAgentWorld("delta", "idle"); updateAgentWorld("gamma", "working"); void logWorldEvent("payment:work", "Work payment confirmed", "delta"); return [ { type: "agent_state", agentId: "delta", state: "idle" }, { type: "agent_state", agentId: "gamma", state: "working" }, { type: "chat", agentId: "delta", text: "⚡ Work payment confirmed" }, ]; } return null; // ── Mode 2 session lifecycle ───────────────────────────────────────────── case "session:paid": return { type: "chat", agentId: "delta", text: `⚡ Session funded: ${ev.amountSats} sats` }; case "session:balance": return { type: "chat", agentId: "delta", text: `Balance: ${ev.balanceSats} sats remaining`, }; case "session:state": if (ev.state === "active") { return { type: "agent_state", agentId: "delta", state: "idle" }; } if (ev.state === "paused") { return { type: "chat", agentId: "delta", text: "Session paused — balance low. Top up to continue.", }; } return null; // ── Debate events (#21) ──────────────────────────────────────────────── case "debate:argument": { void logWorldEvent( "debate:argument", `${ev.agent} argues to ${ev.position}: ${ev.argument.slice(0, 80)}`, "beta", ev.jobId, ); return { type: "agent_debate", jobId: ev.jobId, agent: ev.agent, position: ev.position, argument: ev.argument, }; } case "debate:verdict": { void logWorldEvent( "debate:verdict", `Verdict: ${ev.accepted ? "accepted" : "rejected"} — ${ev.reason.slice(0, 80)}`, "beta", ev.jobId, ); return { type: "agent_debate", jobId: ev.jobId, agent: "Beta", position: "verdict", argument: `Final verdict: ${ev.accepted ? "ACCEPTED" : "REJECTED"} — ${ev.reason}`, accepted: ev.accepted, }; } default: return null; } } function send(socket: WebSocket, payload: object): void { if (socket.readyState === 1) { socket.send(JSON.stringify(payload)); } } function broadcast(socket: WebSocket, ev: BusEvent): void { const out = translateEvent(ev); if (!out) return; const messages = Array.isArray(out) ? out : [out]; for (const msg of messages) { send(socket, msg); } } async function sendWorldStateBootstrap(socket: WebSocket): Promise { try { const state = getWorldState(); const { desc } = await import("drizzle-orm"); const recent = await db .select() .from(worldEvents) .orderBy(desc(worldEvents.createdAt)) .limit(20); send(socket, { type: "world_state", ...state, recentEvents: recent.reverse(), }); } catch { send(socket, { type: "world_state", ...getWorldState(), recentEvents: [] }); } } export function attachWebSocketServer(server: Server): void { const wss = new WebSocketServer({ server, path: "/api/ws" }); wss.on("connection", (socket: WebSocket, req: IncomingMessage) => { const ip = req.headers["x-forwarded-for"] ?? req.socket.remoteAddress ?? "unknown"; logger.info("ws client connected", { ip, clients: wss.clients.size }); void sendWorldStateBootstrap(socket); const busHandler = (ev: BusEvent) => broadcast(socket, ev); eventBus.on("bus", busHandler); const pingTimer = setInterval(() => { send(socket, { type: "ping" }); }, PING_INTERVAL_MS); socket.on("message", (raw) => { try { const msg = JSON.parse(raw.toString()) as { type?: string; text?: string; visitorId?: string }; if (msg.type === "pong") return; if (msg.type === "subscribe") { send(socket, { type: "agent_count", count: wss.clients.size }); } if (msg.type === "visitor_enter") { wss.clients.forEach(c => { if (c !== socket && c.readyState === 1) { c.send(JSON.stringify({ type: "visitor_count", count: wss.clients.size })); } }); send(socket, { type: "visitor_count", count: wss.clients.size }); } if (msg.type === "visitor_leave") { wss.clients.forEach(c => { if (c !== socket && c.readyState === 1) { c.send(JSON.stringify({ type: "visitor_count", count: Math.max(0, wss.clients.size - 1) })); } }); } if (msg.type === "visitor_message" && msg.text) { const text = String(msg.text).slice(0, 500); // Broadcast visitor message to all watchers broadcastToAll(wss, { type: "chat", agentId: "visitor", text }); // Rate-limit Timmy's AI replies — key on server-trusted IP, not // client-provided visitorId (which is trivially spoofable). const ipStr = Array.isArray(ip) ? (ip[0] ?? "unknown") : String(ip).split(",")[0]!.trim(); if (!checkChatRateLimit(ipStr)) { send(socket, { type: "chat", agentId: "timmy", text: "I need a moment to gather my thoughts… try again shortly.", }); return; } // Fire-and-forget AI reply void (async () => { try { // Signal that Timmy is thinking broadcastToAll(wss, { type: "agent_state", agentId: "gamma", state: "working" }); updateAgentWorld("gamma", "working"); const reply = await agentService.chatReply(text); broadcastToAll(wss, { type: "agent_state", agentId: "gamma", state: "idle" }); updateAgentWorld("gamma", "idle"); broadcastToAll(wss, { type: "chat", agentId: "timmy", text: reply }); void logWorldEvent("visitor:reply", reply.slice(0, 100), "timmy"); } catch (err) { broadcastToAll(wss, { type: "agent_state", agentId: "gamma", state: "idle" }); updateAgentWorld("gamma", "idle"); logger.warn("chatReply failed", { err: String(err) }); } })(); } } catch { /* ignore malformed messages */ } }); socket.on("close", () => { clearInterval(pingTimer); eventBus.off("bus", busHandler); logger.info("ws client disconnected", { clients: wss.clients.size - 1 }); }); socket.on("error", (err) => { logger.warn("ws socket error", { err: err.message }); }); }); logger.info("WebSocket server attached at /api/ws"); }