fix(testkit): macOS compat + fix test 8c ordering (#24)
This commit is contained in:
@@ -6,6 +6,13 @@ import { CreateJobBody, GetJobParams } from "@workspace/api-zod";
|
||||
import { lnbitsService } from "../lib/lnbits.js";
|
||||
import { agentService } from "../lib/agent.js";
|
||||
import { pricingService } from "../lib/pricing.js";
|
||||
import { jobsLimiter } from "../lib/rate-limiter.js";
|
||||
import { eventBus } from "../lib/event-bus.js";
|
||||
import { streamRegistry } from "../lib/stream-registry.js";
|
||||
import { makeLogger } from "../lib/logger.js";
|
||||
import { latencyHistogram } from "../lib/histogram.js";
|
||||
|
||||
const logger = makeLogger("jobs");
|
||||
|
||||
const router = Router();
|
||||
|
||||
@@ -24,8 +31,18 @@ async function getInvoiceById(id: string) {
|
||||
* return immediately with "evaluating" state instead of blocking 5-8 seconds.
|
||||
*/
|
||||
async function runEvalInBackground(jobId: string, request: string): Promise<void> {
|
||||
const evalStart = Date.now();
|
||||
try {
|
||||
const evalResult = await agentService.evaluateRequest(request);
|
||||
latencyHistogram.record("eval_phase", Date.now() - evalStart);
|
||||
|
||||
logger.info("eval result", {
|
||||
jobId,
|
||||
accepted: evalResult.accepted,
|
||||
reason: evalResult.reason,
|
||||
inputTokens: evalResult.inputTokens,
|
||||
outputTokens: evalResult.outputTokens,
|
||||
});
|
||||
|
||||
if (evalResult.accepted) {
|
||||
const inputEst = pricingService.estimateInputTokens(request);
|
||||
@@ -65,11 +82,13 @@ async function runEvalInBackground(jobId: string, request: string): Promise<void
|
||||
})
|
||||
.where(eq(jobs.id, jobId));
|
||||
});
|
||||
eventBus.publish({ type: "job:state", jobId, state: "awaiting_work_payment" });
|
||||
} else {
|
||||
await db
|
||||
.update(jobs)
|
||||
.set({ state: "rejected", rejectionReason: evalResult.reason, updatedAt: new Date() })
|
||||
.where(eq(jobs.id, jobId));
|
||||
eventBus.publish({ type: "job:state", jobId, state: "rejected" });
|
||||
}
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : "Evaluation error";
|
||||
@@ -77,15 +96,25 @@ async function runEvalInBackground(jobId: string, request: string): Promise<void
|
||||
.update(jobs)
|
||||
.set({ state: "failed", errorMessage: message, updatedAt: new Date() })
|
||||
.where(eq(jobs.id, jobId));
|
||||
eventBus.publish({ type: "job:failed", jobId, reason: message });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs the AI work execution in a background task so HTTP polls return fast.
|
||||
* Uses streaming so any connected SSE client receives tokens in real time (#3).
|
||||
*/
|
||||
async function runWorkInBackground(jobId: string, request: string, workAmountSats: number, btcPriceUsd: number | null): Promise<void> {
|
||||
const workStart = Date.now();
|
||||
try {
|
||||
const workResult = await agentService.executeWork(request);
|
||||
eventBus.publish({ type: "job:state", jobId, state: "executing" });
|
||||
|
||||
const workResult = await agentService.executeWorkStreaming(request, (delta) => {
|
||||
streamRegistry.write(jobId, delta);
|
||||
});
|
||||
|
||||
streamRegistry.end(jobId);
|
||||
latencyHistogram.record("work_phase", Date.now() - workStart);
|
||||
|
||||
const actualCostUsd = pricingService.calculateActualCostUsd(
|
||||
workResult.inputTokens,
|
||||
@@ -112,12 +141,24 @@ async function runWorkInBackground(jobId: string, request: string, workAmountSat
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(jobs.id, jobId));
|
||||
|
||||
logger.info("work completed", {
|
||||
jobId,
|
||||
inputTokens: workResult.inputTokens,
|
||||
outputTokens: workResult.outputTokens,
|
||||
actualAmountSats,
|
||||
refundAmountSats,
|
||||
refundState,
|
||||
});
|
||||
eventBus.publish({ type: "job:completed", jobId, result: workResult.result });
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : "Execution error";
|
||||
streamRegistry.end(jobId);
|
||||
await db
|
||||
.update(jobs)
|
||||
.set({ state: "failed", errorMessage: message, updatedAt: new Date() })
|
||||
.where(eq(jobs.id, jobId));
|
||||
eventBus.publish({ type: "job:failed", jobId, reason: message });
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,6 +190,10 @@ async function advanceJob(job: Job): Promise<Job | null> {
|
||||
|
||||
if (!advanced) return getJobById(job.id);
|
||||
|
||||
logger.info("invoice paid", { jobId: job.id, invoiceType: "eval", paymentHash: evalInvoice.paymentHash });
|
||||
eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "eval" });
|
||||
eventBus.publish({ type: "job:state", jobId: job.id, state: "evaluating" });
|
||||
|
||||
// Fire AI eval in background — poll returns immediately with "evaluating"
|
||||
setImmediate(() => { void runEvalInBackground(job.id, job.request); });
|
||||
|
||||
@@ -177,6 +222,12 @@ async function advanceJob(job: Job): Promise<Job | null> {
|
||||
|
||||
if (!advanced) return getJobById(job.id);
|
||||
|
||||
logger.info("invoice paid", { jobId: job.id, invoiceType: "work", paymentHash: workInvoice.paymentHash });
|
||||
eventBus.publish({ type: "job:paid", jobId: job.id, invoiceType: "work" });
|
||||
|
||||
// Register stream slot before firing background work so first tokens aren't lost
|
||||
streamRegistry.register(job.id);
|
||||
|
||||
// Fire AI work in background — poll returns immediately with "executing"
|
||||
setImmediate(() => { void runWorkInBackground(job.id, job.request, job.workAmountSats ?? 0, job.btcPriceUsd); });
|
||||
|
||||
@@ -188,7 +239,7 @@ async function advanceJob(job: Job): Promise<Job | null> {
|
||||
|
||||
// ── POST /jobs ────────────────────────────────────────────────────────────────
|
||||
|
||||
router.post("/jobs", async (req: Request, res: Response) => {
|
||||
router.post("/jobs", jobsLimiter, async (req: Request, res: Response) => {
|
||||
const parseResult = CreateJobBody.safeParse(req.body);
|
||||
if (!parseResult.success) {
|
||||
const issue = parseResult.error.issues[0];
|
||||
@@ -221,6 +272,8 @@ router.post("/jobs", async (req: Request, res: Response) => {
|
||||
await tx.update(jobs).set({ evalInvoiceId: invoiceId, updatedAt: new Date() }).where(eq(jobs.id, jobId));
|
||||
});
|
||||
|
||||
logger.info("job created", { jobId, evalAmountSats: evalFee, stubMode: lnbitsService.stubMode });
|
||||
|
||||
res.status(201).json({
|
||||
jobId,
|
||||
evalInvoice: {
|
||||
@@ -231,6 +284,7 @@ router.post("/jobs", async (req: Request, res: Response) => {
|
||||
});
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : "Failed to create job";
|
||||
logger.error("job creation failed", { error: message });
|
||||
res.status(500).json({ error: message });
|
||||
}
|
||||
});
|
||||
@@ -404,4 +458,130 @@ router.post("/jobs/:id/refund", async (req: Request, res: Response) => {
|
||||
}
|
||||
});
|
||||
|
||||
// ── GET /jobs/:id/stream ──────────────────────────────────────────────────────
|
||||
// Server-Sent Events (#3): streams Claude token deltas in real time while the
|
||||
// job is executing. If the job is already complete, sends the full result then
|
||||
// closes. If the job isn't executing yet, waits up to 60 s for it to start.
|
||||
|
||||
router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
|
||||
const paramResult = GetJobParams.safeParse(req.params);
|
||||
if (!paramResult.success) {
|
||||
res.status(400).json({ error: "Invalid job id" });
|
||||
return;
|
||||
}
|
||||
const { id } = paramResult.data;
|
||||
|
||||
const job = await getJobById(id);
|
||||
if (!job) {
|
||||
res.status(404).json({ error: "Job not found" });
|
||||
return;
|
||||
}
|
||||
|
||||
res.setHeader("Content-Type", "text/event-stream");
|
||||
res.setHeader("Cache-Control", "no-cache");
|
||||
res.setHeader("Connection", "keep-alive");
|
||||
res.setHeader("X-Accel-Buffering", "no");
|
||||
res.flushHeaders();
|
||||
|
||||
const sendEvent = (event: string, data: unknown) => {
|
||||
res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
|
||||
};
|
||||
|
||||
// Job already complete — replay full result immediately
|
||||
if (job.state === "complete" && job.result) {
|
||||
sendEvent("token", { text: job.result });
|
||||
sendEvent("done", { jobId: id, state: "complete" });
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
if (job.state === "failed") {
|
||||
sendEvent("error", { jobId: id, message: job.errorMessage ?? "Job failed" });
|
||||
res.end();
|
||||
return;
|
||||
}
|
||||
|
||||
// Job is executing or about to execute — pipe the live stream
|
||||
const sendHeartbeat = setInterval(() => {
|
||||
res.write(": heartbeat\n\n");
|
||||
}, 15_000);
|
||||
|
||||
const cleanup = () => {
|
||||
clearInterval(sendHeartbeat);
|
||||
};
|
||||
|
||||
req.on("close", cleanup);
|
||||
|
||||
// ── Wait for stream slot (fixes #16 race condition) ──────────────────────
|
||||
// After the bus wait we re-check BOTH the stream registry AND the DB so we
|
||||
// handle: (a) job completed while we waited (stream already gone), (b) job
|
||||
// still executing but stream was registered after we first checked.
|
||||
let stream = streamRegistry.get(id);
|
||||
let currentJob = job;
|
||||
|
||||
if (!stream) {
|
||||
await new Promise<void>((resolve) => {
|
||||
// 90 s timeout — generous enough for slow payment confirmations on mainnet
|
||||
const deadline = setTimeout(resolve, 90_000);
|
||||
const busListener = (data: Parameters<typeof eventBus.publish>[0]) => {
|
||||
if ("jobId" in data && data.jobId === id) {
|
||||
clearTimeout(deadline);
|
||||
eventBus.off("bus", busListener);
|
||||
resolve();
|
||||
}
|
||||
};
|
||||
eventBus.on("bus", busListener);
|
||||
});
|
||||
|
||||
// Refresh both stream slot and job state after waiting
|
||||
stream = streamRegistry.get(id);
|
||||
currentJob = (await getJobById(id)) ?? currentJob;
|
||||
}
|
||||
|
||||
// ── Resolve: stream available ─────────────────────────────────────────────
|
||||
if (stream) {
|
||||
const attachToStream = (s: typeof stream) => {
|
||||
s!.on("data", (chunk: Buffer) => {
|
||||
sendEvent("token", { text: chunk.toString("utf8") });
|
||||
});
|
||||
s!.on("end", () => {
|
||||
sendEvent("done", { jobId: id, state: "complete" });
|
||||
res.end();
|
||||
cleanup();
|
||||
});
|
||||
s!.on("error", (err: Error) => {
|
||||
sendEvent("error", { jobId: id, message: err.message });
|
||||
res.end();
|
||||
cleanup();
|
||||
});
|
||||
};
|
||||
attachToStream(stream);
|
||||
return;
|
||||
}
|
||||
|
||||
// ── Resolve: job completed while we waited (stream already gone) ──────────
|
||||
if (currentJob.state === "complete" && currentJob.result) {
|
||||
sendEvent("token", { text: currentJob.result });
|
||||
sendEvent("done", { jobId: id, state: "complete" });
|
||||
res.end();
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
|
||||
if (currentJob.state === "failed") {
|
||||
sendEvent("error", { jobId: id, message: currentJob.errorMessage ?? "Job failed" });
|
||||
res.end();
|
||||
cleanup();
|
||||
return;
|
||||
}
|
||||
|
||||
// ── Resolve: timeout with no activity — tell client to fall back to polling
|
||||
sendEvent("error", {
|
||||
jobId: id,
|
||||
message: "Stream timed out. Poll GET /api/jobs/:id for current state.",
|
||||
});
|
||||
res.end();
|
||||
cleanup();
|
||||
});
|
||||
|
||||
export default router;
|
||||
|
||||
Reference in New Issue
Block a user