SSE stream registry race condition at 60-second timeout boundary #16

Closed
opened 2026-03-18 22:21:47 +00:00 by replit · 1 comment
Owner

Context

In GET /api/jobs/:id/stream, when a client connects before the work payment is confirmed, the handler waits up to 60 s for the event bus to signal that a stream slot was registered. The wait is resolved by an event bus listener.

However, there is a narrow race condition:

  1. Client connects to /stream, no stream slot exists yet
  2. Bus listener is registered — waiting for job:paid or job:state event
  3. Work payment is confirmed — streamRegistry.register(jobId) is called, job:paid event is published
  4. Event fires, bus listener resolves the promise and is removed
  5. After the promise resolves, the handler calls streamRegistry.get(id) to fetch the stream slot
  6. If the background work completes extremely fast (e.g. in stub mode, which is instant), streamRegistry.end(jobId) may have already been called before the SSE handler calls stream.on("data")
  7. The stream is already ended — data events are never received, end fires immediately, client gets a done event with no tokens
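The ordering in steps 3 to 7 can be reproduced with a small in-memory model. This is only a sketch: `StreamSlot`, `StreamRegistry`, and `attachAfterWait` are hypothetical stand-ins written for illustration, not the code in stream-registry.ts or jobs.ts, and the slot here deliberately does no buffering so that a late listener misses earlier chunks.

```typescript
// Hypothetical stand-in for the stream registry; the real
// stream-registry.ts is not shown in this issue.
type Listener = (chunk: string) => void;

class StreamSlot {
  ended = false;
  private listeners: Listener[] = [];

  onData(fn: Listener): void {
    this.listeners.push(fn);
  }

  // No buffering: chunks written before a listener attaches are lost.
  write(chunk: string): void {
    for (const fn of this.listeners) fn(chunk);
  }

  end(): void {
    this.ended = true;
  }
}

class StreamRegistry {
  private slots = new Map<string, StreamSlot>();

  register(id: string): StreamSlot {
    const slot = new StreamSlot();
    this.slots.set(id, slot);
    return slot;
  }

  get(id: string): StreamSlot | undefined {
    return this.slots.get(id);
  }
}

// Called once the bus-listener wait has resolved. Reports whether the
// handler can stream live tokens or should replay the stored result.
function attachAfterWait(
  registry: StreamRegistry,
  jobId: string,
): { tokens: string[]; replayFromDb: boolean } {
  const slot = registry.get(jobId);
  if (!slot || slot.ended) {
    // Slot missing or already ended: nothing live is left to receive.
    return { tokens: [], replayFromDb: true };
  }
  const tokens: string[] = [];
  slot.onData((chunk) => tokens.push(chunk));
  return { tokens, replayFromDb: false };
}

// Simulate instant stub completion: the work finishes before the handler attaches.
const registry = new StreamRegistry();
const slot = registry.register("job-1");
slot.write("hello"); // emitted with no listener attached, so it is lost
slot.end();
const raceResult = attachAfterWait(registry, "job-1");
```

In this model `raceResult.replayFromDb` is true and `raceResult.tokens` is empty, which is the signal the handler would use to switch to DB replay instead of sending a done event with no tokens.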

A separate issue: if the 60 s deadline fires before the stream is registered (e.g. very slow payment confirmation), the fallback checks the DB and replays the full result — but if the job is still executing at that point (not yet complete), the client gets error: Stream not available even though work is in progress.

Requirements

  1. After the bus-listener wait resolves, check if the stream has already ended. If so, fall back to DB result replay immediately (same as the "already complete" fast path)
  2. If the job is executing but the stream slot is gone (ended before client attached), log a warning and return the DB result when it becomes available (poll with short interval, max 120 s)
  3. Add a unit test covering the race: simulate instant stub completion and verify the SSE client receives a done event with content
  4. Increase the wait timeout from 60 s to 90 s (work payment can take time on mainnet)
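Requirement 2 amounts to a bounded polling loop. The following is a minimal sketch under stated assumptions: `pollUntilTerminal`, `JobRow`, and the `failed` state name are hypothetical, and `fetchJob` stands in for whatever DB lookup the handler already uses. Only the `executing` and `complete` state names come from this issue.

```typescript
// "executing" and "complete" appear in the issue; "failed" is an assumed
// name for a terminal error state.
type JobState = "executing" | "complete" | "failed";

interface JobRow {
  state: JobState;
  result?: string;
}

// Polls fetchJob until the job reaches a terminal state or the time budget
// would be exceeded. Resolves to the terminal row, or null on timeout.
async function pollUntilTerminal(
  fetchJob: () => Promise<JobRow>,
  intervalMs: number,
  maxWaitMs: number,
): Promise<JobRow | null> {
  const deadline = Date.now() + maxWaitMs;
  while (true) {
    const job = await fetchJob();
    if (job.state === "complete" || job.state === "failed") {
      return job;
    }
    // Stop if sleeping one more interval would pass the deadline.
    if (Date.now() + intervalMs > deadline) {
      return null;
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
}

// Fake DB for demonstration: reports "executing" twice, then "complete".
function makeFakeDb(): () => Promise<JobRow> {
  let calls = 0;
  return async (): Promise<JobRow> => {
    calls += 1;
    return calls < 3
      ? { state: "executing" }
      : { state: "complete", result: "hello" };
  };
}
```

With the limits named in the requirement, a call might look like `pollUntilTerminal(fetchJob, 2_000, 120_000)`, where the 2 s interval is one possible choice for "short interval". A null return would be the only case in which the handler reports an error.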

Acceptance Criteria

  • SSE client connecting after work has already completed (stream already ended) receives the full result via token + done events, not an error
  • SSE client connecting during an ongoing job (stream active) receives live token deltas
  • SSE client connecting before work payment with a 90 s timeout still receives results if payment lands within that window
  • No error: Stream not available is emitted when the job eventually succeeds
  • Stream registry is empty after a job completes and all SSE clients have disconnected (no leak)

Files

  • artifacts/api-server/src/routes/jobs.ts (SSE endpoint, bottom of file)
  • artifacts/api-server/src/lib/stream-registry.ts
replit added the backend and hermes labels 2026-03-19 19:34:13 +00:00
claude was assigned by Rockachopa 2026-03-22 23:38:28 +00:00
Collaborator

PR updated: http://143.198.27.163:3000/replit/token-gated-economy/pulls/56

Added DB polling fallback for the race condition where stub mode (or very fast work) completes before the SSE client attaches to the stream. When streamRegistry.get() returns null but the job is still executing, the handler now polls the DB every 2s (max 120s) until the job reaches a terminal state, then replays the result via token + done SSE events.

Also added 7 unit tests covering the race condition using node:test.

The 90s bus-wait timeout and post-wait DB re-check were already in place from a prior commit.
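For readers unfamiliar with the wire format, the replay described above could be framed roughly as follows. This is a sketch, not the code from the PR: `sseFrame` and `replayFrames` are hypothetical helpers, and the JSON payload shape (`{ text }` for token, an empty object for done) is an assumption. Only the `event:` and `data:` field syntax and the blank-line terminator come from the Server-Sent Events format itself.

```typescript
// Builds one Server-Sent Events frame. JSON.stringify escapes newlines in
// the payload, so the frame always has exactly one data line.
function sseFrame(event: string, payload: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(payload)}\n\n`;
}

// Replays a stored result as a single token event followed by done.
// The payload field names are assumed for illustration.
function replayFrames(result: string): string[] {
  return [sseFrame("token", { text: result }), sseFrame("done", {})];
}

const frames = replayFrames("hi");
// frames[0] is 'event: token\ndata: {"text":"hi"}\n\n'
// frames[1] is 'event: done\ndata: {}\n\n'
```

Sending the whole result as one token event keeps the client code path identical to live streaming, at the cost of losing the original token boundaries.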
