56 lines
1.2 KiB
TypeScript
56 lines
1.2 KiB
TypeScript
import { PassThrough } from "stream";
|
|
|
|
/**
 * Bookkeeping record the registry keeps for one job's stream.
 */
interface StreamEntry {
  /** The `PassThrough` that chunks for the job are written to. */
  stream: PassThrough;

  /** `Date.now()` timestamp (ms) captured when the entry was stored; compared against the TTL. */
  createdAt: number;
}
|
|
|
|
/**
 * In-memory registry of per-job `PassThrough` streams, keyed by job id.
 *
 * An entry leaves the registry when `end()` is called for it, when its stream
 * emits "close", or when it is older than `TTL_MS` at the time of a later
 * `register()` call (eviction is only triggered from `register()`).
 */
class StreamRegistry {
  private readonly streams = new Map<string, StreamEntry>();

  /** Maximum entry age in milliseconds (5 minutes) before it is evicted. */
  private readonly TTL_MS = 5 * 60 * 1000;

  /**
   * Creates and stores a fresh stream for `jobId`.
   *
   * If a stream is already registered under the same id it is destroyed and
   * replaced. Expired entries are evicted as a side effect.
   *
   * @param jobId Key under which the stream is stored.
   * @returns The newly created stream.
   */
  register(jobId: string): PassThrough {
    this.streams.get(jobId)?.stream.destroy();

    const stream = new PassThrough();
    this.streams.set(jobId, { stream, createdAt: Date.now() });

    stream.on("close", () => {
      // "close" fires asynchronously after destroy()/end(), so by the time it
      // runs the id may already belong to a newer stream. Only remove the
      // entry if it still refers to *this* stream; otherwise a re-registered
      // job would lose its replacement stream.
      if (this.streams.get(jobId)?.stream === stream) {
        this.streams.delete(jobId);
      }
    });

    this.evictExpired();
    return stream;
  }

  /**
   * Looks up the stream registered for `jobId`.
   *
   * @returns The stream, or `null` when no entry exists for that id.
   */
  get(jobId: string): PassThrough | null {
    return this.streams.get(jobId)?.stream ?? null;
  }

  /**
   * Writes `chunk` to the stream registered for `jobId`.
   * Does nothing when no entry exists for that id.
   */
  write(jobId: string, chunk: string): void {
    this.streams.get(jobId)?.stream.write(chunk);
  }

  /**
   * Ends the stream registered for `jobId` and removes it from the registry.
   * Does nothing when no entry exists for that id.
   */
  end(jobId: string): void {
    const entry = this.streams.get(jobId);
    if (!entry) {
      return;
    }
    entry.stream.end();
    this.streams.delete(jobId);
  }

  /** Destroys and removes every entry whose age exceeds `TTL_MS`. */
  private evictExpired(): void {
    const now = Date.now();
    for (const [id, entry] of this.streams.entries()) {
      if (now - entry.createdAt > this.TTL_MS) {
        entry.stream.destroy();
        this.streams.delete(id);
      }
    }
  }
}
|
|
|
|
/** Module-level `StreamRegistry` instance exported for use by other modules. */
export const streamRegistry = new StreamRegistry();
|