// ── CORS (#5) ────────────────────────────────────────────────────────────────
// CORS_ORIGINS = comma-separated list of allowed origins.
// Default in production: alexanderwhitestone.com (and www. variant).
// Default in development: all origins permitted.
//
// An empty allow-list is interpreted below as "reflect any origin". A
// CORS_ORIGINS value that parses to nothing (e.g. "," or " ") must therefore
// fall back to the environment default instead of silently opening production
// to every origin with credentials enabled.
const isProd = process.env["NODE_ENV"] === "production";

const rawOrigins = process.env["CORS_ORIGINS"];

// Entries are trimmed and blanks dropped, so "a, ,b" yields ["a", "b"].
const configuredOrigins: string[] = (rawOrigins ?? "")
  .split(",")
  .map((o) => o.trim())
  .filter(Boolean);

const allowedOrigins: string[] =
  configuredOrigins.length > 0
    ? configuredOrigins
    : isProd
      ? ["https://alexanderwhitestone.com", "https://www.alexanderwhitestone.com"]
      : [];

app.use(
  cors({
    origin:
      allowedOrigins.length === 0
        ? true
        : (origin, callback) => {
            // Requests without an Origin header are let through; only a
            // present-but-unlisted origin is rejected.
            if (!origin || allowedOrigins.includes(origin)) {
              callback(null, true);
            } else {
              callback(new Error(`CORS: origin '${origin}' not allowed`));
            }
          },
    credentials: true,
    methods: ["GET", "POST", "PATCH", "DELETE", "OPTIONS"],
    allowedHeaders: ["Content-Type", "Authorization", "X-Session-Token"],
    exposedHeaders: ["X-Session-Token"],
  }),
);
/** Lifecycle notifications for a single job, identified by `jobId`. */
export type JobEvent =
  | { type: "job:state"; jobId: string; state: string }
  | { type: "job:paid"; jobId: string; invoiceType: "eval" | "work" }
  | { type: "job:completed"; jobId: string; result: string }
  | { type: "job:failed"; jobId: string; reason: string };

/** Lifecycle notifications for a single session, identified by `sessionId`. */
export type SessionEvent =
  | { type: "session:state"; sessionId: string; state: string }
  | { type: "session:paid"; sessionId: string; amountSats: number }
  | { type: "session:balance"; sessionId: string; balanceSats: number };

/** Every payload that can travel over the single `"bus"` channel. */
export type BusEvent = JobEvent | SessionEvent;

/**
 * Process-local publish/subscribe hub built on Node's EventEmitter.
 *
 * All traffic goes through one channel, `"bus"`; subscribers discriminate on
 * `data.type`. The overloads narrow `emit`/`on`/`off` so callers can only use
 * that channel with a `BusEvent` payload.
 */
class EventBus extends EventEmitter {
  emit(event: "bus", data: BusEvent): boolean;
  emit(event: string, ...args: unknown[]): boolean {
    return super.emit(event, ...args);
  }

  on(event: "bus", listener: (data: BusEvent) => void): this;
  on(event: string, listener: (...args: unknown[]) => void): this {
    return super.on(event, listener);
  }

  off(event: "bus", listener: (data: BusEvent) => void): this;
  off(event: string, listener: (...args: unknown[]) => void): this {
    return super.off(event, listener);
  }

  /**
   * Delivers `data` to every current `"bus"` subscriber.
   *
   * Listeners run synchronously, in registration order. A listener that throws
   * is logged and skipped: it neither prevents later listeners from running
   * nor propagates into the publisher, which typically still has work to do
   * after announcing the event.
   */
  publish(data: BusEvent): void {
    // rawListeners() returns a snapshot that still includes the `once`
    // wrappers, so one-shot subscribers remove themselves exactly as they
    // would under emit().
    for (const listener of this.rawListeners("bus")) {
      try {
        listener.call(this, data);
      } catch (err) {
        console.error("[event-bus] listener threw while handling", data.type, err);
      }
    }
  }
}

export const eventBus = new EventBus();
// Each subscriber adds a listener; raise the default cap of 10 so Node does
// not print leak warnings under normal fan-out.
eventBus.setMaxListeners(256);
/** A live stream plus the wall-clock time (ms since epoch) it was registered. */
interface StreamEntry {
  stream: PassThrough;
  createdAt: number;
}

/**
 * In-memory registry of one PassThrough stream per job id.
 *
 * A producer calls `register` → `write`* → `end`; a consumer obtains the same
 * stream through `get` and reads from it. Entries are dropped when the stream
 * ends or closes, or when they are older than `TTL_MS` at the time of a later
 * `register` call.
 */
class StreamRegistry {
  private readonly streams = new Map<string, StreamEntry>();
  private readonly TTL_MS = 5 * 60 * 1000;

  /**
   * Creates and stores a fresh stream for `jobId`, destroying any stream that
   * was previously registered under the same id.
   *
   * @returns the newly registered stream
   */
  register(jobId: string): PassThrough {
    const existing = this.streams.get(jobId);
    if (existing) {
      existing.stream.destroy();
    }

    const stream = new PassThrough();
    this.streams.set(jobId, { stream, createdAt: Date.now() });

    stream.on("close", () => {
      // "close" fires asynchronously. By then the id may already point at a
      // replacement stream (re-register), which must not be evicted by the
      // old stream's teardown.
      if (this.streams.get(jobId)?.stream === stream) {
        this.streams.delete(jobId);
      }
    });

    this.evictExpired();
    return stream;
  }

  /** Returns the stream registered for `jobId`, or `null` when there is none. */
  get(jobId: string): PassThrough | null {
    return this.streams.get(jobId)?.stream ?? null;
  }

  /** Writes `chunk` to the job's stream; a no-op when no writable stream is registered. */
  write(jobId: string, chunk: string): void {
    const entry = this.streams.get(jobId);
    if (!entry || entry.stream.destroyed || entry.stream.writableEnded) {
      return;
    }
    entry.stream.write(chunk);
  }

  /** Ends the job's stream (readers then see "end") and forgets it; a no-op for unknown ids. */
  end(jobId: string): void {
    const entry = this.streams.get(jobId);
    if (entry) {
      entry.stream.end();
      this.streams.delete(jobId);
    }
  }

  /** Destroys and removes every entry registered more than `TTL_MS` ago. */
  private evictExpired(): void {
    const now = Date.now();
    for (const [id, entry] of this.streams.entries()) {
      if (now - entry.createdAt > this.TTL_MS) {
        entry.stream.destroy();
        this.streams.delete(id);
      }
    }
  }
}

export const streamRegistry = new StreamRegistry();
runWorkInBackground(jobId: string, request: string, workAmountSat updatedAt: new Date(), }) .where(eq(jobs.id, jobId)); + + eventBus.publish({ type: "job:completed", jobId, result: workResult.result }); } catch (err) { const message = err instanceof Error ? err.message : "Execution error"; + streamRegistry.end(jobId); await db .update(jobs) .set({ state: "failed", errorMessage: message, updatedAt: new Date() }) .where(eq(jobs.id, jobId)); + eventBus.publish({ type: "job:failed", jobId, reason: message }); } } @@ -149,6 +166,9 @@ async function advanceJob(job: Job): Promise { if (!advanced) return getJobById(job.id); + eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "eval" }); + eventBus.publish({ type: "job:state", jobId: job.id, state: "evaluating" }); + // Fire AI eval in background — poll returns immediately with "evaluating" setImmediate(() => { void runEvalInBackground(job.id, job.request); }); @@ -177,6 +197,11 @@ async function advanceJob(job: Job): Promise { if (!advanced) return getJobById(job.id); + eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "work" }); + + // Register stream slot before firing background work so first tokens aren't lost + streamRegistry.register(job.id); + // Fire AI work in background — poll returns immediately with "executing" setImmediate(() => { void runWorkInBackground(job.id, job.request, job.workAmountSats ?? 
// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
// Server-Sent Events (#3): streams Claude token deltas in real time while the
// job is executing. If the job is already complete, sends the full result then
// closes. If the job isn't executing yet, waits up to 60 s for it to start.
//
// SSE events emitted: "token" { text }, "done" { jobId, state }, "error"
// { jobId, message }. A ": heartbeat" comment is written every 15 s while the
// connection is open.

router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
  const paramResult = GetJobParams.safeParse(req.params);
  if (!paramResult.success) {
    res.status(400).json({ error: "Invalid job id" });
    return;
  }
  const { id } = paramResult.data;

  const job = await getJobById(id);
  if (!job) {
    res.status(404).json({ error: "Job not found" });
    return;
  }

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");
  res.flushHeaders();

  // Set once the response is finished or the client has gone away; after that
  // nothing may be written and every timer/listener must already be released.
  let closed = false;
  const disposers: Array<() => void> = [];

  const sendEvent = (event: string, data: unknown): void => {
    if (closed) return;
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  };

  // Single teardown path: releases everything registered in `disposers` and
  // ends the response. Safe to call more than once.
  const finish = (): void => {
    if (closed) return;
    closed = true;
    for (const dispose of disposers.splice(0)) dispose();
    if (!res.writableEnded && !res.destroyed) res.end();
  };

  // Job already complete — replay full result immediately
  if (job.state === "complete" && job.result) {
    sendEvent("token", { text: job.result });
    sendEvent("done", { jobId: id, state: "complete" });
    finish();
    return;
  }

  if (job.state === "failed") {
    sendEvent("error", { jobId: id, message: job.errorMessage ?? "Job failed" });
    finish();
    return;
  }

  // Job is executing or about to execute — pipe the live stream.
  // The response's "close" fires both when we end it and when the client
  // disconnects early, so it is the one reliable teardown trigger.
  res.on("close", finish);

  const heartbeat = setInterval(() => {
    if (!closed) res.write(": heartbeat\n\n");
  }, 15_000);
  disposers.push(() => clearInterval(heartbeat));

  // Wait up to 60 s for a stream slot to appear (work payment may not have landed yet)
  let stream = streamRegistry.get(id);
  if (!stream) {
    stream = await new Promise<ReturnType<typeof streamRegistry.get>>((resolve) => {
      // Always releases both the timer and the bus subscription, whichever
      // of deadline / event / disconnect settles the wait.
      const settle = (value: ReturnType<typeof streamRegistry.get>): void => {
        clearTimeout(deadline);
        eventBus.off("bus", busListener);
        resolve(value);
      };

      const deadline = setTimeout(() => settle(streamRegistry.get(id)), 60_000);

      const busListener = (data: Parameters<typeof eventBus.publish>[0]): void => {
        if (!("jobId" in data) || data.jobId !== id) return;
        const terminal = data.type === "job:completed" || data.type === "job:failed";
        // Listeners run synchronously inside publish(); the stream slot is
        // registered right after "job:paid" is published, so look it up once
        // the publisher's synchronous section has finished.
        queueMicrotask(() => {
          const live = streamRegistry.get(id);
          // Events that neither bring a stream nor end the job (e.g. the
          // eval-phase ones) keep the wait going until the deadline.
          if (live || terminal) settle(live);
        });
      };

      eventBus.on("bus", busListener);
      disposers.push(() => settle(null));
    });
  }

  // Client disconnected while we were waiting — everything is already torn down.
  if (closed) return;

  if (!stream) {
    // Still no stream — job may have completed (or failed) while we waited
    const refreshed = await getJobById(id);
    if (refreshed?.state === "complete" && refreshed.result) {
      sendEvent("token", { text: refreshed.result });
      sendEvent("done", { jobId: id, state: "complete" });
    } else if (refreshed?.state === "failed") {
      sendEvent("error", { jobId: id, message: refreshed.errorMessage ?? "Job failed" });
    } else {
      sendEvent("error", { jobId: id, message: "Stream not available" });
    }
    finish();
    return;
  }

  const live = stream;

  const onData = (chunk: Buffer): void => {
    sendEvent("token", { text: chunk.toString("utf8") });
  };
  const onEnd = (): void => {
    sendEvent("done", { jobId: id, state: "complete" });
    finish();
  };
  const onError = (err: Error): void => {
    sendEvent("error", { jobId: id, message: err.message });
    finish();
  };
  // A stream that is destroyed (replaced or evicted) closes without "end";
  // without this the response would stay open on heartbeats forever.
  const onClose = (): void => {
    sendEvent("error", { jobId: id, message: "Stream closed" });
    finish();
  };

  live.on("data", onData);
  live.on("end", onEnd);
  live.on("error", onError);
  live.on("close", onClose);

  // Detach from the shared stream on teardown so a disconnected client does
  // not keep receiving (and trying to write) tokens.
  disposers.push(() => {
    live.off("data", onData);
    live.off("end", onEnd);
    live.off("error", onError);
    live.off("close", onClose);
  });
});

export default router;
Session } from "@workspace/db"; import { eq, and } from "drizzle-orm"; import { lnbitsService } from "../lib/lnbits.js"; +import { sessionsLimiter } from "../lib/rate-limiter.js"; +import { eventBus } from "../lib/event-bus.js"; import { agentService } from "../lib/agent.js"; import { pricingService } from "../lib/pricing.js"; import { getBtcPriceUsd, usdToSats } from "../lib/btc-oracle.js"; @@ -133,7 +135,7 @@ async function advanceTopup(session: Session): Promise { // ── POST /sessions ───────────────────────────────────────────────────────────── -router.post("/sessions", async (req: Request, res: Response) => { +router.post("/sessions", sessionsLimiter, async (req: Request, res: Response) => { const rawAmount = req.body?.amount_sats; const amountSats = parseInt(String(rawAmount ?? ""), 10); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d2857ea..b264864 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -184,6 +184,12 @@ importers: express: specifier: ^5 version: 5.2.1 + express-rate-limit: + specifier: ^8.3.1 + version: 8.3.1(express@5.2.1) + ws: + specifier: ^8.19.0 + version: 8.19.0 devDependencies: '@types/cookie-parser': specifier: ^1.4.10 @@ -197,6 +203,9 @@ importers: '@types/node': specifier: 'catalog:' version: 25.3.5 + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 esbuild: specifier: 0.27.3 version: 0.27.3 @@ -443,10 +452,6 @@ importers: version: 7.1.1 scripts: - dependencies: - '@workspace/integrations-anthropic-ai': - specifier: workspace:* - version: link:../lib/integrations-anthropic-ai devDependencies: '@types/node': specifier: 'catalog:' @@ -1516,6 +1521,9 @@ packages: '@types/unist@3.0.3': resolution: {integrity: sha512-ko/gIFJRv177XgZsZcBwnqJN5x/Gien8qNOn0D5bQU/zAzVf9Zt3BlcUiLqhV9y4ARk0GbT3tnUiPNgnTXzc/Q==} + '@types/ws@8.18.1': + resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} + '@vitejs/plugin-react@5.1.4': resolution: {integrity: 
sha512-VIcFLdRi/VYRU8OL/puL7QXMYafHmqOnwTZY50U1JPlCNj30PxCMx65c494b1K9be9hX83KVt0+gTEwTWLqToA==} engines: {node: ^20.19.0 || >=22.12.0} @@ -1928,6 +1936,12 @@ packages: resolution: {integrity: sha512-9Be3ZoN4LmYR90tUoVu2te2BsbzHfhJyfEiAVfz7N5/zv+jduIfLrV2xdQXOHbaD6KgpGdO9PRPM1Y4Q9QkPkA==} engines: {node: ^18.19.0 || >=20.5.0} + express-rate-limit@8.3.1: + resolution: {integrity: sha512-D1dKN+cmyPWuvB+G2SREQDzPY1agpBIcTa9sJxOPMCNeH3gwzhqJRDWCXW3gg0y//+LQ/8j52JbMROWyrKdMdw==} + engines: {node: '>= 16'} + peerDependencies: + express: '>= 4.11' + express@5.2.1: resolution: {integrity: sha512-hIS4idWWai69NezIdRt2xFVofaF4j+6INOpJlVOLDO8zXGpUVEVzIYk12UUi2JzjEzWL3IOAxcTubgz9Po0yXw==} engines: {node: '>= 18'} @@ -2083,6 +2097,10 @@ packages: resolution: {integrity: sha512-5Hh7Y1wQbvY5ooGgPbDaL5iYLAPzMTUrjMulskHLH6wnv/A+1q5rgEaiuqEjB+oxGXIVZs1FF+R/KPN3ZSQYYg==} engines: {node: '>=12'} + ip-address@10.1.0: + resolution: {integrity: sha512-XXADHxXmvT9+CRxhXg56LJovE+bmWnEWB78LB83VZTprKTmaC5QfruXocxzTZ2Kl0DNwKuBdlIhjL8LeY8Sf8Q==} + engines: {node: '>= 12'} + ipaddr.js@1.9.1: resolution: {integrity: sha512-0KI/607xoxSToH7GjN1FfSbLoU0+btTicjsQSWQlh/hZykN8KpmMf7uYwPW3R+akZ6R/w18ZlXSHBYXiYUPO3g==} engines: {node: '>= 0.10'} @@ -2841,6 +2859,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.19.0: + resolution: {integrity: sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + xtend@4.0.2: resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} engines: {node: '>=0.4'} @@ -4092,6 +4122,10 @@ snapshots: '@types/unist@3.0.3': {} + '@types/ws@8.18.1': + dependencies: + '@types/node': 
25.3.5 + '@vitejs/plugin-react@5.1.4(vite@7.3.1(@types/node@25.3.5)(jiti@2.6.1)(lightningcss@1.31.1)(tsx@4.21.0)(yaml@2.8.2))': dependencies: '@babel/core': 7.29.0 @@ -4404,6 +4438,11 @@ snapshots: strip-final-newline: 4.0.0 yoctocolors: 2.1.2 + express-rate-limit@8.3.1(express@5.2.1): + dependencies: + express: 5.2.1 + ip-address: 10.1.0 + express@5.2.1: dependencies: accepts: 2.0.0 @@ -4586,6 +4625,8 @@ snapshots: internmap@2.0.3: {} + ip-address@10.1.0: {} + ipaddr.js@1.9.1: {} is-extglob@2.1.1: {} @@ -5271,6 +5312,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.19.0: {} + xtend@4.0.2: {} yallist@3.1.1: {}