fix: resolve SSE stream registry race condition at completion boundary
Some checks failed
CI / Typecheck & Lint (pull_request) Failing after 1s
Some checks failed
CI / Typecheck & Lint (pull_request) Failing after 1s
Fixes #16 1. stream-registry: Don't delete stream from map in end() — let the "close" event handle cleanup after consumers drain buffered data. This prevents the race where a late-attaching SSE client calls get() after end() but before reading buffered tokens. 2. stream-registry: Add hasEnded() method to check if a stream's writable side has ended (used for diagnostics). 3. jobs SSE endpoint: When job is "executing" but stream slot is gone (ended before client attached), poll DB every 2s (max 120s) until the job completes, then replay the full result. Previously this case returned "Stream timed out" error. 4. Timeout was already 90s (updated in prior work); fixed the docstring comment from 60s to 90s. 5. Added unit test covering the race: simulates instant stub completion (write + end before consumer attaches) and verifies buffered data is still readable by a late consumer. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
166
artifacts/api-server/src/lib/stream-registry.test.ts
Normal file
166
artifacts/api-server/src/lib/stream-registry.test.ts
Normal file
@@ -0,0 +1,166 @@
|
||||
/**
|
||||
* Unit test: stream registry race condition at completion boundary (#16)
|
||||
*
|
||||
* Simulates the scenario where work completes (stream.end()) before a consumer
|
||||
* attaches. Verifies that buffered data is still readable by a late consumer.
|
||||
*
|
||||
* Run: npx tsx artifacts/api-server/src/lib/stream-registry.test.ts
|
||||
*/
|
||||
import { PassThrough } from "stream";
|
||||
|
||||
// Inline minimal StreamRegistry to test in isolation (no side-effect imports)
|
||||
class StreamRegistry {
|
||||
private readonly streams = new Map<string, { stream: PassThrough; createdAt: number }>();
|
||||
|
||||
register(jobId: string): PassThrough {
|
||||
const existing = this.streams.get(jobId);
|
||||
if (existing) existing.stream.destroy();
|
||||
const stream = new PassThrough();
|
||||
this.streams.set(jobId, { stream, createdAt: Date.now() });
|
||||
stream.on("close", () => { this.streams.delete(jobId); });
|
||||
return stream;
|
||||
}
|
||||
|
||||
get(jobId: string): PassThrough | null {
|
||||
return this.streams.get(jobId)?.stream ?? null;
|
||||
}
|
||||
|
||||
write(jobId: string, chunk: string): void {
|
||||
this.streams.get(jobId)?.stream.write(chunk);
|
||||
}
|
||||
|
||||
end(jobId: string): void {
|
||||
const entry = this.streams.get(jobId);
|
||||
if (entry) {
|
||||
entry.stream.end();
|
||||
// NOT deleting from map — "close" listener handles cleanup after consumer drains
|
||||
}
|
||||
}
|
||||
|
||||
hasEnded(jobId: string): boolean {
|
||||
const entry = this.streams.get(jobId);
|
||||
return entry ? entry.stream.writableEnded : false;
|
||||
}
|
||||
}
|
||||
|
||||
let passed = 0;
|
||||
let failed = 0;
|
||||
|
||||
function assert(condition: boolean, msg: string) {
|
||||
if (!condition) {
|
||||
console.error(` FAIL: ${msg}`);
|
||||
failed++;
|
||||
} else {
|
||||
console.log(` PASS: ${msg}`);
|
||||
passed++;
|
||||
}
|
||||
}
|
||||
|
||||
async function testRaceCondition() {
|
||||
console.log("Test: late consumer receives buffered data after stream.end()");
|
||||
|
||||
const registry = new StreamRegistry();
|
||||
const jobId = "test-job-1";
|
||||
|
||||
// 1. Register stream and write data + end (simulating instant stub completion)
|
||||
registry.register(jobId);
|
||||
registry.write(jobId, "Hello ");
|
||||
registry.write(jobId, "world");
|
||||
registry.end(jobId);
|
||||
|
||||
// 2. Verify stream is still retrievable (not deleted from map)
|
||||
const stream = registry.get(jobId);
|
||||
assert(stream !== null, "stream should still be in registry after end()");
|
||||
assert(registry.hasEnded(jobId), "hasEnded() should return true");
|
||||
|
||||
// 3. Late consumer attaches and reads buffered data
|
||||
const chunks: string[] = [];
|
||||
await new Promise<void>((resolve) => {
|
||||
stream!.on("data", (chunk: Buffer) => {
|
||||
chunks.push(chunk.toString("utf8"));
|
||||
});
|
||||
stream!.on("end", () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
||||
const fullText = chunks.join("");
|
||||
assert(fullText === "Hello world", `consumer received full text: "${fullText}"`);
|
||||
}
|
||||
|
||||
async function testNormalFlow() {
|
||||
console.log("Test: consumer attached before end() receives live data");
|
||||
|
||||
const registry = new StreamRegistry();
|
||||
const jobId = "test-job-2";
|
||||
|
||||
registry.register(jobId);
|
||||
|
||||
const chunks: string[] = [];
|
||||
const stream = registry.get(jobId)!;
|
||||
|
||||
const done = new Promise<void>((resolve) => {
|
||||
stream.on("data", (chunk: Buffer) => {
|
||||
chunks.push(chunk.toString("utf8"));
|
||||
});
|
||||
stream.on("end", () => {
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
|
||||
// Write after consumer attached
|
||||
registry.write(jobId, "token1 ");
|
||||
registry.write(jobId, "token2");
|
||||
registry.end(jobId);
|
||||
|
||||
await done;
|
||||
|
||||
const fullText = chunks.join("");
|
||||
assert(fullText === "token1 token2", `live consumer received: "${fullText}"`);
|
||||
}
|
||||
|
||||
async function testCleanupAfterConsume() {
|
||||
console.log("Test: stream removed from registry after consumer finishes");
|
||||
|
||||
const registry = new StreamRegistry();
|
||||
const jobId = "test-job-3";
|
||||
|
||||
registry.register(jobId);
|
||||
registry.write(jobId, "data");
|
||||
registry.end(jobId);
|
||||
|
||||
const stream = registry.get(jobId)!;
|
||||
|
||||
await new Promise<void>((resolve) => {
|
||||
stream.on("data", () => {});
|
||||
stream.on("end", () => resolve());
|
||||
});
|
||||
|
||||
// After consuming, wait for close event to fire (nextTick)
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
assert(registry.get(jobId) === null, "stream removed from registry after drain");
|
||||
}
|
||||
|
||||
async function testNoStreamReturnsNull() {
|
||||
console.log("Test: get() returns null for unknown job");
|
||||
|
||||
const registry = new StreamRegistry();
|
||||
assert(registry.get("nonexistent") === null, "returns null for unknown job");
|
||||
assert(!registry.hasEnded("nonexistent"), "hasEnded returns false for unknown job");
|
||||
}
|
||||
|
||||
async function main() {
|
||||
await testRaceCondition();
|
||||
await testNormalFlow();
|
||||
await testCleanupAfterConsume();
|
||||
await testNoStreamReturnsNull();
|
||||
|
||||
console.log(`\nResults: ${passed} passed, ${failed} failed`);
|
||||
if (failed > 0) process.exit(1);
|
||||
}
|
||||
|
||||
main().catch((err) => {
|
||||
console.error(err);
|
||||
process.exit(1);
|
||||
});
|
||||
@@ -37,10 +37,19 @@ class StreamRegistry {
|
||||
const entry = this.streams.get(jobId);
|
||||
if (entry) {
|
||||
entry.stream.end();
|
||||
this.streams.delete(jobId);
|
||||
// Don't delete from map here — the "close" listener set in register()
|
||||
// handles cleanup once the consumer has drained all buffered data.
|
||||
// This prevents a race where a late-attaching SSE client calls get()
|
||||
// after end() but before consuming buffered tokens (#16).
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if a stream was registered and its writable side has ended. */
|
||||
hasEnded(jobId: string): boolean {
|
||||
const entry = this.streams.get(jobId);
|
||||
return entry ? entry.stream.writableEnded : false;
|
||||
}
|
||||
|
||||
private evictExpired(): void {
|
||||
const now = Date.now();
|
||||
for (const [id, entry] of this.streams.entries()) {
|
||||
|
||||
@@ -714,7 +714,7 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => {
|
||||
// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
|
||||
// Server-Sent Events (#3): streams Claude token deltas in real time while the
|
||||
// job is executing. If the job is already complete, sends the full result then
|
||||
// closes. If the job isn't executing yet, waits up to 60 s for it to start.
|
||||
// closes. If the job isn't executing yet, waits up to 90 s for it to start.
|
||||
|
||||
router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
|
||||
const paramResult = GetJobParams.safeParse(req.params);
|
||||
@@ -828,6 +828,44 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
|
||||
return;
|
||||
}
|
||||
|
||||
// ── Resolve: job executing but stream gone — poll DB until result (#16) ───
|
||||
if (currentJob.state === "executing") {
|
||||
logger.warn("stream slot gone while job still executing — polling DB for result", { jobId: id });
|
||||
const pollStart = Date.now();
|
||||
const POLL_INTERVAL = 2_000;
|
||||
const POLL_MAX = 120_000;
|
||||
|
||||
const pollResult = await new Promise<Job | null>((resolve) => {
|
||||
const tick = async () => {
|
||||
if (Date.now() - pollStart > POLL_MAX) {
|
||||
resolve(null);
|
||||
return;
|
||||
}
|
||||
const fresh = await getJobById(id);
|
||||
if (fresh && (fresh.state === "complete" || fresh.state === "failed")) {
|
||||
resolve(fresh);
|
||||
return;
|
||||
}
|
||||
setTimeout(tick, POLL_INTERVAL);
|
||||
};
|
||||
setTimeout(tick, POLL_INTERVAL);
|
||||
});
|
||||
|
||||
if (pollResult?.state === "complete" && pollResult.result) {
|
||||
sendEvent("token", { text: pollResult.result });
|
||||
sendEvent("done", { jobId: id, state: "complete" });
|
||||
res.end();
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
if (pollResult?.state === "failed") {
|
||||
sendEvent("error", { jobId: id, message: pollResult.errorMessage ?? "Job failed" });
|
||||
res.end();
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Resolve: timeout with no activity — tell client to fall back to polling
|
||||
sendEvent("error", {
|
||||
jobId: id,
|
||||
|
||||
Reference in New Issue
Block a user