Files
timmy-tower/artifacts/api-server/src/routes/events.ts

477 lines
17 KiB
TypeScript

/**
* /api/ws — WebSocket bridge from the internal EventBus to connected Workshop clients.
*
* Protocol (server → client):
* { type: "world_state", timmyState, agentStates, recentEvents, updatedAt }
* { type: "agent_state", agentId: string, state: "idle"|"active"|"working"|"thinking" }
* { type: "job_started", jobId: string, agentId: string }
* { type: "job_completed", jobId: string, agentId: string }
* { type: "chat", agentId: string, text: string }
* { type: "visitor_count", count: number }
* { type: "ping" }
*
* Protocol (client → server):
* { type: "visitor_enter", visitorId: string, visitorName?: string }
* { type: "visitor_leave", visitorId: string }
* { type: "visitor_message", visitorId: string, text: string }
* { type: "pong" }
*
* Agent mapping (Workshop):
* alpha — orchestrator (overall job lifecycle)
* beta — eval (Haiku judge)
* gamma — work (Sonnet executor)
* delta — lightning (payment monitor / invoice watcher)
*/
import { randomUUID } from "crypto";
import type { IncomingMessage } from "http";
import type { WebSocket } from "ws";
import { WebSocketServer } from "ws";
import type { Server } from "http";
import { eventBus, type BusEvent } from "../lib/event-bus.js";
import { makeLogger } from "../lib/logger.js";
import { getWorldState, setAgentStateInWorld } from "../lib/world-state.js";
import { agentService } from "../lib/agent.js";
import { db, worldEvents } from "@workspace/db";
const logger = makeLogger("ws-events");
const PING_INTERVAL_MS = 30_000;
// Map to store visitorId -> npub mappings
const connectedVisitors = new Map<string, string>();
// ── Per-visitor rate limit (3 replies/minute) ─────────────────────────────────
const CHAT_RATE_LIMIT = 3;
const CHAT_RATE_WINDOW_MS = 60_000;
interface RateLimitEntry {
count: number;
resetAt: number;
}
const visitorRateLimits = new Map<string, RateLimitEntry>();
function checkChatRateLimit(visitorId: string): boolean {
const now = Date.now();
const entry = visitorRateLimits.get(visitorId);
if (!entry || now > entry.resetAt) {
visitorRateLimits.set(visitorId, { count: 1, resetAt: now + CHAT_RATE_WINDOW_MS });
return true;
}
if (entry.count >= CHAT_RATE_LIMIT) return false;
entry.count++;
return true;
}
function broadcastToAll(wss: WebSocketServer, payload: object): void {
const str = JSON.stringify(payload);
wss.clients.forEach((c) => {
if (c.readyState === 1) c.send(str);
});
}
function updateAgentWorld(agentId: string, state: string): void {
try {
setAgentStateInWorld(agentId, state);
} catch {
/* non-fatal */
}
}
async function logWorldEvent(
type: string,
summary: string,
agentId?: string,
jobId?: string,
): Promise<void> {
try {
await db.insert(worldEvents).values({
id: randomUUID(),
type,
summary,
agentId: agentId ?? null,
jobId: jobId ?? null,
});
} catch {
/* non-fatal — don't crash WS on DB error */
}
}
function translateEvent(ev: BusEvent): object | null {
switch (ev.type) {
// ── Mode 1 job lifecycle ─────────────────────────────────────────────────
case "job:state": {
if (ev.state === "evaluating") {
updateAgentWorld("alpha", "active");
updateAgentWorld("beta", "thinking");
void logWorldEvent("job:evaluating", `Evaluating job ${ev.jobId.slice(0, 8)}`, "beta", ev.jobId);
return [
{ type: "agent_state", agentId: "alpha", state: "active" },
{ type: "agent_state", agentId: "beta", state: "thinking" },
{ type: "job_started", jobId: ev.jobId, agentId: "beta" },
];
}
if (ev.state === "awaiting_eval_payment") {
updateAgentWorld("alpha", "active");
return { type: "agent_state", agentId: "alpha", state: "active" };
}
if (ev.state === "awaiting_work_payment") {
updateAgentWorld("beta", "idle");
updateAgentWorld("delta", "active");
return [
{ type: "agent_state", agentId: "beta", state: "idle" },
{ type: "agent_state", agentId: "delta", state: "active" },
];
}
if (ev.state === "executing") {
updateAgentWorld("delta", "idle");
updateAgentWorld("gamma", "working");
void logWorldEvent("job:executing", `Working on job ${ev.jobId.slice(0, 8)}`, "gamma", ev.jobId);
return [
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "working" },
];
}
if (ev.state === "complete") {
updateAgentWorld("gamma", "idle");
updateAgentWorld("alpha", "idle");
void logWorldEvent("job:complete", `Completed job ${ev.jobId.slice(0, 8)}`, "alpha", ev.jobId);
return [
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "job_completed", jobId: ev.jobId, agentId: "gamma" },
];
}
if (ev.state === "rejected" || ev.state === "failed") {
["alpha", "beta", "gamma", "delta"].forEach(a => updateAgentWorld(a, "idle"));
void logWorldEvent(`job:${ev.state}`, `Job ${ev.jobId.slice(0, 8)} ${ev.state}`, "alpha", ev.jobId);
return [
{ type: "agent_state", agentId: "beta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "agent_state", agentId: "delta", state: "idle" },
];
}
return null;
}
case "job:completed":
updateAgentWorld("gamma", "idle");
updateAgentWorld("alpha", "idle");
return [
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "chat", agentId: "gamma", text: `Job ${ev.jobId.slice(0, 8)} complete` },
];
case "job:failed":
["alpha", "beta", "gamma", "delta"].forEach(a => updateAgentWorld(a, "idle"));
return [
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "agent_state", agentId: "beta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "chat", agentId: "alpha", text: `Job ${ev.jobId.slice(0, 8)} failed: ${ev.reason}` },
];
case "job:paid":
if (ev.invoiceType === "eval") {
updateAgentWorld("delta", "idle");
updateAgentWorld("beta", "thinking");
void logWorldEvent("payment:eval", "Eval payment confirmed", "delta");
return [
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "agent_state", agentId: "beta", state: "thinking" },
{ type: "chat", agentId: "delta", text: "⚡ Eval payment confirmed" },
];
}
if (ev.invoiceType === "work") {
updateAgentWorld("delta", "idle");
updateAgentWorld("gamma", "working");
void logWorldEvent("payment:work", "Work payment confirmed", "delta");
return [
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "working" },
{ type: "chat", agentId: "delta", text: "⚡ Work payment confirmed" },
];
}
return null;
// ── Mode 2 session lifecycle ─────────────────────────────────────────────
case "session:paid":
return { type: "chat", agentId: "delta", text: `⚡ Session funded: ${ev.amountSats} sats` };
case "session:balance":
return {
type: "chat",
agentId: "delta",
text: `Balance: ${ev.balanceSats} sats remaining`,
};
case "session:state":
if (ev.state === "active") {
return { type: "agent_state", agentId: "delta", state: "idle" };
}
if (ev.state === "paused") {
return {
type: "chat",
agentId: "delta",
text: "Session paused — balance low. Top up to continue.",
};
}
return null;
// ── Debate events (#21) ────────────────────────────────────────────────
case "debate:argument": {
void logWorldEvent(
"debate:argument",
`${ev.agent} argues to ${ev.position}: ${ev.argument.slice(0, 80)}`,
"beta",
ev.jobId,
);
return {
type: "agent_debate",
jobId: ev.jobId,
agent: ev.agent,
position: ev.position,
argument: ev.argument,
};
}
case "debate:verdict": {
void logWorldEvent(
"debate:verdict",
`Verdict: ${ev.accepted ? "accepted" : "rejected"}${ev.reason.slice(0, 80)}`,
"beta",
ev.jobId,
);
return {
type: "agent_debate",
jobId: ev.jobId,
agent: "Beta",
position: "verdict",
argument: `Final verdict: ${ev.accepted ? "ACCEPTED" : "REJECTED"}${ev.reason}`,
accepted: ev.accepted,
};
}
// ── Real-time cost ticker (#68) ───────────────────────────────────────────
case "cost:update":
return {
type: "cost_update",
jobId: ev.jobId,
sats: ev.sats,
phase: ev.phase,
isFinal: ev.isFinal,
};
// ── Agent commentary (#1) ─────────────────────────────────────────────────
case "agent_commentary":
return {
type: "agent_commentary",
agentId: ev.agentId,
jobId: ev.jobId,
text: ev.text,
};
// ── External agent state (Kimi, Perplexity) (#11) ─────────────────────────
case "agent:external_state": {
updateAgentWorld(ev.agentId, ev.state);
void logWorldEvent(
`agent:${ev.state}`,
`${ev.agentId} is now ${ev.state}${ev.taskSummary ? `: ${ev.taskSummary.slice(0, 80)}` : ""}`,
ev.agentId,
);
const msgs: object[] = [{ type: "agent_state", agentId: ev.agentId, state: ev.state }];
if (ev.taskSummary) {
msgs.push({ type: "agent_task_summary", agentId: ev.agentId, summary: ev.taskSummary });
}
return msgs;
}
default:
return null;
}
}
function send(socket: WebSocket, payload: object): void {
if (socket.readyState === 1) {
socket.send(JSON.stringify(payload));
}
}
function broadcast(socket: WebSocket, ev: BusEvent): void {
const out = translateEvent(ev);
if (!out) return;
const messages = Array.isArray(out) ? out : [out];
for (const msg of messages) {
send(socket, msg);
}
}
async function sendWorldStateBootstrap(socket: WebSocket): Promise<void> {
try {
const state = getWorldState();
const { desc } = await import("drizzle-orm");
const recent = await db
.select()
.from(worldEvents)
.orderBy(desc(worldEvents.createdAt))
.limit(20);
send(socket, {
type: "world_state",
...state,
recentEvents: recent.reverse(),
});
} catch {
send(socket, { type: "world_state", ...getWorldState(), recentEvents: [] });
}
}
export function attachWebSocketServer(server: Server): void {
const wss = new WebSocketServer({ server, path: "/api/ws" });
wss.on("connection", (socket: WebSocket, req: IncomingMessage) => {
const ip = req.headers["x-forwarded-for"] ?? req.socket.remoteAddress ?? "unknown";
logger.info("ws client connected", { ip, clients: wss.clients.size });
void sendWorldStateBootstrap(socket);
const busHandler = (ev: BusEvent) => broadcast(socket, ev);
eventBus.on("bus", busHandler);
const pingTimer = setInterval(() => {
send(socket, { type: "ping" });
}, PING_INTERVAL_MS);
socket.on("message", (raw) => {
try {
const msg = JSON.parse(raw.toString()) as { type?: string; text?: string; visitorId?: string; npub?: string };
if (msg.type === "pong") return;
if (msg.type === "subscribe") {
send(socket, { type: "agent_count", count: wss.clients.size });
}
if (msg.type === "visitor_enter") {
const { visitorId, npub } = msg;
if (visitorId && npub) {
connectedVisitors.set(visitorId, npub);
const formattedNpub = `${npub.slice(0, 8)}${npub.slice(-4)}`;
broadcastToAll(wss, { type: "chat", agentId: "timmy", text: `Welcome, Nostr user ${formattedNpub}! What can I help you with?` });
}
wss.clients.forEach(c => {
if (c !== socket && c.readyState === 1) {
c.send(JSON.stringify({ type: "visitor_count", count: wss.clients.size }));
}
});
send(socket, { type: "visitor_count", count: wss.clients.size });
}
if (msg.type === "visitor_leave") {
const { visitorId } = msg;
if (visitorId) {
connectedVisitors.delete(visitorId);
}
wss.clients.forEach(c => {
if (c !== socket && c.readyState === 1) {
c.send(JSON.stringify({ type: "visitor_count", count: Math.max(0, wss.clients.size - 1) }));
}
});
}
if (msg.type === "visitor_message" && msg.text) {
const text = String(msg.text).slice(0, 500);
// Broadcast visitor message to all watchers
broadcastToAll(wss, { type: "chat", agentId: "visitor", text });
// Rate-limit Timmy's AI replies — key on server-trusted IP, not
// client-provided visitorId (which is trivially spoofable).
const ipStr = Array.isArray(ip) ? (ip[0] ?? "unknown") : String(ip).split(",")[0]!.trim();
if (!checkChatRateLimit(ipStr)) {
send(socket, {
type: "chat",
agentId: "timmy",
text: "I need a moment to gather my thoughts… try again shortly.",
});
return;
}
// Fire-and-forget AI reply
void (async () => {
try {
// Signal that Timmy is thinking
broadcastToAll(wss, { type: "agent_state", agentId: "gamma", state: "working" });
updateAgentWorld("gamma", "working");
const reply = await agentService.chatReply(text);
broadcastToAll(wss, { type: "agent_state", agentId: "gamma", state: "idle" });
updateAgentWorld("gamma", "idle");
broadcastToAll(wss, { type: "chat", agentId: "timmy", text: reply });
void logWorldEvent("visitor:reply", reply.slice(0, 100), "timmy");
} catch (err) {
broadcastToAll(wss, { type: "agent_state", agentId: "gamma", state: "idle" });
updateAgentWorld("gamma", "idle");
logger.warn("chatReply failed", { err: String(err) });
}
})();
}
} catch {
/* ignore malformed messages */
}
});
socket.on("close", () => {
clearInterval(pingTimer);
eventBus.off("bus", busHandler);
logger.info("ws client disconnected", { clients: wss.clients.size - 1 });
});
socket.on("error", (err) => {
logger.warn("ws socket error", { err: err.message });
});
});
// ── Global commentary listener (set up once per server, not per socket) ────
// Watches job lifecycle events and fires Haiku commentary to all clients.
eventBus.on("bus", (ev: BusEvent) => {
let agentId: string | null = null;
let phase: string | null = null;
let jobId: string | null = null;
if (ev.type === "job:state") {
jobId = ev.jobId;
if (ev.state === "evaluating") {
// Beta evaluating + Alpha routing
void (async () => {
const [betaText, alphaText] = await Promise.all([
agentService.generateCommentary("beta", "evaluating"),
agentService.generateCommentary("alpha", "routing"),
]);
if (betaText) broadcastToAll(wss, { type: "agent_commentary", agentId: "beta", jobId, text: betaText });
if (alphaText) broadcastToAll(wss, { type: "agent_commentary", agentId: "alpha", jobId, text: alphaText });
})();
return;
}
if (ev.state === "executing") {
agentId = "gamma"; phase = "starting";
} else if (ev.state === "complete") {
agentId = "alpha"; phase = "complete";
} else if (ev.state === "rejected") {
agentId = "alpha"; phase = "rejected";
}
} else if (ev.type === "job:paid") {
jobId = ev.jobId;
agentId = "delta";
phase = ev.invoiceType === "eval" ? "eval_paid" : "work_paid";
}
if (agentId && phase && jobId) {
const capturedAgentId = agentId;
const capturedPhase = phase;
const capturedJobId = jobId;
void (async () => {
const text = await agentService.generateCommentary(capturedAgentId, capturedPhase);
if (text) broadcastToAll(wss, { type: "agent_commentary", agentId: capturedAgentId, jobId: capturedJobId, text });
})();
}
});
logger.info("WebSocket server attached at /api/ws");
}