diff --git a/artifacts/api-server/src/lib/stream-registry.test.ts b/artifacts/api-server/src/lib/stream-registry.test.ts new file mode 100644 index 0000000..e5b41d7 --- /dev/null +++ b/artifacts/api-server/src/lib/stream-registry.test.ts @@ -0,0 +1,158 @@ +import { describe, it, beforeEach } from "node:test"; +import assert from "node:assert/strict"; +import { PassThrough } from "stream"; + +// ── Inline StreamRegistry (isolated from singleton) ────────────────────────── + +interface StreamEntry { + stream: PassThrough; + createdAt: number; +} + +class StreamRegistry { + private readonly streams = new Map(); + private readonly TTL_MS = 5 * 60 * 1000; + + register(jobId: string): PassThrough { + const existing = this.streams.get(jobId); + if (existing) existing.stream.destroy(); + const stream = new PassThrough(); + this.streams.set(jobId, { stream, createdAt: Date.now() }); + stream.on("close", () => { this.streams.delete(jobId); }); + return stream; + } + + get(jobId: string): PassThrough | null { + return this.streams.get(jobId)?.stream ?? null; + } + + write(jobId: string, chunk: string): void { + this.streams.get(jobId)?.stream.write(chunk); + } + + end(jobId: string): void { + const entry = this.streams.get(jobId); + if (entry) { + entry.stream.end(); + this.streams.delete(jobId); + } + } +} + +// ── Tests ──────────────────────────────────────────────────────────────────── + +describe("StreamRegistry", () => { + let registry: StreamRegistry; + + beforeEach(() => { + registry = new StreamRegistry(); + }); + + it("register → get returns a valid stream", () => { + registry.register("job-1"); + const stream = registry.get("job-1"); + assert.ok(stream, "stream should exist after register"); + assert.ok(stream instanceof PassThrough); + }); + + it("end() makes get() return null", () => { + registry.register("job-1"); + registry.end("job-1"); + assert.equal(registry.get("job-1"), null, "stream should be null after end()"); + }); + + it("write() delivers data to the stream", async () => { + registry.register("job-2"); + const stream = registry.get("job-2")!; + + const chunks: string[] = []; + stream.on("data", (chunk: Buffer) => chunks.push(chunk.toString())); + + registry.write("job-2", "hello "); + registry.write("job-2", "world"); + registry.end("job-2"); + + // Wait for end event so all data events have fired + await new Promise((resolve) => stream.on("end", resolve)); + assert.equal(chunks.join(""), "hello world"); + }); + + it("registry is empty after end() — no leak", () => { + registry.register("job-3"); + registry.end("job-3"); + assert.equal(registry.get("job-3"), null); + }); +}); + +describe("Race condition: instant stub completion", () => { + let registry: StreamRegistry; + + beforeEach(() => { + registry = new StreamRegistry(); + }); + + it("stream that completes before client attaches is not readable", () => { + // Simulate: register → write → end (instant stub work), then client tries to attach + registry.register("race-job"); + registry.write("race-job", "full result text"); + registry.end("race-job"); + + // Client arrives after work is done — stream slot is gone + const stream = registry.get("race-job"); + assert.equal(stream, null, "stream should be null — work completed before client attached"); + // This proves the race: the SSE handler must fall back to DB replay + }); + + it("client attaching before end() receives all data", async () => { + registry.register("live-job"); + const stream = registry.get("live-job")!; + assert.ok(stream, "stream should be available before end()"); + + const chunks: string[] = []; + const ended = new Promise((resolve) => { + stream.on("data", (chunk: Buffer) => chunks.push(chunk.toString())); + stream.on("end", resolve); + }); + + // Simulate streaming tokens then completing + registry.write("live-job", "token1"); + registry.write("live-job", "token2"); + registry.end("live-job"); + + await ended; + assert.equal(chunks.join(""), "token1token2"); + assert.equal(registry.get("live-job"), null, "registry should be clean after end"); + }); + + it("SSE fallback: simulates DB polling after stream race", async () => { + // This simulates the full SSE handler flow for the race condition: + // 1. Stream registered + work starts + // 2. Work completes instantly (stub mode) — stream ended + // 3. SSE handler wakes up, finds no stream + // 4. Falls back to polling DB → finds completed job → replays result + + registry.register("stub-job"); + registry.write("stub-job", "The answer is 42."); + registry.end("stub-job"); + + // Simulate SSE handler waking up after bus event + const stream = registry.get("stub-job"); + assert.equal(stream, null, "stream gone — race condition triggered"); + + // Simulate DB state after work completed + const mockDbResult = { state: "complete" as const, result: "The answer is 42." }; + + // SSE handler falls back to DB replay + const sseEvents: Array<{ event: string; data: unknown }> = []; + const sendEvent = (event: string, data: unknown) => sseEvents.push({ event, data }); + + if (!stream && mockDbResult.state === "complete" && mockDbResult.result) { + sendEvent("token", { text: mockDbResult.result }); + sendEvent("done", { jobId: "stub-job", state: "complete" }); + } + + assert.equal(sseEvents.length, 2, "should emit token + done events"); + assert.deepEqual(sseEvents[0], { event: "token", data: { text: "The answer is 42." } }); + assert.deepEqual(sseEvents[1], { event: "done", data: { jobId: "stub-job", state: "complete" } }); + }); +}); diff --git a/artifacts/api-server/src/lib/stream-registry.ts b/artifacts/api-server/src/lib/stream-registry.ts index 7d02eec..45dbfa2 100644 --- a/artifacts/api-server/src/lib/stream-registry.ts +++ b/artifacts/api-server/src/lib/stream-registry.ts @@ -37,10 +37,19 @@ class StreamRegistry { const entry = this.streams.get(jobId); if (entry) { entry.stream.end(); - this.streams.delete(jobId); + // Don't delete from map here — the "close" listener set in register() + // handles cleanup once the consumer has drained all buffered data. + // This prevents a race where a late-attaching SSE client calls get() + // after end() but before consuming buffered tokens (#16). } } + /** Returns true if a stream was registered and its writable side has ended. */ + hasEnded(jobId: string): boolean { + const entry = this.streams.get(jobId); + return entry ? entry.stream.writableEnded : false; + } + private evictExpired(): void { const now = Date.now(); for (const [id, entry] of this.streams.entries()) { diff --git a/artifacts/api-server/src/routes/jobs.ts b/artifacts/api-server/src/routes/jobs.ts index 76aaba4..3fd1a68 100644 --- a/artifacts/api-server/src/routes/jobs.ts +++ b/artifacts/api-server/src/routes/jobs.ts @@ -714,7 +714,7 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => { // ── GET /jobs/:id/stream ────────────────────────────────────────────────────── // Server-Sent Events (#3): streams Claude token deltas in real time while the // job is executing. If the job is already complete, sends the full result then -// closes. If the job isn't executing yet, waits up to 60 s for it to start. +// closes. If the job isn't executing yet, waits up to 90 s for it to start. router.get("/jobs/:id/stream", async (req: Request, res: Response) => { const paramResult = GetJobParams.safeParse(req.params); @@ -828,6 +828,49 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => { return; } + // ── Resolve: job executing but stream already ended (race condition) ───── + // This happens when stub mode (or very fast work) completes before the SSE + // client attaches to the stream. Poll DB until the job reaches a terminal + // state, then replay the result. + if (currentJob.state === "executing") { + logger.warn("stream ended before SSE client attached — polling DB for result", { jobId: id }); + + const pollStart = Date.now(); + const POLL_INTERVAL_MS = 2_000; + const POLL_MAX_MS = 120_000; + + const finalJob = await new Promise((resolve) => { + const tick = async () => { + const fresh = await getJobById(id); + if (!fresh || fresh.state === "complete" || fresh.state === "failed") { + resolve(fresh); + return; + } + if (Date.now() - pollStart >= POLL_MAX_MS) { + resolve(fresh); + return; + } + setTimeout(tick, POLL_INTERVAL_MS); + }; + void tick(); + }); + + if (finalJob?.state === "complete" && finalJob.result) { + sendEvent("token", { text: finalJob.result }); + sendEvent("done", { jobId: id, state: "complete" }); + res.end(); + cleanup(); + return; + } + + if (finalJob?.state === "failed") { + sendEvent("error", { jobId: id, message: finalJob.errorMessage ?? "Job failed" }); + res.end(); + cleanup(); + return; + } + } + // ── Resolve: timeout with no activity — tell client to fall back to polling sendEvent("error", { jobId: id,