[claude] fix SSE stream registry race condition at 60-second timeout boundary (#16) #56

Merged
Rockachopa merged 2 commits from claude/issue-16 into main 2026-03-23 14:52:55 +00:00
3 changed files with 212 additions and 2 deletions

View File

@@ -0,0 +1,158 @@
import { describe, it, beforeEach } from "node:test";
import assert from "node:assert/strict";
import { PassThrough } from "stream";
// ── Inline StreamRegistry (isolated from singleton) ──────────────────────────
/** One registry slot: the live stream for a job plus the time it was registered. */
interface StreamEntry {
  stream: PassThrough;
  /** `Date.now()` value captured when the stream was registered. */
  createdAt: number;
}

/**
 * Test-local copy of the stream registry, kept separate from the application
 * singleton so every test can start from an empty map.
 *
 * Each job id maps to at most one `PassThrough` stream.
 */
class StreamRegistry {
  private readonly streams = new Map<string, StreamEntry>();

  /**
   * Creates and stores a fresh stream for `jobId`, destroying any stream that
   * was previously registered under the same id.
   *
   * @param jobId - Key under which the stream is stored.
   * @returns The newly created stream.
   */
  register(jobId: string): PassThrough {
    const existing = this.streams.get(jobId);
    if (existing) existing.stream.destroy();

    const stream = new PassThrough();
    this.streams.set(jobId, { stream, createdAt: Date.now() });

    stream.on("close", () => {
      // "close" is emitted asynchronously. If this stream was replaced by a
      // later register() call for the same id, the map now holds the
      // replacement, which must not be evicted by the old stream's cleanup.
      if (this.streams.get(jobId)?.stream === stream) {
        this.streams.delete(jobId);
      }
    });
    return stream;
  }

  /**
   * Looks up the stream currently registered for `jobId`.
   *
   * @returns The stream, or `null` when nothing is registered under that id.
   */
  get(jobId: string): PassThrough | null {
    return this.streams.get(jobId)?.stream ?? null;
  }

  /** Writes `chunk` to the job's stream; does nothing when no stream is registered. */
  write(jobId: string, chunk: string): void {
    this.streams.get(jobId)?.stream.write(chunk);
  }

  /**
   * Ends the writable side of the job's stream and removes the entry from the
   * map immediately, so a subsequent `get()` returns `null`. Does nothing when
   * no stream is registered.
   */
  end(jobId: string): void {
    const entry = this.streams.get(jobId);
    if (entry) {
      entry.stream.end();
      this.streams.delete(jobId);
    }
  }
}
// ── Tests ────────────────────────────────────────────────────────────────────
describe("StreamRegistry", () => {
  // A fresh registry per test keeps job ids from leaking between cases.
  let streams: StreamRegistry;

  beforeEach(() => {
    streams = new StreamRegistry();
  });

  it("register → get returns a valid stream", () => {
    const jobId = "job-1";
    streams.register(jobId);

    const found = streams.get(jobId);
    assert.ok(found, "stream should exist after register");
    assert.ok(found instanceof PassThrough);
  });

  it("end() makes get() return null", () => {
    const jobId = "job-1";
    streams.register(jobId);
    streams.end(jobId);

    assert.equal(streams.get(jobId), null, "stream should be null after end()");
  });

  it("write() delivers data to the stream", async () => {
    const jobId = "job-2";
    streams.register(jobId);
    const source = streams.get(jobId)!;

    // Collect everything the stream emits, decoded to text.
    const received: string[] = [];
    source.on("data", (piece: Buffer) => {
      received.push(piece.toString());
    });

    streams.write(jobId, "hello ");
    streams.write(jobId, "world");
    streams.end(jobId);

    // "end" fires only after every buffered chunk has been delivered as "data".
    await new Promise<void>((finished) => {
      source.on("end", finished);
    });

    assert.equal(received.join(""), "hello world");
  });

  it("registry is empty after end() — no leak", () => {
    const jobId = "job-3";
    streams.register(jobId);
    streams.end(jobId);

    assert.equal(streams.get(jobId), null);
  });
});
describe("Race condition: instant stub completion", () => {
  // A fresh registry per test keeps job ids from leaking between cases.
  let streams: StreamRegistry;

  beforeEach(() => {
    streams = new StreamRegistry();
  });

  it("stream that completes before client attaches is not readable", () => {
    const jobId = "race-job";

    // Producer side runs start to finish before any consumer looks the stream up.
    streams.register(jobId);
    streams.write(jobId, "full result text");
    streams.end(jobId);

    // A consumer arriving now finds the slot already removed, which is why
    // the SSE handler needs a DB-replay fallback.
    const lateLookup = streams.get(jobId);
    assert.equal(lateLookup, null, "stream should be null — work completed before client attached");
  });

  it("client attaching before end() receives all data", async () => {
    const jobId = "live-job";
    streams.register(jobId);

    const live = streams.get(jobId)!;
    assert.ok(live, "stream should be available before end()");

    // Attach both listeners before the producer writes anything.
    const received: string[] = [];
    const finished = new Promise<void>((resolve) => {
      live.on("data", (piece: Buffer) => {
        received.push(piece.toString());
      });
      live.on("end", resolve);
    });

    // Producer emits two tokens and then completes.
    streams.write(jobId, "token1");
    streams.write(jobId, "token2");
    streams.end(jobId);

    await finished;

    assert.equal(received.join(""), "token1token2");
    assert.equal(streams.get(jobId), null, "registry should be clean after end");
  });

  it("SSE fallback: simulates DB polling after stream race", async () => {
    // Walks through the handler's fallback path using local stand-ins:
    //   1. the stream is registered and work begins
    //   2. work finishes at once, so the stream is ended
    //   3. the handler looks the stream up and finds nothing
    //   4. the handler reads the finished job from the DB and replays its result
    const jobId = "stub-job";
    streams.register(jobId);
    streams.write(jobId, "The answer is 42.");
    streams.end(jobId);

    // Step 3: the lookup happens after the work has already ended.
    const lateLookup = streams.get(jobId);
    assert.equal(lateLookup, null, "stream gone — race condition triggered");

    // Stand-in for the job row as it would look once the work has completed.
    const dbRow = { state: "complete" as const, result: "The answer is 42." };

    // Stand-in for the handler's event writer: records instead of sending.
    const emitted: Array<{ event: string; data: unknown }> = [];
    const emit = (event: string, data: unknown) => emitted.push({ event, data });

    // Step 4: replay the stored result followed by the terminal event.
    if (!lateLookup && dbRow.state === "complete" && dbRow.result) {
      emit("token", { text: dbRow.result });
      emit("done", { jobId: "stub-job", state: "complete" });
    }

    assert.equal(emitted.length, 2, "should emit token + done events");
    assert.deepEqual(emitted[0], { event: "token", data: { text: "The answer is 42." } });
    assert.deepEqual(emitted[1], { event: "done", data: { jobId: "stub-job", state: "complete" } });
  });
});

View File

@@ -37,10 +37,19 @@ class StreamRegistry {
const entry = this.streams.get(jobId);
if (entry) {
entry.stream.end();
this.streams.delete(jobId);
// Don't delete from map here — the "close" listener set in register()
// handles cleanup once the consumer has drained all buffered data.
// This prevents a race where a late-attaching SSE client calls get()
// after end() but before consuming buffered tokens (#16).
}
}
/**
 * Reports whether the stream stored for `jobId` has had its writable side ended.
 *
 * @param jobId - Key to look up in the registry map.
 * @returns The stream's `writableEnded` flag, or `false` when the map holds
 *   no entry for `jobId`.
 */
hasEnded(jobId: string): boolean {
  return this.streams.get(jobId)?.stream.writableEnded ?? false;
}
private evictExpired(): void {
const now = Date.now();
for (const [id, entry] of this.streams.entries()) {

View File

@@ -714,7 +714,7 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => {
// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
// Server-Sent Events (#3): streams Claude token deltas in real time while the
// job is executing. If the job is already complete, sends the full result then
// closes. If the job isn't executing yet, waits up to 90 s for it to start.
router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
const paramResult = GetJobParams.safeParse(req.params);
@@ -828,6 +828,49 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
return;
}
// ── Resolve: job executing but stream already ended (race condition) ─────
// This happens when stub mode (or very fast work) completes before the SSE
// client attaches to the stream. Poll the DB until the job reaches a terminal
// state or the poll budget is spent, then replay the result.
if (currentJob.state === "executing") {
  logger.warn("stream ended before SSE client attached — polling DB for result", { jobId: id });
  const pollStart = Date.now();
  const POLL_INTERVAL_MS = 2_000;
  const POLL_MAX_MS = 120_000;
  const finalJob = await new Promise<Job | null>((resolve) => {
    const tick = async (): Promise<void> => {
      try {
        const fresh = await getJobById(id);
        // Stop when the job is missing, has reached a terminal state, or the
        // poll budget is exhausted; the caller inspects whatever was last read.
        const settled = !fresh || fresh.state === "complete" || fresh.state === "failed";
        if (settled || Date.now() - pollStart >= POLL_MAX_MS) {
          resolve(fresh);
          return;
        }
        setTimeout(() => void tick(), POLL_INTERVAL_MS);
      } catch (err: unknown) {
        // A rejected lookup must still settle the outer promise. Otherwise the
        // rejection goes unhandled and this request never gets a response.
        // Resolving null skips both branches below, so the handler continues
        // to the fallback error event that follows this block.
        logger.warn("DB poll failed while waiting for job result", {
          jobId: id,
          error: err instanceof Error ? err.message : String(err),
        });
        resolve(null);
      }
    };
    void tick();
  });
  if (finalJob?.state === "complete" && finalJob.result) {
    // Replay the stored result as a single token, then signal completion.
    sendEvent("token", { text: finalJob.result });
    sendEvent("done", { jobId: id, state: "complete" });
    res.end();
    cleanup();
    return;
  }
  if (finalJob?.state === "failed") {
    sendEvent("error", { jobId: id, message: finalJob.errorMessage ?? "Job failed" });
    res.end();
    cleanup();
    return;
  }
}
// ── Resolve: timeout with no activity — tell client to fall back to polling
sendEvent("error", {
jobId: id,