diff --git a/artifacts/api-server/src/lib/stream-registry.test.ts b/artifacts/api-server/src/lib/stream-registry.test.ts new file mode 100644 index 0000000..fccab7b --- /dev/null +++ b/artifacts/api-server/src/lib/stream-registry.test.ts @@ -0,0 +1,166 @@ +/** + * Unit test: stream registry race condition at completion boundary (#16) + * + * Simulates the scenario where work completes (stream.end()) before a consumer + * attaches. Verifies that buffered data is still readable by a late consumer. + * + * Run: npx tsx artifacts/api-server/src/lib/stream-registry.test.ts + */ +import { PassThrough } from "stream"; + +// Inline minimal StreamRegistry to test in isolation (no side-effect imports) +class StreamRegistry { + private readonly streams = new Map(); + + register(jobId: string): PassThrough { + const existing = this.streams.get(jobId); + if (existing) existing.stream.destroy(); + const stream = new PassThrough(); + this.streams.set(jobId, { stream, createdAt: Date.now() }); + stream.on("close", () => { this.streams.delete(jobId); }); + return stream; + } + + get(jobId: string): PassThrough | null { + return this.streams.get(jobId)?.stream ?? null; + } + + write(jobId: string, chunk: string): void { + this.streams.get(jobId)?.stream.write(chunk); + } + + end(jobId: string): void { + const entry = this.streams.get(jobId); + if (entry) { + entry.stream.end(); + // NOT deleting from map — "close" listener handles cleanup after consumer drains + } + } + + hasEnded(jobId: string): boolean { + const entry = this.streams.get(jobId); + return entry ? entry.stream.writableEnded : false; + } +} + +let passed = 0; +let failed = 0; + +function assert(condition: boolean, msg: string) { + if (!condition) { + console.error(` FAIL: ${msg}`); + failed++; + } else { + console.log(` PASS: ${msg}`); + passed++; + } +} + +async function testRaceCondition() { + console.log("Test: late consumer receives buffered data after stream.end()"); + + const registry = new StreamRegistry(); + const jobId = "test-job-1"; + + // 1. Register stream and write data + end (simulating instant stub completion) + registry.register(jobId); + registry.write(jobId, "Hello "); + registry.write(jobId, "world"); + registry.end(jobId); + + // 2. Verify stream is still retrievable (not deleted from map) + const stream = registry.get(jobId); + assert(stream !== null, "stream should still be in registry after end()"); + assert(registry.hasEnded(jobId), "hasEnded() should return true"); + + // 3. Late consumer attaches and reads buffered data + const chunks: string[] = []; + await new Promise((resolve) => { + stream!.on("data", (chunk: Buffer) => { + chunks.push(chunk.toString("utf8")); + }); + stream!.on("end", () => { + resolve(); + }); + }); + + const fullText = chunks.join(""); + assert(fullText === "Hello world", `consumer received full text: "${fullText}"`); +} + +async function testNormalFlow() { + console.log("Test: consumer attached before end() receives live data"); + + const registry = new StreamRegistry(); + const jobId = "test-job-2"; + + registry.register(jobId); + + const chunks: string[] = []; + const stream = registry.get(jobId)!; + + const done = new Promise((resolve) => { + stream.on("data", (chunk: Buffer) => { + chunks.push(chunk.toString("utf8")); + }); + stream.on("end", () => { + resolve(); + }); + }); + + // Write after consumer attached + registry.write(jobId, "token1 "); + registry.write(jobId, "token2"); + registry.end(jobId); + + await done; + + const fullText = chunks.join(""); + assert(fullText === "token1 token2", `live consumer received: "${fullText}"`); +} + +async function testCleanupAfterConsume() { + console.log("Test: stream removed from registry after consumer finishes"); + + const registry = new StreamRegistry(); + const jobId = "test-job-3"; + + registry.register(jobId); + registry.write(jobId, "data"); + registry.end(jobId); + + const stream = registry.get(jobId)!; + + await new Promise((resolve) => { + stream.on("data", () => {}); + stream.on("end", () => resolve()); + }); + + // After consuming, wait for close event to fire (nextTick) + await new Promise((r) => setTimeout(r, 50)); + + assert(registry.get(jobId) === null, "stream removed from registry after drain"); +} + +async function testNoStreamReturnsNull() { + console.log("Test: get() returns null for unknown job"); + + const registry = new StreamRegistry(); + assert(registry.get("nonexistent") === null, "returns null for unknown job"); + assert(!registry.hasEnded("nonexistent"), "hasEnded returns false for unknown job"); +} + +async function main() { + await testRaceCondition(); + await testNormalFlow(); + await testCleanupAfterConsume(); + await testNoStreamReturnsNull(); + + console.log(`\nResults: ${passed} passed, ${failed} failed`); + if (failed > 0) process.exit(1); +} + +main().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/artifacts/api-server/src/lib/stream-registry.ts b/artifacts/api-server/src/lib/stream-registry.ts index 7d02eec..45dbfa2 100644 --- a/artifacts/api-server/src/lib/stream-registry.ts +++ b/artifacts/api-server/src/lib/stream-registry.ts @@ -37,10 +37,19 @@ class StreamRegistry { const entry = this.streams.get(jobId); if (entry) { entry.stream.end(); - this.streams.delete(jobId); + // Don't delete from map here — the "close" listener set in register() + // handles cleanup once the consumer has drained all buffered data. + // This prevents a race where a late-attaching SSE client calls get() + // after end() but before consuming buffered tokens (#16). } } + /** Returns true if a stream was registered and its writable side has ended. */ + hasEnded(jobId: string): boolean { + const entry = this.streams.get(jobId); + return entry ? entry.stream.writableEnded : false; + } + private evictExpired(): void { const now = Date.now(); for (const [id, entry] of this.streams.entries()) { diff --git a/artifacts/api-server/src/routes/jobs.ts b/artifacts/api-server/src/routes/jobs.ts index 76aaba4..760b8e2 100644 --- a/artifacts/api-server/src/routes/jobs.ts +++ b/artifacts/api-server/src/routes/jobs.ts @@ -714,7 +714,7 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => { // ── GET /jobs/:id/stream ────────────────────────────────────────────────────── // Server-Sent Events (#3): streams Claude token deltas in real time while the // job is executing. If the job is already complete, sends the full result then -// closes. If the job isn't executing yet, waits up to 60 s for it to start. +// closes. If the job isn't executing yet, waits up to 90 s for it to start. router.get("/jobs/:id/stream", async (req: Request, res: Response) => { const paramResult = GetJobParams.safeParse(req.params); @@ -828,6 +828,44 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => { return; } + // ── Resolve: job executing but stream gone — poll DB until result (#16) ─── + if (currentJob.state === "executing") { + logger.warn("stream slot gone while job still executing — polling DB for result", { jobId: id }); + const pollStart = Date.now(); + const POLL_INTERVAL = 2_000; + const POLL_MAX = 120_000; + + const pollResult = await new Promise((resolve) => { + const tick = async () => { + if (Date.now() - pollStart > POLL_MAX) { + resolve(null); + return; + } + const fresh = await getJobById(id); + if (fresh && (fresh.state === "complete" || fresh.state === "failed")) { + resolve(fresh); + return; + } + setTimeout(tick, POLL_INTERVAL); + }; + setTimeout(tick, POLL_INTERVAL); + }); + + if (pollResult?.state === "complete" && pollResult.result) { + sendEvent("token", { text: pollResult.result }); + sendEvent("done", { jobId: id, state: "complete" }); + res.end(); + cleanup(); + return; + } + if (pollResult?.state === "failed") { + sendEvent("error", { jobId: id, message: pollResult.errorMessage ?? "Job failed" }); + res.end(); + cleanup(); + return; + } + } + // ── Resolve: timeout with no activity — tell client to fall back to polling sendEvent("error", { jobId: id,