import { PassThrough } from "stream";

/** A registered stream together with the time it was registered. */
interface StreamEntry {
  stream: PassThrough;
  /** `Date.now()` value captured at registration; compared against the TTL. */
  createdAt: number;
}

/**
 * In-memory registry of `PassThrough` streams keyed by job id.
 *
 * Entries are removed when the stream closes, when `end()` is called for the
 * job, or when they are older than the TTL at the time of a later
 * `register()` call (eviction only runs from `register()`; there is no timer).
 */
class StreamRegistry {
  private readonly streams = new Map<string, StreamEntry>();
  /** Maximum age of an entry, in milliseconds, before it is evicted. */
  private readonly TTL_MS = 5 * 60 * 1000;

  /**
   * Creates and registers a fresh stream for `jobId`.
   *
   * A stream already registered under the same id is destroyed and replaced.
   *
   * @param jobId Key under which the stream is stored.
   * @returns The newly created stream.
   */
  register(jobId: string): PassThrough {
    this.streams.get(jobId)?.stream.destroy();

    const stream = new PassThrough();
    this.streams.set(jobId, { stream, createdAt: Date.now() });

    stream.on("close", () => {
      // 'close' is emitted asynchronously, so by the time it fires the id may
      // already belong to a newer stream. Only drop the entry if it is still
      // ours; otherwise a replaced stream would unregister its successor.
      if (this.streams.get(jobId)?.stream === stream) {
        this.streams.delete(jobId);
      }
    });

    this.evictExpired();
    return stream;
  }

  /**
   * Looks up the stream registered for `jobId`.
   *
   * @returns The stream, or `null` when no entry exists for the id.
   */
  get(jobId: string): PassThrough | null {
    return this.streams.get(jobId)?.stream ?? null;
  }

  /**
   * Writes `chunk` to the stream registered for `jobId`.
   *
   * Does nothing when the id is unknown or the stream is no longer writable
   * (ended, destroyed or errored), so a late chunk cannot trigger a
   * write-after-end 'error' event on the stream.
   */
  write(jobId: string, chunk: string): void {
    const entry = this.streams.get(jobId);
    if (entry?.stream.writable) {
      entry.stream.write(chunk);
    }
  }

  /**
   * Ends the stream registered for `jobId` and removes it from the registry.
   * Does nothing when the id is unknown.
   */
  end(jobId: string): void {
    const entry = this.streams.get(jobId);
    if (!entry) {
      return;
    }
    entry.stream.end();
    this.streams.delete(jobId);
  }

  /** Destroys and removes every entry whose age exceeds `TTL_MS`. */
  private evictExpired(): void {
    const now = Date.now();
    // Deleting the current key while iterating a Map is safe in JavaScript.
    for (const [id, entry] of this.streams) {
      if (now - entry.createdAt > this.TTL_MS) {
        entry.stream.destroy();
        this.streams.delete(id);
      }
    }
  }
}

/** Shared module-level registry instance. */
export const streamRegistry = new StreamRegistry();