import { Router, type Request, type Response } from "express";
import { randomUUID } from "crypto";
import { db, jobs, invoices, type Job } from "@workspace/db";
import { eq, and } from "drizzle-orm";
import { CreateJobBody, GetJobParams } from "@workspace/api-zod";
import { lnbitsService } from "../lib/lnbits.js";
import { agentService } from "../lib/agent.js";
import { pricingService } from "../lib/pricing.js";
import { jobsLimiter } from "../lib/rate-limiter.js";
import { eventBus } from "../lib/event-bus.js";
import { streamRegistry } from "../lib/stream-registry.js";
import { makeLogger } from "../lib/logger.js";
import { latencyHistogram } from "../lib/histogram.js";

const logger = makeLogger("jobs");
const router = Router();

/** BTC/USD rate used to settle a job whose row has no locked price. */
const FALLBACK_BTC_PRICE_USD = 100_000;

/**
 * Job ids whose refund is currently being paid by THIS process.
 * Closes the check-then-pay window in POST /jobs/:id/refund so two concurrent
 * requests cannot both pay. It is an in-memory guard only: it does not
 * coordinate across multiple server processes.
 */
const refundsInFlight = new Set<string>();

/** Non-null job state value, as stored on the `jobs` row. */
type JobState = NonNullable<Job["state"]>;

/** Row shape returned by {@link getInvoiceById} when the invoice exists. */
type InvoiceRow = NonNullable<Awaited<ReturnType<typeof getInvoiceById>>>;

/** Returns `err.message` for `Error` instances, otherwise the given fallback. */
function describeError(err: unknown, fallback: string): string {
  return err instanceof Error ? err.message : fallback;
}

/** Loads a job row by id; resolves to `null` when no row matches. */
async function getJobById(id: string): Promise<Job | null> {
  const rows = await db.select().from(jobs).where(eq(jobs.id, id)).limit(1);
  return rows[0] ?? null;
}

/** Loads an invoice row by id; resolves to `null` when no row matches. */
async function getInvoiceById(id: string) {
  const rows = await db.select().from(invoices).where(eq(invoices.id, id)).limit(1);
  return rows[0] ?? null;
}

/**
 * Builds the invoice object returned to API clients.
 * `paymentHash` is only included when `lnbitsService.stubMode` is set.
 */
function toInvoicePayload(inv: Pick<InvoiceRow, "paymentRequest" | "amountSats" | "paymentHash">) {
  return {
    paymentRequest: inv.paymentRequest,
    amountSats: inv.amountSats,
    ...(lnbitsService.stubMode ? { paymentHash: inv.paymentHash } : {}),
  };
}

/**
 * Marks a job as failed and publishes `job:failed`.
 *
 * Never rejects: the background tasks are started with `void`, so a rejection
 * escaping from their `catch` handler would surface as an unhandled rejection.
 * A failing DB write is logged instead, and the event is still published so
 * listeners are not left waiting.
 */
async function recordJobFailure(jobId: string, message: string): Promise<void> {
  logger.error("job failed", { jobId, reason: message });
  try {
    await db
      .update(jobs)
      .set({ state: "failed", errorMessage: message, updatedAt: new Date() })
      .where(eq(jobs.id, jobId));
  } catch (dbErr) {
    logger.error("could not persist job failure", {
      jobId,
      error: describeError(dbErr, "Unknown database error"),
    });
  }
  eventBus.publish({ type: "job:failed", jobId, reason: message });
}

/**
 * Runs the AI eval in a background task (fire-and-forget) so HTTP polls
 * return immediately with "evaluating" state instead of blocking 5-8 seconds.
 *
 * On acceptance a work invoice is created and the job moves to
 * "awaiting_work_payment"; on rejection it moves to "rejected"; any thrown
 * error moves it to "failed".
 */
async function runEvalInBackground(jobId: string, request: string): Promise<void> {
  const evalStart = Date.now();
  try {
    const evalResult = await agentService.evaluateRequest(request);
    latencyHistogram.record("eval_phase", Date.now() - evalStart);

    logger.info("eval result", {
      jobId,
      accepted: evalResult.accepted,
      reason: evalResult.reason,
      inputTokens: evalResult.inputTokens,
      outputTokens: evalResult.outputTokens,
    });

    if (!evalResult.accepted) {
      await db
        .update(jobs)
        .set({ state: "rejected", rejectionReason: evalResult.reason, updatedAt: new Date() })
        .where(eq(jobs.id, jobId));
      eventBus.publish({ type: "job:state", jobId, state: "rejected" });
      return;
    }

    const inputEst = pricingService.estimateInputTokens(request);
    const outputEst = pricingService.estimateOutputTokens(request);
    const breakdown = await pricingService.calculateWorkFeeSats(
      inputEst,
      outputEst,
      agentService.workModel,
    );

    const workInvoiceData = await lnbitsService.createInvoice(
      breakdown.amountSats,
      `Work fee for job ${jobId}`,
    );
    const workInvoiceId = randomUUID();

    // Insert the invoice and point the job at it atomically.
    await db.transaction(async (tx) => {
      await tx.insert(invoices).values({
        id: workInvoiceId,
        jobId,
        paymentHash: workInvoiceData.paymentHash,
        paymentRequest: workInvoiceData.paymentRequest,
        amountSats: breakdown.amountSats,
        type: "work",
        paid: false,
      });
      await tx
        .update(jobs)
        .set({
          state: "awaiting_work_payment",
          workInvoiceId,
          workAmountSats: breakdown.amountSats,
          estimatedCostUsd: breakdown.estimatedCostUsd,
          marginPct: breakdown.marginPct,
          btcPriceUsd: breakdown.btcPriceUsd,
          updatedAt: new Date(),
        })
        .where(eq(jobs.id, jobId));
    });

    eventBus.publish({ type: "job:state", jobId, state: "awaiting_work_payment" });
  } catch (err) {
    await recordJobFailure(jobId, describeError(err, "Evaluation error"));
  }
}

/**
 * Runs the AI work execution in a background task so HTTP polls return fast.
 * Uses streaming so any connected SSE client receives tokens in real time (#3).
 *
 * After execution the actual cost is computed, the refund owed (if any) is
 * derived from the amount paid, and the job is stored as "complete".
 */
async function runWorkInBackground(
  jobId: string,
  request: string,
  workAmountSats: number,
  btcPriceUsd: number | null,
): Promise<void> {
  const workStart = Date.now();
  try {
    eventBus.publish({ type: "job:state", jobId, state: "executing" });

    const workResult = await agentService.executeWorkStreaming(request, (delta) => {
      streamRegistry.write(jobId, delta);
    });
    streamRegistry.end(jobId);
    latencyHistogram.record("work_phase", Date.now() - workStart);

    const actualCostUsd = pricingService.calculateActualCostUsd(
      workResult.inputTokens,
      workResult.outputTokens,
      agentService.workModel,
    );
    const lockedBtcPrice = btcPriceUsd ?? FALLBACK_BTC_PRICE_USD;
    const actualAmountSats = pricingService.calculateActualChargeSats(actualCostUsd, lockedBtcPrice);
    const refundAmountSats = pricingService.calculateRefundSats(workAmountSats, actualAmountSats);
    const refundState = refundAmountSats > 0 ? "pending" : "not_applicable";

    await db
      .update(jobs)
      .set({
        state: "complete",
        result: workResult.result,
        actualInputTokens: workResult.inputTokens,
        actualOutputTokens: workResult.outputTokens,
        actualCostUsd,
        actualAmountSats,
        refundAmountSats,
        refundState,
        updatedAt: new Date(),
      })
      .where(eq(jobs.id, jobId));

    logger.info("work completed", {
      jobId,
      inputTokens: workResult.inputTokens,
      outputTokens: workResult.outputTokens,
      actualAmountSats,
      refundAmountSats,
      refundState,
    });
    eventBus.publish({ type: "job:completed", jobId, result: workResult.result });
  } catch (err) {
    // End the stream first so connected SSE clients are not left hanging.
    streamRegistry.end(jobId);
    await recordJobFailure(jobId, describeError(err, "Execution error"));
  }
}

/**
 * Marks an invoice as paid and moves the job from `from` to `to` in one
 * transaction. The job update is conditional on the current state, so only
 * one of several concurrent callers gets `true` back.
 *
 * @returns `true` when this call performed the state transition.
 */
async function settleInvoiceAndTransition(
  invoiceId: string,
  jobId: string,
  from: JobState,
  to: JobState,
): Promise<boolean> {
  return db.transaction(async (tx) => {
    await tx
      .update(invoices)
      .set({ paid: true, paidAt: new Date() })
      .where(eq(invoices.id, invoiceId));
    const updated = await tx
      .update(jobs)
      .set({ state: to, updatedAt: new Date() })
      .where(and(eq(jobs.id, jobId), eq(jobs.state, from)))
      .returning();
    return updated.length > 0;
  });
}

/**
 * Checks whether the active invoice for a job has been paid and, if so,
 * advances the state machine. AI work runs in the background so this
 * returns quickly. Returns the refreshed job after any DB transitions
 * (`null` only if the row can no longer be found).
 */
async function advanceJob(job: Job): Promise<Job | null> {
  if (job.state === "awaiting_eval_payment" && job.evalInvoiceId) {
    const evalInvoice = await getInvoiceById(job.evalInvoiceId);
    // Missing or already-settled invoice: just report the current row.
    if (!evalInvoice || evalInvoice.paid) return getJobById(job.id);

    const isPaid = await lnbitsService.checkInvoicePaid(evalInvoice.paymentHash);
    if (!isPaid) return job;

    const advanced = await settleInvoiceAndTransition(
      evalInvoice.id,
      job.id,
      "awaiting_eval_payment",
      "evaluating",
    );
    // Another request won the transition; do not start a second eval.
    if (!advanced) return getJobById(job.id);

    logger.info("invoice paid", {
      jobId: job.id,
      invoiceType: "eval",
      paymentHash: evalInvoice.paymentHash,
    });
    eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "eval" });
    eventBus.publish({ type: "job:state", jobId: job.id, state: "evaluating" });

    // Fire AI eval in background — poll returns immediately with "evaluating"
    setImmediate(() => {
      void runEvalInBackground(job.id, job.request);
    });
    return getJobById(job.id);
  }

  if (job.state === "awaiting_work_payment" && job.workInvoiceId) {
    const workInvoice = await getInvoiceById(job.workInvoiceId);
    if (!workInvoice || workInvoice.paid) return getJobById(job.id);

    const isPaid = await lnbitsService.checkInvoicePaid(workInvoice.paymentHash);
    if (!isPaid) return job;

    const advanced = await settleInvoiceAndTransition(
      workInvoice.id,
      job.id,
      "awaiting_work_payment",
      "executing",
    );
    if (!advanced) return getJobById(job.id);

    logger.info("invoice paid", {
      jobId: job.id,
      invoiceType: "work",
      paymentHash: workInvoice.paymentHash,
    });
    eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "work" });

    // Register stream slot before firing background work so first tokens aren't lost
    streamRegistry.register(job.id);

    // Fire AI work in background — poll returns immediately with "executing"
    setImmediate(() => {
      void runWorkInBackground(job.id, job.request, job.workAmountSats ?? 0, job.btcPriceUsd);
    });
    return getJobById(job.id);
  }

  return job;
}

// ── POST /jobs ────────────────────────────────────────────────────────────────
// Validates the body, creates the eval invoice, and stores job + invoice.
router.post("/jobs", jobsLimiter, async (req: Request, res: Response) => {
  const parseResult = CreateJobBody.safeParse(req.body);
  if (!parseResult.success) {
    const issue = parseResult.error.issues[0];
    const error =
      issue?.code === "too_big"
        ? "Invalid request: 'request' must be 500 characters or fewer"
        : "Invalid request: 'request' string is required";
    res.status(400).json({ error });
    return;
  }
  const { request } = parseResult.data;

  try {
    const evalFee = pricingService.calculateEvalFeeSats();
    const jobId = randomUUID();
    const invoiceId = randomUUID();
    const createdAt = new Date();

    const lnbitsInvoice = await lnbitsService.createInvoice(evalFee, `Eval fee for job ${jobId}`);

    await db.transaction(async (tx) => {
      await tx.insert(jobs).values({
        id: jobId,
        request,
        state: "awaiting_eval_payment",
        evalAmountSats: evalFee,
        createdAt,
      });
      await tx.insert(invoices).values({
        id: invoiceId,
        jobId,
        paymentHash: lnbitsInvoice.paymentHash,
        paymentRequest: lnbitsInvoice.paymentRequest,
        amountSats: evalFee,
        type: "eval",
        paid: false,
      });
      // The job row is inserted first, then linked to its invoice.
      await tx
        .update(jobs)
        .set({ evalInvoiceId: invoiceId, updatedAt: new Date() })
        .where(eq(jobs.id, jobId));
    });

    logger.info("job created", { jobId, evalAmountSats: evalFee, stubMode: lnbitsService.stubMode });

    res.status(201).json({
      jobId,
      createdAt: createdAt.toISOString(),
      evalInvoice: toInvoicePayload({
        paymentRequest: lnbitsInvoice.paymentRequest,
        amountSats: evalFee,
        paymentHash: lnbitsInvoice.paymentHash,
      }),
    });
  } catch (err) {
    const message = describeError(err, "Failed to create job");
    logger.error("job creation failed", { error: message });
    res.status(500).json({ error: message });
  }
});

// ── GET /jobs/:id ─────────────────────────────────────────────────────────────
// Polling endpoint: advances the state machine if an invoice was paid, then
// returns a state-specific view of the job.
router.get("/jobs/:id", async (req: Request, res: Response) => {
  const paramResult = GetJobParams.safeParse(req.params);
  if (!paramResult.success) {
    res.status(400).json({ error: "Invalid job id" });
    return;
  }
  const { id } = paramResult.data;

  try {
    let job = await getJobById(id);
    if (!job) {
      res.status(404).json({ error: "Job not found" });
      return;
    }

    const advanced = await advanceJob(job);
    if (advanced) job = advanced;

    const base = {
      jobId: job.id,
      state: job.state,
      createdAt: job.createdAt.toISOString(),
      completedAt: job.state === "complete" ? job.updatedAt.toISOString() : null,
    };

    switch (job.state) {
      case "awaiting_eval_payment": {
        const inv = job.evalInvoiceId ? await getInvoiceById(job.evalInvoiceId) : null;
        res.json({
          ...base,
          ...(inv ? { evalInvoice: toInvoicePayload(inv) } : {}),
        });
        break;
      }

      case "awaiting_work_payment": {
        const inv = job.workInvoiceId ? await getInvoiceById(job.workInvoiceId) : null;
        res.json({
          ...base,
          ...(inv ? { workInvoice: toInvoicePayload(inv) } : {}),
          ...(job.estimatedCostUsd != null
            ? {
                pricingBreakdown: {
                  estimatedCostUsd: job.estimatedCostUsd,
                  marginPct: job.marginPct,
                  btcPriceUsd: job.btcPriceUsd,
                },
              }
            : {}),
        });
        break;
      }

      case "rejected":
        res.json({ ...base, reason: job.rejectionReason ?? undefined });
        break;

      case "complete":
        res.json({
          ...base,
          result: job.result ?? undefined,
          ...(job.actualCostUsd != null
            ? {
                costLedger: {
                  // Token usage
                  actualInputTokens: job.actualInputTokens,
                  actualOutputTokens: job.actualOutputTokens,
                  totalTokens: (job.actualInputTokens ?? 0) + (job.actualOutputTokens ?? 0),
                  // USD costs
                  actualCostUsd: job.actualCostUsd,
                  actualChargeUsd:
                    job.actualAmountSats != null && job.btcPriceUsd
                      ? (job.actualAmountSats / 1e8) * job.btcPriceUsd
                      : undefined,
                  estimatedCostUsd: job.estimatedCostUsd,
                  // Sat amounts
                  actualAmountSats: job.actualAmountSats,
                  workAmountSats: job.workAmountSats,
                  // Refund
                  refundAmountSats: job.refundAmountSats,
                  refundState: job.refundState,
                  // Rate context
                  marginPct: job.marginPct,
                  btcPriceUsd: job.btcPriceUsd,
                },
              }
            : {}),
        });
        break;

      case "failed":
        res.json({ ...base, errorMessage: job.errorMessage ?? undefined });
        break;

      default:
        res.json(base);
    }
  } catch (err) {
    res.status(500).json({ error: describeError(err, "Failed to fetch job") });
  }
});

// ── POST /jobs/:id/refund ─────────────────────────────────────────────────────
// Pays the refund owed on a completed job to the BOLT11 invoice in the body.
router.post("/jobs/:id/refund", async (req: Request, res: Response) => {
  const paramResult = GetJobParams.safeParse(req.params);
  if (!paramResult.success) {
    res.status(400).json({ error: "Invalid job id" });
    return;
  }
  const { id } = paramResult.data;

  // req.body may be undefined when no body was sent/parsed; never destructure it raw.
  const { invoice } = (req.body ?? {}) as { invoice?: unknown };
  if (typeof invoice !== "string" || invoice.trim().length === 0) {
    res.status(400).json({ error: "Body must include 'invoice' (BOLT11 string)" });
    return;
  }
  const bolt11 = invoice.trim();

  // Claim the job before reading its refund state so that a concurrent request
  // cannot also observe "pending" and pay a second time.
  if (refundsInFlight.has(id)) {
    res.status(409).json({ error: "A refund for this job is already being processed" });
    return;
  }
  refundsInFlight.add(id);
  let releaseGuard = true;

  try {
    const job = await getJobById(id);
    if (!job) {
      res.status(404).json({ error: "Job not found" });
      return;
    }
    if (job.state !== "complete") {
      res.status(409).json({ error: `Job is in state '${job.state}', not 'complete'` });
      return;
    }
    if (job.refundState !== "pending") {
      if (job.refundState === "not_applicable") {
        res.status(409).json({
          error: "No refund is owed for this job (actual cost matched or exceeded the invoice amount)",
        });
      } else if (job.refundState === "paid") {
        res.status(409).json({
          error: "Refund has already been sent",
          refundPaymentHash: job.refundPaymentHash,
        });
      } else {
        res.status(409).json({ error: "Refund is not available for this job" });
      }
      return;
    }

    const refundSats = job.refundAmountSats ?? 0;
    if (refundSats <= 0) {
      res.status(409).json({ error: "Refund amount is zero — nothing to return" });
      return;
    }

    // ── Validate invoice amount ───────────────────────────────────────────
    // NOTE(review): a null decode result skips the amount check entirely, so
    // an undecodable invoice is paid as-is. Confirm decodeInvoice only returns
    // null where that is acceptable (e.g. stub mode).
    const decoded = await lnbitsService.decodeInvoice(bolt11);
    if (decoded !== null && decoded.amountSats !== refundSats) {
      res.status(400).json({
        error: `Invoice amount (${decoded.amountSats} sats) does not match refund owed (${refundSats} sats)`,
        refundAmountSats: refundSats,
      });
      return;
    }

    // ── Send refund ───────────────────────────────────────────────────────
    const paymentHash = await lnbitsService.payInvoice(bolt11);

    try {
      await db
        .update(jobs)
        .set({ refundState: "paid", refundPaymentHash: paymentHash, updatedAt: new Date() })
        .where(and(eq(jobs.id, id), eq(jobs.refundState, "pending")));
    } catch (dbErr) {
      // The payment went out but the row still says "pending". Keep the guard
      // held so this process does not pay again, and log for reconciliation.
      releaseGuard = false;
      logger.error("refund paid but not recorded", {
        jobId: id,
        paymentHash,
        refundAmountSats: refundSats,
        error: describeError(dbErr, "Unknown database error"),
      });
      throw dbErr;
    }

    res.json({
      ok: true,
      refundAmountSats: refundSats,
      paymentHash,
      message: `Refund of ${refundSats} sats sent successfully`,
    });
  } catch (err) {
    res.status(500).json({ error: describeError(err, "Failed to process refund") });
  } finally {
    if (releaseGuard) refundsInFlight.delete(id);
  }
});

// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
// Server-Sent Events (#3): streams Claude token deltas in real time while the
// job is executing. If the job is already complete, sends the full result then
// closes. If the job isn't executing yet, waits up to 90 s for it to start.
/** Event payload accepted by `eventBus.publish`; listeners on "bus" receive the same value. */
type BusEvent = Parameters<typeof eventBus.publish>[0];

/** 90 s wait budget — generous enough for slow payment confirmations on mainnet. */
const STREAM_WAIT_MS = 90_000;

/** Interval between SSE comment lines sent as heartbeats while the stream is open. */
const STREAM_HEARTBEAT_MS = 15_000;

/**
 * GET /jobs/:id/stream — Server-Sent Events endpoint.
 *
 * - Job already settled (complete / failed / rejected): replies at once and closes.
 * - Live stream registered: pipes chunks as `token` events, then `done`.
 * - Otherwise: waits up to STREAM_WAIT_MS for the stream slot to appear or the
 *   job to settle, re-checking after every bus event for this job, and finally
 *   tells the client to fall back to polling.
 */
router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
  const paramResult = GetJobParams.safeParse(req.params);
  if (!paramResult.success) {
    res.status(400).json({ error: "Invalid job id" });
    return;
  }
  const { id } = paramResult.data;

  // Headers are not sent yet, so a lookup failure can still be a normal 500.
  let job: Job | null;
  try {
    job = await getJobById(id);
  } catch (err) {
    const message = err instanceof Error ? err.message : "Failed to fetch job";
    res.status(500).json({ error: message });
    return;
  }
  if (!job) {
    res.status(404).json({ error: "Job not found" });
    return;
  }

  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no");
  res.flushHeaders();

  // ── Connection lifecycle ──────────────────────────────────────────────────
  let closed = false;
  let heartbeat: ReturnType<typeof setInterval> | undefined;
  let cancelWait: (() => void) | undefined;

  // Idempotent: runs when we end the response and when the client disconnects.
  const cleanup = () => {
    closed = true;
    if (heartbeat) clearInterval(heartbeat);
    cancelWait?.();
  };
  // The response "close" event covers both normal completion and a dropped connection.
  res.on("close", cleanup);

  const sendEvent = (event: string, data: unknown) => {
    if (closed) return; // never write after end / disconnect
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  };
  const finish = () => {
    if (!closed) res.end();
    cleanup();
  };

  /** Replies and closes when the job is in a settled state; returns whether it did. */
  const replyIfSettled = (j: Job): boolean => {
    switch (j.state) {
      case "complete":
        // Replay the full result immediately (skip the token when it is empty).
        if (j.result) sendEvent("token", { text: j.result });
        sendEvent("done", { jobId: id, state: "complete" });
        finish();
        return true;
      case "failed":
        sendEvent("error", { jobId: id, message: j.errorMessage ?? "Job failed" });
        finish();
        return true;
      case "rejected":
        // A rejected job never executes, so there is nothing to wait for.
        sendEvent("error", { jobId: id, message: j.rejectionReason ?? "Job rejected" });
        finish();
        return true;
      default:
        return false;
    }
  };

  if (replyIfSettled(job)) return;

  // Job is executing or about to execute — keep the connection warm.
  heartbeat = setInterval(() => {
    if (!closed) res.write(": heartbeat\n\n");
  }, STREAM_HEARTBEAT_MS);

  // ── Wait for stream slot (fixes #16 race condition) ──────────────────────
  // The bus listener stays attached for the whole wait so no event is missed
  // while a DB read is in flight. After every wake-up we re-check BOTH the
  // stream registry AND the DB, which handles: (a) job settled while we waited
  // (stream already gone), (b) stream registered after we first checked.
  // Intermediate events (e.g. eval payment) just trigger another re-check.
  let stream = streamRegistry.get(id);
  let currentJob: Job = job;

  if (!stream) {
    let eventPending = false;
    let wake: () => void = () => {};
    const onBusEvent = (data: BusEvent) => {
      if ("jobId" in data && data.jobId === id) {
        eventPending = true;
        wake();
      }
    };
    eventBus.on("bus", onBusEvent);
    cancelWait = () => wake(); // lets a client disconnect end the wait early

    const deadlineAt = Date.now() + STREAM_WAIT_MS;
    try {
      while (!stream && !closed) {
        const remaining = deadlineAt - Date.now();
        if (remaining <= 0) break;

        if (!eventPending) {
          await new Promise<void>((resolve) => {
            const timer = setTimeout(resolve, remaining);
            wake = () => {
              clearTimeout(timer);
              resolve();
            };
          });
          wake = () => {};
        }
        eventPending = false;
        if (closed) break;

        stream = streamRegistry.get(id);
        if (stream) break;

        currentJob = (await getJobById(id)) ?? currentJob;
        if (
          currentJob.state === "complete" ||
          currentJob.state === "failed" ||
          currentJob.state === "rejected"
        ) {
          break;
        }
        // The slot may have been registered while the DB read was in flight.
        stream = streamRegistry.get(id);
      }
    } catch (err) {
      // Headers are already sent, so report the failure as an SSE error event.
      const message = err instanceof Error ? err.message : "Failed to fetch job";
      sendEvent("error", { jobId: id, message });
      finish();
      return;
    } finally {
      eventBus.off("bus", onBusEvent);
      cancelWait = undefined;
    }
  }

  // Client went away while we were waiting — nothing left to send.
  if (closed) return;

  // ── Resolve: stream available ─────────────────────────────────────────────
  if (stream) {
    const liveStream = stream;
    liveStream.on("data", (chunk: Buffer) => {
      sendEvent("token", { text: chunk.toString("utf8") });
    });
    liveStream.on("end", () => {
      sendEvent("done", { jobId: id, state: "complete" });
      finish();
    });
    liveStream.on("error", (err: Error) => {
      sendEvent("error", { jobId: id, message: err.message });
      finish();
    });
    return;
  }

  // ── Resolve: job settled while we waited (stream already gone) ────────────
  if (replyIfSettled(currentJob)) return;

  // ── Resolve: timeout with no activity — tell client to fall back to polling
  sendEvent("error", {
    jobId: id,
    message: "Stream timed out. Poll GET /api/jobs/:id for current state.",
  });
  finish();
});

export default router;