Add session mode for pre-funded request processing

Implement session-based API endpoints for creating, managing, and interacting with pre-funded sessions, including deposit and top-up invoice generation, macaroon authentication, and per-request debiting of compute costs.

Replit-Commit-Author: Agent
Replit-Commit-Session-Id: 418bf6f8-212b-4bb0-a7a5-8231a061da4e
Replit-Commit-Checkpoint-Type: full_checkpoint
Replit-Commit-Event-Id: 2dc3847e-7186-4a22-9c7e-16cd31bca8d9
Replit-Commit-Screenshot-Url: https://storage.googleapis.com/screenshot-production-us-central1/9f85e954-647c-46a5-90a7-396e495a805a/418bf6f8-212b-4bb0-a7a5-8231a061da4e/sPDHkg8
Replit-Helium-Checkpoint-Created: true
This commit is contained in:
alexpaynex
2026-03-18 20:00:24 +00:00
parent dfc9ecdc7b
commit ab2cc06a79
29 changed files with 1075 additions and 978 deletions

View File

@@ -3,6 +3,8 @@ import { anthropic } from "@workspace/integrations-anthropic-ai";
export interface EvalResult {
accepted: boolean;
reason: string;
inputTokens: number;
outputTokens: number;
}
export interface WorkResult {
@@ -49,7 +51,12 @@ Respond ONLY with valid JSON: {"accepted": true, "reason": "..."} or {"accepted"
throw new Error(`Failed to parse eval JSON: ${block.text}`);
}
return { accepted: Boolean(parsed.accepted), reason: parsed.reason ?? "" };
return {
accepted: Boolean(parsed.accepted),
reason: parsed.reason ?? "",
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
};
}
async executeWork(requestText: string): Promise<WorkResult> {

View File

@@ -2,6 +2,7 @@ import { Router, type IRouter } from "express";
import healthRouter from "./health.js";
import jobsRouter from "./jobs.js";
import bootstrapRouter from "./bootstrap.js";
import sessionsRouter from "./sessions.js";
import demoRouter from "./demo.js";
import devRouter from "./dev.js";
import testkitRouter from "./testkit.js";
@@ -12,6 +13,7 @@ const router: IRouter = Router();
router.use(healthRouter);
router.use(jobsRouter);
router.use(bootstrapRouter);
router.use(sessionsRouter);
router.use(demoRouter);
router.use(testkitRouter);
router.use(uiRouter);

View File

@@ -0,0 +1,438 @@
import { Router, type Request, type Response } from "express";
import { randomBytes, randomUUID } from "crypto";
import { db, sessions, sessionRequests, type Session } from "@workspace/db";
import { eq, and } from "drizzle-orm";
import { lnbitsService } from "../lib/lnbits.js";
import { agentService } from "../lib/agent.js";
import { pricingService } from "../lib/pricing.js";
import { getBtcPriceUsd, usdToSats } from "../lib/btc-oracle.js";
const router = Router();
// ── Env-var config ─────────────────────────────────────────────────────────────
function envInt(name: string, fallback: number): number {
const raw = parseInt(process.env[name] ?? "", 10);
return Number.isFinite(raw) && raw > 0 ? raw : fallback;
}
const MIN_DEPOSIT_SATS = envInt("SESSION_MIN_DEPOSIT_SATS", 100);
const MAX_DEPOSIT_SATS = envInt("SESSION_MAX_DEPOSIT_SATS", 10_000);
const MIN_BALANCE_SATS = envInt("SESSION_MIN_BALANCE_SATS", 50);
const EXPIRY_HOURS = envInt("SESSION_EXPIRY_HOURS", 24);
const EXPIRY_MS = EXPIRY_HOURS * 60 * 60 * 1000;
// ── Helpers ────────────────────────────────────────────────────────────────────
async function getSessionById(id: string): Promise<Session | null> {
const rows = await db.select().from(sessions).where(eq(sessions.id, id)).limit(1);
return rows[0] ?? null;
}
function checkExpired(session: Session): boolean {
return session.expiresAt !== null && new Date() > session.expiresAt;
}
function extractMacaroon(req: Request): string | null {
const auth = req.headers.authorization ?? "";
if (auth.startsWith("Bearer ")) return auth.slice(7).trim();
return null;
}
function sessionView(session: Session, includeInvoice = false) {
const base = {
sessionId: session.id,
state: session.state,
balanceSats: session.balanceSats,
expiresAt: session.expiresAt?.toISOString() ?? null,
minimumBalanceSats: MIN_BALANCE_SATS,
...(session.macaroon && (session.state === "active" || session.state === "paused")
? { macaroon: session.macaroon }
: {}),
};
if (includeInvoice && session.state === "awaiting_payment") {
return {
...base,
invoice: {
paymentRequest: session.depositPaymentRequest,
amountSats: session.depositAmountSats,
...(lnbitsService.stubMode ? { paymentHash: session.depositPaymentHash } : {}),
},
};
}
if (session.topupPaymentHash && !session.topupPaid) {
return {
...base,
pendingTopup: {
paymentRequest: session.topupPaymentRequest,
amountSats: session.topupAmountSats,
...(lnbitsService.stubMode ? { paymentHash: session.topupPaymentHash } : {}),
},
};
}
return base;
}
// ── Auto-advance: awaiting_payment → active ────────────────────────────────────
async function advanceSessionPayment(session: Session): Promise<Session> {
if (session.state !== "awaiting_payment" || session.depositPaid) return session;
const paid = await lnbitsService.checkInvoicePaid(session.depositPaymentHash);
if (!paid) return session;
const macaroon = randomBytes(32).toString("hex");
const expiresAt = new Date(Date.now() + EXPIRY_MS);
const updated = await db
.update(sessions)
.set({
state: "active",
depositPaid: true,
balanceSats: session.depositAmountSats,
macaroon,
expiresAt,
updatedAt: new Date(),
})
.where(and(eq(sessions.id, session.id), eq(sessions.state, "awaiting_payment")))
.returning();
return updated[0] ?? session;
}
// ── Auto-advance: pending topup paid → credit balance ─────────────────────────
async function advanceTopup(session: Session): Promise<Session> {
if (!session.topupPaymentHash || session.topupPaid) return session;
const paid = await lnbitsService.checkInvoicePaid(session.topupPaymentHash);
if (!paid) return session;
const newBalance = session.balanceSats + (session.topupAmountSats ?? 0);
const newState =
session.state === "paused" && newBalance >= MIN_BALANCE_SATS ? "active" : session.state;
const expiresAt = new Date(Date.now() + EXPIRY_MS);
const updated = await db
.update(sessions)
.set({
state: newState,
topupPaid: true,
balanceSats: newBalance,
expiresAt,
updatedAt: new Date(),
})
.where(eq(sessions.id, session.id))
.returning();
return updated[0] ?? session;
}
// ── POST /sessions ─────────────────────────────────────────────────────────────
router.post("/sessions", async (req: Request, res: Response) => {
const rawAmount = req.body?.amount_sats;
const amountSats = parseInt(String(rawAmount ?? ""), 10);
if (!Number.isFinite(amountSats) || amountSats < MIN_DEPOSIT_SATS || amountSats > MAX_DEPOSIT_SATS) {
res.status(400).json({
error: `amount_sats must be an integer between ${MIN_DEPOSIT_SATS} and ${MAX_DEPOSIT_SATS}`,
});
return;
}
try {
const sessionId = randomUUID();
const invoice = await lnbitsService.createInvoice(amountSats, `Session deposit ${sessionId}`);
await db.insert(sessions).values({
id: sessionId,
state: "awaiting_payment",
balanceSats: 0,
depositAmountSats: amountSats,
depositPaymentHash: invoice.paymentHash,
depositPaymentRequest: invoice.paymentRequest,
depositPaid: false,
expiresAt: new Date(Date.now() + EXPIRY_MS),
});
res.status(201).json({
sessionId,
state: "awaiting_payment",
invoice: {
paymentRequest: invoice.paymentRequest,
amountSats,
...(lnbitsService.stubMode ? { paymentHash: invoice.paymentHash } : {}),
},
});
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Failed to create session" });
}
});
// ── GET /sessions/:id ─────────────────────────────────────────────────────────
router.get("/sessions/:id", async (req: Request, res: Response) => {
const id = req.params.id as string;
try {
let session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
// Mark expired sessions
if (checkExpired(session) && session.state !== "expired") {
await db
.update(sessions)
.set({ state: "expired", updatedAt: new Date() })
.where(eq(sessions.id, id));
session = (await getSessionById(id))!;
}
// Auto-advance deposit payment
if (session.state === "awaiting_payment") {
session = await advanceSessionPayment(session);
}
// Auto-advance topup payment
if (session.topupPaymentHash && !session.topupPaid) {
session = await advanceTopup(session);
}
res.json(sessionView(session, true));
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Failed to fetch session" });
}
});
// ── POST /sessions/:id/request ────────────────────────────────────────────────
router.post("/sessions/:id/request", async (req: Request, res: Response) => {
const id = req.params.id as string;
const macaroon = extractMacaroon(req);
const requestText = typeof req.body?.request === "string" ? req.body.request.trim() : "";
if (!requestText) {
res.status(400).json({ error: "Body must include 'request' string" });
return;
}
try {
let session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
// Auth
if (!macaroon || macaroon !== session.macaroon) {
res.status(401).json({ error: "Invalid or missing macaroon. Include 'Authorization: Bearer <macaroon>' header." });
return;
}
// State checks
if (checkExpired(session) || session.state === "expired") {
res.status(410).json({ error: "Session has expired" });
return;
}
if (session.state === "paused") {
res.status(402).json({
error: "Insufficient balance",
balance: session.balanceSats,
minimumRequired: MIN_BALANCE_SATS,
});
return;
}
if (session.state !== "active") {
res.status(409).json({ error: `Session is in state '${session.state}'` });
return;
}
if (session.balanceSats < MIN_BALANCE_SATS) {
// Mark as paused before returning
await db
.update(sessions)
.set({ state: "paused", updatedAt: new Date() })
.where(eq(sessions.id, id));
res.status(402).json({
error: "Insufficient balance",
balance: session.balanceSats,
minimumRequired: MIN_BALANCE_SATS,
});
return;
}
// ── Run the request ───────────────────────────────────────────────────────
const requestId = randomUUID();
const btcPriceUsd = await getBtcPriceUsd();
// Eval phase
const evalResult = await agentService.evaluateRequest(requestText);
const evalCostUsd = pricingService.calculateActualCostUsd(
evalResult.inputTokens,
evalResult.outputTokens,
agentService.evalModel,
);
let workInputTokens = 0;
let workOutputTokens = 0;
let workCostUsd = 0;
let result: string | null = null;
let finalState: "complete" | "rejected" | "failed" = "rejected";
let reason: string | null = null;
let errorMessage: string | null = null;
if (evalResult.accepted) {
try {
const workResult = await agentService.executeWork(requestText);
workInputTokens = workResult.inputTokens;
workOutputTokens = workResult.outputTokens;
workCostUsd = pricingService.calculateActualCostUsd(
workResult.inputTokens,
workResult.outputTokens,
agentService.workModel,
);
result = workResult.result;
finalState = "complete";
} catch (err) {
errorMessage = err instanceof Error ? err.message : "Execution error";
finalState = "failed";
}
} else {
reason = evalResult.reason;
}
// ── Honest accounting ────────────────────────────────────────────────────
const totalTokenCostUsd = evalCostUsd + workCostUsd;
const chargeUsd = pricingService.calculateActualChargeUsd(totalTokenCostUsd);
const debitedSats = usdToSats(chargeUsd, btcPriceUsd);
const newBalance = session.balanceSats - debitedSats;
const newSessionState = newBalance < MIN_BALANCE_SATS ? "paused" : "active";
const expiresAt = new Date(Date.now() + EXPIRY_MS);
// Persist session request + update session balance atomically
await db.transaction(async (tx) => {
await tx.insert(sessionRequests).values({
id: requestId,
sessionId: id,
request: requestText,
state: finalState,
result,
reason,
errorMessage,
evalInputTokens: evalResult.inputTokens,
evalOutputTokens: evalResult.outputTokens,
workInputTokens: workInputTokens || null,
workOutputTokens: workOutputTokens || null,
debitedSats,
balanceAfterSats: newBalance,
btcPriceUsd,
});
await tx
.update(sessions)
.set({
balanceSats: newBalance,
state: newSessionState,
expiresAt,
updatedAt: new Date(),
})
.where(eq(sessions.id, id));
});
res.json({
requestId,
state: finalState,
...(result ? { result } : {}),
...(reason ? { reason } : {}),
...(errorMessage ? { errorMessage } : {}),
debitedSats,
balanceRemaining: newBalance,
cost: {
evalSats: usdToSats(
pricingService.calculateActualChargeUsd(evalCostUsd),
btcPriceUsd,
),
workSats: workCostUsd > 0
? usdToSats(pricingService.calculateActualChargeUsd(workCostUsd), btcPriceUsd)
: 0,
totalSats: debitedSats,
btcPriceUsd,
},
});
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Request failed" });
}
});
// ── POST /sessions/:id/topup ──────────────────────────────────────────────────
router.post("/sessions/:id/topup", async (req: Request, res: Response) => {
const id = req.params.id as string;
const macaroon = extractMacaroon(req);
const rawAmount = req.body?.amount_sats;
const amountSats = parseInt(String(rawAmount ?? ""), 10);
if (!Number.isFinite(amountSats) || amountSats < MIN_DEPOSIT_SATS || amountSats > MAX_DEPOSIT_SATS) {
res.status(400).json({
error: `amount_sats must be an integer between ${MIN_DEPOSIT_SATS} and ${MAX_DEPOSIT_SATS}`,
});
return;
}
try {
const session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
if (!macaroon || macaroon !== session.macaroon) {
res.status(401).json({ error: "Invalid or missing macaroon" });
return;
}
if (session.state !== "active" && session.state !== "paused") {
res.status(409).json({ error: `Cannot top up a session in state '${session.state}'` });
return;
}
if (session.topupPaymentHash && !session.topupPaid) {
res.status(409).json({
error: "A topup invoice is already pending. Pay it first or poll GET /sessions/:id.",
pendingTopup: {
paymentRequest: session.topupPaymentRequest,
amountSats: session.topupAmountSats,
...(lnbitsService.stubMode ? { paymentHash: session.topupPaymentHash } : {}),
},
});
return;
}
const invoice = await lnbitsService.createInvoice(amountSats, `Session topup ${id}`);
await db
.update(sessions)
.set({
topupAmountSats: amountSats,
topupPaymentHash: invoice.paymentHash,
topupPaymentRequest: invoice.paymentRequest,
topupPaid: false,
updatedAt: new Date(),
})
.where(eq(sessions.id, id));
res.json({
sessionId: id,
topup: {
paymentRequest: invoice.paymentRequest,
amountSats,
...(lnbitsService.stubMode ? { paymentHash: invoice.paymentHash } : {}),
},
});
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Topup failed" });
}
});
export default router;

View File

@@ -300,6 +300,146 @@ else
fi
fi
# ---------------------------------------------------------------------------
# Test 11 — Session: create session
# ---------------------------------------------------------------------------
sep "Test 11 — Session: create session (awaiting_payment)"
T11_RES=$(curl -s -w "\\n%{http_code}" -X POST "$BASE/api/sessions" \\
-H "Content-Type: application/json" \\
-d '{"amount_sats": 200}')
T11_BODY=$(echo "$T11_RES" | head -n-1)
T11_CODE=$(echo "$T11_RES" | tail -n1)
SESSION_ID=$(echo "$T11_BODY" | jq -r '.sessionId' 2>/dev/null || echo "")
T11_STATE=$(echo "$T11_BODY" | jq -r '.state' 2>/dev/null || echo "")
T11_AMT=$(echo "$T11_BODY" | jq -r '.invoice.amountSats' 2>/dev/null || echo "")
DEPOSIT_HASH=$(echo "$T11_BODY" | jq -r '.invoice.paymentHash' 2>/dev/null || echo "")
if [[ "$T11_CODE" == "201" && -n "$SESSION_ID" && "$T11_STATE" == "awaiting_payment" && "$T11_AMT" == "200" ]]; then
note PASS "HTTP 201, sessionId=$SESSION_ID, state=awaiting_payment, amount=200"
PASS=$((PASS+1))
else
note FAIL "code=$T11_CODE body=$T11_BODY"
FAIL=$((FAIL+1))
fi
# ---------------------------------------------------------------------------
# Test 12 — Session: poll before payment (stub hash present)
# ---------------------------------------------------------------------------
sep "Test 12 — Session: poll before payment"
T12_RES=$(curl -s -w "\\n%{http_code}" "$BASE/api/sessions/$SESSION_ID")
T12_BODY=$(echo "$T12_RES" | head -n-1)
T12_CODE=$(echo "$T12_RES" | tail -n1)
T12_STATE=$(echo "$T12_BODY" | jq -r '.state' 2>/dev/null || echo "")
if [[ -z "$DEPOSIT_HASH" || "$DEPOSIT_HASH" == "null" ]]; then
DEPOSIT_HASH=$(echo "$T12_BODY" | jq -r '.invoice.paymentHash' 2>/dev/null || echo "")
fi
if [[ "$T12_CODE" == "200" && "$T12_STATE" == "awaiting_payment" ]]; then
note PASS "state=awaiting_payment before payment"
PASS=$((PASS+1))
else
note FAIL "code=$T12_CODE body=$T12_BODY"
FAIL=$((FAIL+1))
fi
# ---------------------------------------------------------------------------
# Test 13 — Session: pay deposit + activate
# ---------------------------------------------------------------------------
sep "Test 13 — Session: pay deposit (stub) + auto-advance to active"
if [[ -n "$DEPOSIT_HASH" && "$DEPOSIT_HASH" != "null" ]]; then
curl -s -X POST "$BASE/api/dev/stub/pay/$DEPOSIT_HASH" >/dev/null
sleep 1
T13_RES=$(curl -s -w "\\n%{http_code}" "$BASE/api/sessions/$SESSION_ID")
T13_BODY=$(echo "$T13_RES" | head -n-1)
T13_CODE=$(echo "$T13_RES" | tail -n1)
T13_STATE=$(echo "$T13_BODY" | jq -r '.state' 2>/dev/null || echo "")
T13_BAL=$(echo "$T13_BODY" | jq -r '.balanceSats' 2>/dev/null || echo "")
SESSION_MACAROON=$(echo "$T13_BODY" | jq -r '.macaroon' 2>/dev/null || echo "")
if [[ "$T13_CODE" == "200" && "$T13_STATE" == "active" && "$T13_BAL" == "200" && -n "$SESSION_MACAROON" && "$SESSION_MACAROON" != "null" ]]; then
note PASS "state=active, balanceSats=200, macaroon present"
PASS=$((PASS+1))
else
note FAIL "code=$T13_CODE state=$T13_STATE bal=$T13_BAL body=$T13_BODY"
FAIL=$((FAIL+1))
fi
else
note SKIP "No deposit hash (stub mode not active)"
SKIP=$((SKIP+1))
fi
# ---------------------------------------------------------------------------
# Test 14 — Session: submit request (accepted path)
# ---------------------------------------------------------------------------
sep "Test 14 — Session: submit request (accepted)"
if [[ -n "$SESSION_MACAROON" && "$SESSION_MACAROON" != "null" ]]; then
START_T14=$(date +%s)
T14_RES=$(curl -s -w "\\n%{http_code}" -X POST "$BASE/api/sessions/$SESSION_ID/request" \\
-H "Content-Type: application/json" \\
-H "Authorization: Bearer $SESSION_MACAROON" \\
-d '{"request":"What is Bitcoin in one sentence?"}')
T14_BODY=$(echo "$T14_RES" | head -n-1)
T14_CODE=$(echo "$T14_RES" | tail -n1)
T14_STATE=$(echo "$T14_BODY" | jq -r '.state' 2>/dev/null || echo "")
T14_DEBITED=$(echo "$T14_BODY" | jq -r '.debitedSats' 2>/dev/null || echo "")
T14_BAL=$(echo "$T14_BODY" | jq -r '.balanceRemaining' 2>/dev/null || echo "")
END_T14=$(date +%s)
ELAPSED_T14=$((END_T14 - START_T14))
if [[ "$T14_CODE" == "200" && ("$T14_STATE" == "complete" || "$T14_STATE" == "rejected") && -n "$T14_DEBITED" && "$T14_DEBITED" != "null" && -n "$T14_BAL" ]]; then
note PASS "state=$T14_STATE in ${ELAPSED_T14}s, debitedSats=$T14_DEBITED, balanceRemaining=$T14_BAL"
PASS=$((PASS+1))
else
note FAIL "code=$T14_CODE body=$T14_BODY"
FAIL=$((FAIL+1))
fi
else
note SKIP "No macaroon — skipping"
SKIP=$((SKIP+1))
fi
# ---------------------------------------------------------------------------
# Test 15 — Session: missing/invalid macaroon → 401
# ---------------------------------------------------------------------------
sep "Test 15 — Session: reject request without valid macaroon"
if [[ -n "$SESSION_ID" ]]; then
T15_RES=$(curl -s -w "\\n%{http_code}" -X POST "$BASE/api/sessions/$SESSION_ID/request" \\
-H "Content-Type: application/json" \\
-d '{"request":"What is Bitcoin?"}')
T15_CODE=$(echo "$T15_RES" | tail -n1)
if [[ "$T15_CODE" == "401" ]]; then
note PASS "HTTP 401 without macaroon"
PASS=$((PASS+1))
else
note FAIL "Expected 401, got code=$T15_CODE"
FAIL=$((FAIL+1))
fi
else
note SKIP "No session ID — skipping"
SKIP=$((SKIP+1))
fi
# ---------------------------------------------------------------------------
# Test 16 — Session: topup invoice creation
# ---------------------------------------------------------------------------
sep "Test 16 — Session: topup invoice creation"
if [[ -n "$SESSION_MACAROON" && "$SESSION_MACAROON" != "null" ]]; then
T16_RES=$(curl -s -w "\\n%{http_code}" -X POST "$BASE/api/sessions/$SESSION_ID/topup" \\
-H "Content-Type: application/json" \\
-H "Authorization: Bearer $SESSION_MACAROON" \\
-d '{"amount_sats": 500}')
T16_BODY=$(echo "$T16_RES" | head -n-1)
T16_CODE=$(echo "$T16_RES" | tail -n1)
T16_PR=$(echo "$T16_BODY" | jq -r '.topup.paymentRequest' 2>/dev/null || echo "")
T16_AMT=$(echo "$T16_BODY" | jq -r '.topup.amountSats' 2>/dev/null || echo "")
if [[ "$T16_CODE" == "200" && -n "$T16_PR" && "$T16_PR" != "null" && "$T16_AMT" == "500" ]]; then
note PASS "Topup invoice created, amountSats=500"
PASS=$((PASS+1))
else
note FAIL "code=$T16_CODE body=$T16_BODY"
FAIL=$((FAIL+1))
fi
else
note SKIP "No macaroon — skipping"
SKIP=$((SKIP+1))
fi
# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------