SSE stream registry race condition at 60-second timeout boundary #16
Context
In `GET /api/jobs/:id/stream`, when a client connects before the work payment is confirmed, the handler waits up to 60 s for the event bus to signal that a stream slot was registered. The wait is resolved by an event bus listener. However, there is a narrow race condition:
1. The client connects to `/stream`; no stream slot exists yet.
2. The handler waits for a `job:paid` or `job:state` event.
3. `streamRegistry.register(jobId)` is called and the `job:paid` event is published.
4. `streamRegistry.get(id)` is called on the now-resolved promise.
5. `streamRegistry.end(jobId)` may already have been called before the SSE handler calls `stream.on("data")`.
6. The `data` events are never received, `end` fires immediately, and the client gets a `done` event with no tokens.

A separate issue: if the 60 s deadline fires before the stream is registered (e.g. very slow payment confirmation), the fallback checks the DB and replays the full result. But if the job is still `executing` at that point (not yet `complete`), the client gets `error: Stream not available` even though work is in progress.

Requirements
- If the job is `executing` but the stream slot is gone (ended before the client attached), log a warning and return the DB result when it becomes available (poll with a short interval, max 120 s).
- … `done` event with content

Acceptance Criteria
- The client receives `token` + `done` events, not an error.
- No `error: Stream not available` is emitted when the job eventually succeeds.

Files
- `artifacts/api-server/src/routes/jobs.ts` (SSE endpoint, bottom of file)
- `artifacts/api-server/src/lib/stream-registry.ts`

PR updated: http://143.198.27.163:3000/replit/token-gated-economy/pulls/56
Added a DB polling fallback for the race condition where stub mode (or very fast work) completes before the SSE client attaches to the stream. When `streamRegistry.get()` returns null but the job is still `executing`, the handler now polls the DB every 2 s (max 120 s) until the job reaches a terminal state. It then replays the result via `token` + `done` SSE events.

Also added 7 unit tests covering the race condition using `node:test`. The 90 s bus-wait timeout and post-wait DB re-check were already in place from a prior commit.
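Below is a minimal TypeScript sketch of that fallback. It assumes the handler can re-read the job row and write raw SSE frames.

Only a few details come from the text above:
- the 2 s interval and the 120 s cap
- the warning log
- the `token` + `done` replay

Everything else is a hypothetical choice made for illustration and is not the code in PR #56. That includes the names `pollUntilTerminal`, `replayFromDb`, `loadJob` and `JobRow`, the `failed` state, the JSON payload shapes, and the `error` event sent on timeout.

```typescript
// Hypothetical job row: `executing` and `complete` appear in the issue, `failed` is assumed.
type JobState = "executing" | "complete" | "failed";

interface JobRow {
  state: JobState;
  result: string | null;
}

// Standard SSE framing: an `event:` line, a `data:` line, then a blank line.
// The JSON payload shapes are assumptions, not the project's actual wire format.
function sseFrame(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => setTimeout(resolve, ms));
}

// Re-read the job until it leaves `executing`; give up (return null) once the
// next poll would land past `maxMs`.
async function pollUntilTerminal(
  loadJob: () => Promise<JobRow>,
  intervalMs: number,
  maxMs: number,
): Promise<JobRow | null> {
  const deadline = Date.now() + maxMs;
  while (true) {
    const row = await loadJob();
    if (row.state !== "executing") return row;
    if (Date.now() + intervalMs > deadline) return null;
    await sleep(intervalMs);
  }
}

// Hypothetical fallback for when streamRegistry.get() returned null: instead of
// emitting "Stream not available", wait for the DB result and replay it as token + done.
async function replayFromDb(
  jobId: string,
  loadJob: () => Promise<JobRow>,
  write: (chunk: string) => void,
  intervalMs = 2_000,
  maxMs = 120_000,
): Promise<void> {
  console.warn(`job ${jobId}: stream slot gone before client attached, polling DB`);
  const row = await pollUntilTerminal(loadJob, intervalMs, maxMs);
  if (row !== null && row.state === "complete" && row.result !== null) {
    write(sseFrame("token", { text: row.result }));
    write(sseFrame("done", {}));
    return;
  }
  // Assumed error payloads for the failed and timed-out cases.
  const message = row === null ? "Timed out waiting for job" : `Job ended in state ${row.state}`;
  write(sseFrame("error", { message }));
}
```

Polling on a fixed interval keeps this path independent of the event bus, which is the component that lost the race in the first place. The trade-off is up to one interval (2 s here) of extra latency between the job finishing and the replay starting.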