[claude] fix SSE stream registry race condition at 60-second timeout boundary (#16) #56
158
artifacts/api-server/src/lib/stream-registry.test.ts
Normal file
158
artifacts/api-server/src/lib/stream-registry.test.ts
Normal file
@@ -0,0 +1,158 @@
|
||||
import { describe, it, beforeEach } from "node:test";
|
||||
import assert from "node:assert/strict";
|
||||
import { PassThrough } from "stream";
|
||||
|
||||
// ── Inline StreamRegistry (isolated from singleton) ──────────────────────────
|
||||
|
||||
interface StreamEntry {
|
||||
stream: PassThrough;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
class StreamRegistry {
|
||||
private readonly streams = new Map<string, StreamEntry>();
|
||||
private readonly TTL_MS = 5 * 60 * 1000;
|
||||
|
||||
register(jobId: string): PassThrough {
|
||||
const existing = this.streams.get(jobId);
|
||||
if (existing) existing.stream.destroy();
|
||||
const stream = new PassThrough();
|
||||
this.streams.set(jobId, { stream, createdAt: Date.now() });
|
||||
stream.on("close", () => { this.streams.delete(jobId); });
|
||||
return stream;
|
||||
}
|
||||
|
||||
get(jobId: string): PassThrough | null {
|
||||
return this.streams.get(jobId)?.stream ?? null;
|
||||
}
|
||||
|
||||
write(jobId: string, chunk: string): void {
|
||||
this.streams.get(jobId)?.stream.write(chunk);
|
||||
}
|
||||
|
||||
end(jobId: string): void {
|
||||
const entry = this.streams.get(jobId);
|
||||
if (entry) {
|
||||
entry.stream.end();
|
||||
this.streams.delete(jobId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
describe("StreamRegistry", () => {
|
||||
let registry: StreamRegistry;
|
||||
|
||||
beforeEach(() => {
|
||||
registry = new StreamRegistry();
|
||||
});
|
||||
|
||||
it("register → get returns a valid stream", () => {
|
||||
registry.register("job-1");
|
||||
const stream = registry.get("job-1");
|
||||
assert.ok(stream, "stream should exist after register");
|
||||
assert.ok(stream instanceof PassThrough);
|
||||
});
|
||||
|
||||
it("end() makes get() return null", () => {
|
||||
registry.register("job-1");
|
||||
registry.end("job-1");
|
||||
assert.equal(registry.get("job-1"), null, "stream should be null after end()");
|
||||
});
|
||||
|
||||
it("write() delivers data to the stream", async () => {
|
||||
registry.register("job-2");
|
||||
const stream = registry.get("job-2")!;
|
||||
|
||||
const chunks: string[] = [];
|
||||
stream.on("data", (chunk: Buffer) => chunks.push(chunk.toString()));
|
||||
|
||||
registry.write("job-2", "hello ");
|
||||
registry.write("job-2", "world");
|
||||
registry.end("job-2");
|
||||
|
||||
// Wait for end event so all data events have fired
|
||||
await new Promise<void>((resolve) => stream.on("end", resolve));
|
||||
assert.equal(chunks.join(""), "hello world");
|
||||
});
|
||||
|
||||
it("registry is empty after end() — no leak", () => {
|
||||
registry.register("job-3");
|
||||
registry.end("job-3");
|
||||
assert.equal(registry.get("job-3"), null);
|
||||
});
|
||||
});
|
||||
|
||||
describe("Race condition: instant stub completion", () => {
|
||||
let registry: StreamRegistry;
|
||||
|
||||
beforeEach(() => {
|
||||
registry = new StreamRegistry();
|
||||
});
|
||||
|
||||
it("stream that completes before client attaches is not readable", () => {
|
||||
// Simulate: register → write → end (instant stub work), then client tries to attach
|
||||
registry.register("race-job");
|
||||
registry.write("race-job", "full result text");
|
||||
registry.end("race-job");
|
||||
|
||||
// Client arrives after work is done — stream slot is gone
|
||||
const stream = registry.get("race-job");
|
||||
assert.equal(stream, null, "stream should be null — work completed before client attached");
|
||||
// This proves the race: the SSE handler must fall back to DB replay
|
||||
});
|
||||
|
||||
it("client attaching before end() receives all data", async () => {
|
||||
registry.register("live-job");
|
||||
const stream = registry.get("live-job")!;
|
||||
assert.ok(stream, "stream should be available before end()");
|
||||
|
||||
const chunks: string[] = [];
|
||||
const ended = new Promise<void>((resolve) => {
|
||||
stream.on("data", (chunk: Buffer) => chunks.push(chunk.toString()));
|
||||
stream.on("end", resolve);
|
||||
});
|
||||
|
||||
// Simulate streaming tokens then completing
|
||||
registry.write("live-job", "token1");
|
||||
registry.write("live-job", "token2");
|
||||
registry.end("live-job");
|
||||
|
||||
await ended;
|
||||
assert.equal(chunks.join(""), "token1token2");
|
||||
assert.equal(registry.get("live-job"), null, "registry should be clean after end");
|
||||
});
|
||||
|
||||
it("SSE fallback: simulates DB polling after stream race", async () => {
|
||||
// This simulates the full SSE handler flow for the race condition:
|
||||
// 1. Stream registered + work starts
|
||||
// 2. Work completes instantly (stub mode) — stream ended
|
||||
// 3. SSE handler wakes up, finds no stream
|
||||
// 4. Falls back to polling DB → finds completed job → replays result
|
||||
|
||||
registry.register("stub-job");
|
||||
registry.write("stub-job", "The answer is 42.");
|
||||
registry.end("stub-job");
|
||||
|
||||
// Simulate SSE handler waking up after bus event
|
||||
const stream = registry.get("stub-job");
|
||||
assert.equal(stream, null, "stream gone — race condition triggered");
|
||||
|
||||
// Simulate DB state after work completed
|
||||
const mockDbResult = { state: "complete" as const, result: "The answer is 42." };
|
||||
|
||||
// SSE handler falls back to DB replay
|
||||
const sseEvents: Array<{ event: string; data: unknown }> = [];
|
||||
const sendEvent = (event: string, data: unknown) => sseEvents.push({ event, data });
|
||||
|
||||
if (!stream && mockDbResult.state === "complete" && mockDbResult.result) {
|
||||
sendEvent("token", { text: mockDbResult.result });
|
||||
sendEvent("done", { jobId: "stub-job", state: "complete" });
|
||||
}
|
||||
|
||||
assert.equal(sseEvents.length, 2, "should emit token + done events");
|
||||
assert.deepEqual(sseEvents[0], { event: "token", data: { text: "The answer is 42." } });
|
||||
assert.deepEqual(sseEvents[1], { event: "done", data: { jobId: "stub-job", state: "complete" } });
|
||||
});
|
||||
});
|
||||
@@ -37,10 +37,19 @@ class StreamRegistry {
|
||||
const entry = this.streams.get(jobId);
|
||||
if (entry) {
|
||||
entry.stream.end();
|
||||
this.streams.delete(jobId);
|
||||
// Don't delete from map here — the "close" listener set in register()
|
||||
// handles cleanup once the consumer has drained all buffered data.
|
||||
// This prevents a race where a late-attaching SSE client calls get()
|
||||
// after end() but before consuming buffered tokens (#16).
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if a stream was registered and its writable side has ended. */
|
||||
hasEnded(jobId: string): boolean {
|
||||
const entry = this.streams.get(jobId);
|
||||
return entry ? entry.stream.writableEnded : false;
|
||||
}
|
||||
|
||||
private evictExpired(): void {
|
||||
const now = Date.now();
|
||||
for (const [id, entry] of this.streams.entries()) {
|
||||
|
||||
@@ -714,7 +714,7 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => {
|
||||
// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
|
||||
// Server-Sent Events (#3): streams Claude token deltas in real time while the
|
||||
// job is executing. If the job is already complete, sends the full result then
|
||||
// closes. If the job isn't executing yet, waits up to 60 s for it to start.
|
||||
// closes. If the job isn't executing yet, waits up to 90 s for it to start.
|
||||
|
||||
router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
|
||||
const paramResult = GetJobParams.safeParse(req.params);
|
||||
@@ -828,6 +828,49 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
|
||||
return;
|
||||
}
|
||||
|
||||
// ── Resolve: job executing but stream already ended (race condition) ─────
|
||||
// This happens when stub mode (or very fast work) completes before the SSE
|
||||
// client attaches to the stream. Poll DB until the job reaches a terminal
|
||||
// state, then replay the result.
|
||||
if (currentJob.state === "executing") {
|
||||
logger.warn("stream ended before SSE client attached — polling DB for result", { jobId: id });
|
||||
|
||||
const pollStart = Date.now();
|
||||
const POLL_INTERVAL_MS = 2_000;
|
||||
const POLL_MAX_MS = 120_000;
|
||||
|
||||
const finalJob = await new Promise<Job | null>((resolve) => {
|
||||
const tick = async () => {
|
||||
const fresh = await getJobById(id);
|
||||
if (!fresh || fresh.state === "complete" || fresh.state === "failed") {
|
||||
resolve(fresh);
|
||||
return;
|
||||
}
|
||||
if (Date.now() - pollStart >= POLL_MAX_MS) {
|
||||
resolve(fresh);
|
||||
return;
|
||||
}
|
||||
setTimeout(tick, POLL_INTERVAL_MS);
|
||||
};
|
||||
void tick();
|
||||
});
|
||||
|
||||
if (finalJob?.state === "complete" && finalJob.result) {
|
||||
sendEvent("token", { text: finalJob.result });
|
||||
sendEvent("done", { jobId: id, state: "complete" });
|
||||
res.end();
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
|
||||
if (finalJob?.state === "failed") {
|
||||
sendEvent("error", { jobId: id, message: finalJob.errorMessage ?? "Job failed" });
|
||||
res.end();
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Resolve: timeout with no activity — tell client to fall back to polling
|
||||
sendEvent("error", {
|
||||
jobId: id,
|
||||
|
||||
Reference in New Issue
Block a user