Add streaming capabilities and improve API stability and security

Introduce streaming for AI job execution, implement rate limiting for API endpoints, enhance CORS configuration, and refactor event handling.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 418bf6f8-212b-4bb0-a7a5-8231a061da4e
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 2967540c-7b01-4168-87be-fde774e32494
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/9f85e954-647c-46a5-90a7-396e495a805a/418bf6f8-212b-4bb0-a7a5-8231a061da4e/Q83Uqvu
Replit-Helium-Checkpoint-Created: true
This commit is contained in:
alexpaynex
2026-03-18 22:17:10 +00:00
parent ca94c0a9e5
commit 5b3d7edf6a
9 changed files with 388 additions and 15 deletions

View File

@@ -10,19 +10,22 @@
"smoke": "tsx ./src/smoke.ts"
},
"dependencies": {
"@workspace/db": "workspace:*",
"@workspace/api-zod": "workspace:*",
"@workspace/db": "workspace:*",
"@workspace/integrations-anthropic-ai": "workspace:*",
"cookie-parser": "^1.4.7",
"cors": "^2",
"drizzle-orm": "catalog:",
"express": "^5",
"cookie-parser": "^1.4.7",
"cors": "^2"
"express-rate-limit": "^8.3.1",
"ws": "^8.19.0"
},
"devDependencies": {
"@types/node": "catalog:",
"@types/express": "^5.0.6",
"@types/cors": "^2.8.19",
"@types/cookie-parser": "^1.4.10",
"@types/cors": "^2.8.19",
"@types/express": "^5.0.6",
"@types/node": "catalog:",
"@types/ws": "^8.18.1",
"esbuild": "^0.27.3",
"tsx": "catalog:"
}

View File

@@ -1,12 +1,43 @@
import express, { type Express } from "express";
import cors from "cors";
import router from "./routes";
import router from "./routes/index.js";
const app: Express = express();
app.set("trust proxy", 1);
app.use(cors());
// ── CORS (#5) ────────────────────────────────────────────────────────────────
// CORS_ORIGINS = comma-separated list of allowed origins.
// Default in production: alexanderwhitestone.com (and www. variant).
// Default in development: all origins permitted.
const isProd = process.env["NODE_ENV"] === "production";
const rawOrigins = process.env["CORS_ORIGINS"];
const allowedOrigins: string[] = rawOrigins
? rawOrigins.split(",").map((o) => o.trim()).filter(Boolean)
: isProd
? ["https://alexanderwhitestone.com", "https://www.alexanderwhitestone.com"]
: [];
app.use(
cors({
origin:
allowedOrigins.length === 0
? true
: (origin, callback) => {
if (!origin || allowedOrigins.includes(origin)) {
callback(null, true);
} else {
callback(new Error(`CORS: origin '${origin}' not allowed`));
}
},
credentials: true,
methods: ["GET", "POST", "PATCH", "DELETE", "OPTIONS"],
allowedHeaders: ["Content-Type", "Authorization", "X-Session-Token"],
exposedHeaders: ["X-Session-Token"],
}),
);
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

View File

@@ -79,6 +79,48 @@ Fulfill it thoroughly and helpfully. Be concise yet complete.`,
outputTokens: message.usage.output_tokens,
};
}
/**
 * Streaming variant of executeWork (#3). Invokes onChunk for each text delta
 * so callers can forward tokens (e.g. over SSE) as they arrive.
 *
 * @param requestText paid user request to fulfill
 * @param onChunk     called synchronously with every text delta
 * @returns the full result text plus input/output token usage
 */
async executeWorkStreaming(
  requestText: string,
  onChunk: (delta: string) => void,
): Promise<WorkResult> {
  const stream = anthropic.messages.stream({
    model: this.workModel,
    max_tokens: 8192,
    system: `You are Timmy, a capable AI agent. A user has paid for you to handle their request.
Fulfill it thoroughly and helpfully. Be concise yet complete.`,
    messages: [{ role: "user", content: requestText }],
  });
  let accumulated = "";
  let promptTokens = 0;
  let completionTokens = 0;
  for await (const event of stream) {
    // message_start carries input-token usage; message_delta carries the
    // running output-token count; content_block_delta carries the text itself.
    if (event.type === "message_start" && event.message.usage) {
      promptTokens = event.message.usage.input_tokens;
    } else if (event.type === "message_delta" && event.usage) {
      completionTokens = event.usage.output_tokens;
    } else if (
      event.type === "content_block_delta" &&
      event.delta.type === "text_delta"
    ) {
      accumulated += event.delta.text;
      onChunk(event.delta.text);
    }
  }
  return {
    result: accumulated,
    inputTokens: promptTokens,
    outputTokens: completionTokens,
  };
}
}
export const agentService = new AgentService();

View File

@@ -0,0 +1,33 @@
import { EventEmitter } from "events";
/** Job lifecycle events emitted by the jobs routes. */
export type JobEvent =
  | { type: "job:state"; jobId: string; state: string }
  | { type: "job:paid"; jobId: string; invoiceType: "eval" | "work" }
  | { type: "job:completed"; jobId: string; result: string }
  | { type: "job:failed"; jobId: string; reason: string };
/** Session lifecycle events emitted by the sessions routes. */
export type SessionEvent =
  | { type: "session:state"; sessionId: string; state: string }
  | { type: "session:paid"; sessionId: string; amountSats: number }
  | { type: "session:balance"; sessionId: string; balanceSats: number };
export type BusEvent = JobEvent | SessionEvent;
/**
 * Process-wide pub/sub bus. All events travel on the single "bus" channel;
 * the typed overloads give subscribers a strongly-typed BusEvent payload.
 */
class EventBus extends EventEmitter {
  emit(event: "bus", data: BusEvent): boolean;
  emit(event: string, ...args: unknown[]): boolean {
    return super.emit(event, ...args);
  }
  on(event: "bus", listener: (data: BusEvent) => void): this;
  on(event: string, listener: (...args: unknown[]) => void): this {
    return super.on(event, listener);
  }
  // Typed removal counterpart to `on` — without this overload, code that
  // unsubscribes (e.g. SSE handlers calling eventBus.off) falls back to the
  // untyped EventEmitter signature and loses listener type checking.
  off(event: "bus", listener: (data: BusEvent) => void): this;
  off(event: string, listener: (...args: unknown[]) => void): this {
    return super.off(event, listener);
  }
  /** Publish a typed event to every "bus" subscriber. */
  publish(data: BusEvent): void {
    this.emit("bus", data);
  }
}
export const eventBus = new EventBus();
// Many concurrent SSE clients may subscribe at once; raise the default cap of 10.
eventBus.setMaxListeners(256);

View File

@@ -0,0 +1,33 @@
import { rateLimit, type Options } from "express-rate-limit";
/**
 * Reads a base-10 integer from the environment.
 * Returns `fallback` when the variable is unset, empty, or not parseable.
 */
function envInt(key: string, fallback: number): number {
  const raw = process.env[key];
  if (!raw) return fallback;
  const parsed = parseInt(raw, 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}
/**
 * Builds an express-rate-limit middleware that answers 429s with a JSON body.
 * When `overrideKey` is given, that env var (parsed as an integer) replaces
 * the default `max`.
 */
function limiter(windowMs: number, max: number, overrideKey?: string) {
  const effectiveMax = overrideKey ? envInt(overrideKey, max) : max;
  const retryAfterSeconds = Math.ceil(windowMs / 1000);
  return rateLimit({
    windowMs,
    max: effectiveMax,
    standardHeaders: "draft-7",
    legacyHeaders: false,
    handler: (_req, res) => {
      res.status(429).json({
        error: "rate_limited",
        message: "Too many requests — please slow down.",
        retryAfterSeconds,
      });
    },
  } satisfies Partial<Options>);
}
// POST /api/jobs — 30 req/min per IP (configurable via RATE_LIMIT_JOBS)
export const jobsLimiter = limiter(60_000, 30, "RATE_LIMIT_JOBS");
// POST /api/sessions — 10 req/min per IP (configurable via RATE_LIMIT_SESSIONS)
export const sessionsLimiter = limiter(60_000, 10, "RATE_LIMIT_SESSIONS");
// POST /api/bootstrap — 3 req/hour per IP (configurable via RATE_LIMIT_BOOTSTRAP)
export const bootstrapLimiter = limiter(60 * 60_000, 3, "RATE_LIMIT_BOOTSTRAP");

View File

@@ -0,0 +1,55 @@
import { PassThrough } from "stream";
interface StreamEntry {
  stream: PassThrough;
  createdAt: number;
}
/**
 * Holds one live PassThrough stream per job so background work can pipe token
 * deltas to whichever SSE client is (or later becomes) attached.
 */
class StreamRegistry {
  private readonly streams = new Map<string, StreamEntry>();
  // Entries older than this are destroyed on the next register() call.
  private readonly TTL_MS = 5 * 60 * 1000;
  /**
   * Creates (or replaces) the stream slot for a job. Any previous stream for
   * the same job is destroyed first.
   */
  register(jobId: string): PassThrough {
    const existing = this.streams.get(jobId);
    if (existing) {
      existing.stream.destroy();
    }
    const stream = new PassThrough();
    this.streams.set(jobId, { stream, createdAt: Date.now() });
    stream.on("close", () => {
      // BUG FIX: "close" fires asynchronously, so by the time it runs the slot
      // may already belong to a replacement stream (registered after destroy()
      // above). Only delete the entry if it still refers to THIS stream;
      // otherwise we would evict the brand-new replacement.
      if (this.streams.get(jobId)?.stream === stream) {
        this.streams.delete(jobId);
      }
    });
    this.evictExpired();
    return stream;
  }
  /** Returns the live stream for a job, or null if none is registered. */
  get(jobId: string): PassThrough | null {
    return this.streams.get(jobId)?.stream ?? null;
  }
  /** Writes a token chunk to the job's stream; no-op when none is registered. */
  write(jobId: string, chunk: string): void {
    this.streams.get(jobId)?.stream.write(chunk);
  }
  /** Gracefully ends and unregisters the job's stream, if any. */
  end(jobId: string): void {
    const entry = this.streams.get(jobId);
    if (entry) {
      entry.stream.end();
      this.streams.delete(jobId);
    }
  }
  /** Destroys and removes entries older than TTL_MS. */
  private evictExpired(): void {
    const now = Date.now();
    for (const [id, entry] of this.streams.entries()) {
      if (now - entry.createdAt > this.TTL_MS) {
        entry.stream.destroy();
        this.streams.delete(id);
      }
    }
  }
}
export const streamRegistry = new StreamRegistry();

View File

@@ -6,6 +6,9 @@ import { CreateJobBody, GetJobParams } from "@workspace/api-zod";
import { lnbitsService } from "../lib/lnbits.js";
import { agentService } from "../lib/agent.js";
import { pricingService } from "../lib/pricing.js";
import { jobsLimiter } from "../lib/rate-limiter.js";
import { eventBus } from "../lib/event-bus.js";
import { streamRegistry } from "../lib/stream-registry.js";
const router = Router();
@@ -65,11 +68,13 @@ async function runEvalInBackground(jobId: string, request: string): Promise<void
})
.where(eq(jobs.id, jobId));
});
eventBus.publish({ type: "job:state", jobId, state: "awaiting_work_payment" });
} else {
await db
.update(jobs)
.set({ state: "rejected", rejectionReason: evalResult.reason, updatedAt: new Date() })
.where(eq(jobs.id, jobId));
eventBus.publish({ type: "job:state", jobId, state: "rejected" });
}
} catch (err) {
const message = err instanceof Error ? err.message : "Evaluation error";
@@ -77,15 +82,23 @@ async function runEvalInBackground(jobId: string, request: string): Promise<void
.update(jobs)
.set({ state: "failed", errorMessage: message, updatedAt: new Date() })
.where(eq(jobs.id, jobId));
eventBus.publish({ type: "job:failed", jobId, reason: message });
}
}
/**
* Runs the AI work execution in a background task so HTTP polls return fast.
* Uses streaming so any connected SSE client receives tokens in real time (#3).
*/
async function runWorkInBackground(jobId: string, request: string, workAmountSats: number, btcPriceUsd: number | null): Promise<void> {
try {
const workResult = await agentService.executeWork(request);
eventBus.publish({ type: "job:state", jobId, state: "executing" });
const workResult = await agentService.executeWorkStreaming(request, (delta) => {
streamRegistry.write(jobId, delta);
});
streamRegistry.end(jobId);
const actualCostUsd = pricingService.calculateActualCostUsd(
workResult.inputTokens,
@@ -112,12 +125,16 @@ async function runWorkInBackground(jobId: string, request: string, workAmountSat
updatedAt: new Date(),
})
.where(eq(jobs.id, jobId));
eventBus.publish({ type: "job:completed", jobId, result: workResult.result });
} catch (err) {
const message = err instanceof Error ? err.message : "Execution error";
streamRegistry.end(jobId);
await db
.update(jobs)
.set({ state: "failed", errorMessage: message, updatedAt: new Date() })
.where(eq(jobs.id, jobId));
eventBus.publish({ type: "job:failed", jobId, reason: message });
}
}
@@ -149,6 +166,9 @@ async function advanceJob(job: Job): Promise<Job | null> {
if (!advanced) return getJobById(job.id);
eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "eval" });
eventBus.publish({ type: "job:state", jobId: job.id, state: "evaluating" });
// Fire AI eval in background — poll returns immediately with "evaluating"
setImmediate(() => { void runEvalInBackground(job.id, job.request); });
@@ -177,6 +197,11 @@ async function advanceJob(job: Job): Promise<Job | null> {
if (!advanced) return getJobById(job.id);
eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "work" });
// Register stream slot before firing background work so first tokens aren't lost
streamRegistry.register(job.id);
// Fire AI work in background — poll returns immediately with "executing"
setImmediate(() => { void runWorkInBackground(job.id, job.request, job.workAmountSats ?? 0, job.btcPriceUsd); });
@@ -188,7 +213,7 @@ async function advanceJob(job: Job): Promise<Job | null> {
// ── POST /jobs ────────────────────────────────────────────────────────────────
router.post("/jobs", async (req: Request, res: Response) => {
router.post("/jobs", jobsLimiter, async (req: Request, res: Response) => {
const parseResult = CreateJobBody.safeParse(req.body);
if (!parseResult.success) {
const issue = parseResult.error.issues[0];
@@ -404,4 +429,110 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => {
}
});
// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
// Server-Sent Events (#3): streams Claude token deltas in real time while the
// job is executing. If the job is already complete, sends the full result then
// closes. If the job isn't executing yet, waits up to 60 s for it to start.
/**
 * SSE endpoint (#3): streams token deltas for a job in real time.
 * - complete job → replays the stored result, then closes
 * - failed job   → emits an error event, then closes
 * - otherwise    → waits up to 60 s for the work stream to appear, then pipes it
 */
router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
  const paramResult = GetJobParams.safeParse(req.params);
  if (!paramResult.success) {
    res.status(400).json({ error: "Invalid job id" });
    return;
  }
  const { id } = paramResult.data;
  const job = await getJobById(id);
  if (!job) {
    res.status(404).json({ error: "Job not found" });
    return;
  }
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // disable nginx response buffering
  res.flushHeaders();
  let closed = false; // guards against writes after end / client disconnect
  const sendEvent = (event: string, data: unknown) => {
    if (!closed) {
      res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
    }
  };
  // Job already complete — replay full result immediately
  if (job.state === "complete" && job.result) {
    sendEvent("token", { text: job.result });
    sendEvent("done", { jobId: id, state: "complete" });
    res.end();
    return;
  }
  if (job.state === "failed") {
    sendEvent("error", { jobId: id, message: job.errorMessage ?? "Job failed" });
    res.end();
    return;
  }
  // Job is executing or about to execute — pipe the live stream
  const sendHeartbeat = setInterval(() => {
    if (!closed) res.write(": heartbeat\n\n");
  }, 15_000);
  let stream = streamRegistry.get(id);
  const onData = (chunk: Buffer) => {
    sendEvent("token", { text: chunk.toString("utf8") });
  };
  const onEnd = () => {
    sendEvent("done", { jobId: id, state: "complete" });
    finish();
  };
  const onError = (err: Error) => {
    sendEvent("error", { jobId: id, message: err.message });
    finish();
  };
  // Single idempotent teardown: stops the heartbeat, detaches the stream
  // listeners (previously they leaked and kept writing after the client
  // disconnected) and ends the response.
  function finish(): void {
    if (closed) return;
    closed = true;
    clearInterval(sendHeartbeat);
    if (stream) {
      stream.off("data", onData);
      stream.off("end", onEnd);
      stream.off("error", onError);
    }
    res.end();
  }
  req.on("close", finish);
  // Wait up to 60 s for a stream slot to appear (work payment may not have landed yet)
  if (!stream) {
    await new Promise<void>((resolve) => {
      const busListener = (data: Parameters<typeof eventBus.publish>[0]) => {
        if (
          "jobId" in data &&
          data.jobId === id &&
          (data.type === "job:state" || data.type === "job:paid")
        ) {
          settle();
        }
      };
      const deadline = setTimeout(settle, 60_000);
      // Resolve exactly once and ALWAYS detach the bus listener — previously
      // the timeout path left it registered on the bus forever.
      function settle(): void {
        clearTimeout(deadline);
        eventBus.off("bus", busListener);
        resolve();
      }
      eventBus.on("bus", busListener);
    });
    stream = streamRegistry.get(id);
  }
  if (closed) return; // client went away while we waited
  if (!stream) {
    // Still no stream — job may have completed while we waited
    const refreshed = await getJobById(id);
    if (refreshed?.state === "complete" && refreshed.result) {
      sendEvent("token", { text: refreshed.result });
      sendEvent("done", { jobId: id, state: "complete" });
    } else {
      sendEvent("error", { jobId: id, message: "Stream not available" });
    }
    finish();
    return;
  }
  stream.on("data", onData);
  stream.on("end", onEnd);
  stream.on("error", onError);
});
export default router;

View File

@@ -3,6 +3,8 @@ import { randomBytes, randomUUID } from "crypto";
import { db, sessions, sessionRequests, type Session } from "@workspace/db";
import { eq, and } from "drizzle-orm";
import { lnbitsService } from "../lib/lnbits.js";
import { sessionsLimiter } from "../lib/rate-limiter.js";
import { eventBus } from "../lib/event-bus.js";
import { agentService } from "../lib/agent.js";
import { pricingService } from "../lib/pricing.js";
import { getBtcPriceUsd, usdToSats } from "../lib/btc-oracle.js";
@@ -133,7 +135,7 @@ async function advanceTopup(session: Session): Promise<Session> {
// ── POST /sessions ─────────────────────────────────────────────────────────────
router.post("/sessions", async (req: Request, res: Response) => {
router.post("/sessions", sessionsLimiter, async (req: Request, res: Response) => {
const rawAmount = req.body?.amount_sats;
const amountSats = parseInt(String(rawAmount ?? ""), 10);