feat(integration): WS bridge + Tower + payment panel + E2E test [10/10 PASS] (#26)

This commit is contained in:
2026-03-18 21:20:51 -04:00
parent 3031c399ee
commit e088ca4cd8
11 changed files with 974 additions and 37 deletions

View File

@@ -0,0 +1,187 @@
/**
* /api/ws — WebSocket bridge from the internal EventBus to connected Matrix clients.
*
* Protocol (server → client):
* { type: "agent_state", agentId: string, state: "idle"|"active"|"working"|"thinking" }
* { type: "job_started", jobId: string, agentId: string }
* { type: "job_completed", jobId: string, agentId: string }
* { type: "chat", agentId: string, text: string }
* { type: "ping" }
*
* Protocol (client → server):
* { type: "subscribe", channel: "agents", clientId: string }
* { type: "pong" }
*
* Agent mapping (Matrix IDs → Timmy roles):
* alpha — orchestrator (overall job lifecycle)
* beta — eval (Haiku judge)
* gamma — work (Sonnet executor)
* delta — lightning (payment monitor / invoice watcher)
*/
import type { IncomingMessage } from "http";
import type { WebSocket } from "ws";
import { WebSocketServer } from "ws";
import type { Server } from "http";
import { eventBus, type BusEvent } from "../lib/event-bus.js";
import { makeLogger } from "../lib/logger.js";
const logger = makeLogger("ws-events");
const PING_INTERVAL_MS = 30_000;
function translateEvent(ev: BusEvent): object | null {
switch (ev.type) {
// ── Mode 1 job lifecycle ─────────────────────────────────────────────────
case "job:state": {
if (ev.state === "evaluating") {
return [
{ type: "agent_state", agentId: "alpha", state: "active" },
{ type: "agent_state", agentId: "beta", state: "thinking" },
{ type: "job_started", jobId: ev.jobId, agentId: "beta" },
];
}
if (ev.state === "awaiting_eval_payment") {
return { type: "agent_state", agentId: "alpha", state: "active" };
}
if (ev.state === "awaiting_work_payment") {
return [
{ type: "agent_state", agentId: "beta", state: "idle" },
{ type: "agent_state", agentId: "delta", state: "active" },
];
}
if (ev.state === "executing") {
return [
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "working" },
];
}
if (ev.state === "complete") {
return [
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "job_completed", jobId: ev.jobId, agentId: "gamma" },
];
}
if (ev.state === "rejected" || ev.state === "failed") {
return [
{ type: "agent_state", agentId: "beta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "agent_state", agentId: "delta", state: "idle" },
];
}
return null;
}
case "job:completed":
return [
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "chat", agentId: "gamma", text: `Job ${ev.jobId.slice(0, 8)} complete` },
];
case "job:failed":
return [
{ type: "agent_state", agentId: "alpha", state: "idle" },
{ type: "agent_state", agentId: "beta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "idle" },
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "chat", agentId: "alpha", text: `Job ${ev.jobId.slice(0, 8)} failed: ${ev.reason}` },
];
case "job:paid":
if (ev.invoiceType === "eval") {
return [
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "agent_state", agentId: "beta", state: "thinking" },
{ type: "chat", agentId: "delta", text: "⚡ Eval payment confirmed" },
];
}
if (ev.invoiceType === "work") {
return [
{ type: "agent_state", agentId: "delta", state: "idle" },
{ type: "agent_state", agentId: "gamma", state: "working" },
{ type: "chat", agentId: "delta", text: "⚡ Work payment confirmed" },
];
}
return null;
// ── Mode 2 session lifecycle ─────────────────────────────────────────────
case "session:paid":
return { type: "chat", agentId: "delta", text: `⚡ Session funded: ${ev.amountSats} sats` };
case "session:balance":
return {
type: "chat",
agentId: "delta",
text: `Balance: ${ev.balanceSats} sats remaining`,
};
case "session:state":
if (ev.state === "active") {
return { type: "agent_state", agentId: "delta", state: "idle" };
}
if (ev.state === "paused") {
return {
type: "chat",
agentId: "delta",
text: "Session paused — balance low. Top up to continue.",
};
}
return null;
default:
return null;
}
}
function send(socket: WebSocket, payload: object): void {
if (socket.readyState === 1) {
socket.send(JSON.stringify(payload));
}
}
function broadcast(socket: WebSocket, ev: BusEvent): void {
const out = translateEvent(ev);
if (!out) return;
const messages = Array.isArray(out) ? out : [out];
for (const msg of messages) {
send(socket, msg);
}
}
export function attachWebSocketServer(server: Server): void {
const wss = new WebSocketServer({ server, path: "/api/ws" });
wss.on("connection", (socket: WebSocket, req: IncomingMessage) => {
const ip = req.headers["x-forwarded-for"] ?? req.socket.remoteAddress ?? "unknown";
logger.info("ws client connected", { ip, clients: wss.clients.size });
const busHandler = (ev: BusEvent) => broadcast(socket, ev);
eventBus.on("bus", busHandler);
const pingTimer = setInterval(() => {
send(socket, { type: "ping" });
}, PING_INTERVAL_MS);
socket.on("message", (raw) => {
try {
const msg = JSON.parse(raw.toString()) as { type?: string };
if (msg.type === "pong") return;
if (msg.type === "subscribe") {
send(socket, { type: "agent_count", count: wss.clients.size });
}
} catch {
/* ignore malformed messages */
}
});
socket.on("close", () => {
clearInterval(pingTimer);
eventBus.off("bus", busHandler);
logger.info("ws client disconnected", { clients: wss.clients.size - 1 });
});
socket.on("error", (err) => {
logger.warn("ws socket error", { err: err.message });
});
});
logger.info("WebSocket server attached at /api/ws");
}