Files
timmy-tower/artifacts/api-server/src/routes/sessions.ts

612 lines
23 KiB
TypeScript

import { Router, type Request, type Response } from "express";
import { randomBytes, randomUUID, createHash } from "crypto";
import { db, sessions, sessionRequests, sessionMessages, getSessionHistory, type Session } from "@workspace/db";
import { eq, and } from "drizzle-orm";
import { lnbitsService } from "../lib/lnbits.js";
import { sessionsLimiter } from "../lib/rate-limiter.js";
import { eventBus } from "../lib/event-bus.js";
import { agentService } from "../lib/agent.js";
import { pricingService } from "../lib/pricing.js";
import { getBtcPriceUsd, usdToSats } from "../lib/btc-oracle.js";
import { trustService } from "../lib/trust.js";
import { freeTierService } from "../lib/free-tier.js";
const router = Router();
// ── Env-var config ─────────────────────────────────────────────────────────────
function envInt(name: string, fallback: number): number {
const raw = parseInt(process.env[name] ?? "", 10);
return Number.isFinite(raw) && raw > 0 ? raw : fallback;
}
const MIN_DEPOSIT_SATS = envInt("SESSION_MIN_DEPOSIT_SATS", 100);
const MAX_DEPOSIT_SATS = envInt("SESSION_MAX_DEPOSIT_SATS", 10_000);
const MIN_BALANCE_SATS = envInt("SESSION_MIN_BALANCE_SATS", 50);
const EXPIRY_HOURS = envInt("SESSION_EXPIRY_HOURS", 24);
const EXPIRY_MS = EXPIRY_HOURS * 60 * 60 * 1000;
// ── Helpers ────────────────────────────────────────────────────────────────────
async function getSessionById(id: string): Promise<Session | null> {
const rows = await db.select().from(sessions).where(eq(sessions.id, id)).limit(1);
return rows[0] ?? null;
}
function checkExpired(session: Session): boolean {
return session.expiresAt !== null && new Date() > session.expiresAt;
}
function extractMacaroon(req: Request): string | null {
const auth = req.headers.authorization ?? "";
if (auth.startsWith("Bearer ")) return auth.slice(7).trim();
return null;
}
function sessionView(session: Session, includeInvoice = false, trustTier?: string) {
const base = {
sessionId: session.id,
state: session.state,
balanceSats: session.balanceSats,
expiresAt: session.expiresAt?.toISOString() ?? null,
minimumBalanceSats: MIN_BALANCE_SATS,
...(session.nostrPubkey ? { nostrPubkey: session.nostrPubkey } : {}),
trust_tier: trustTier ?? "anonymous",
...(session.macaroon && (session.state === "active" || session.state === "paused")
? { macaroon: session.macaroon }
: {}),
};
if (includeInvoice && session.state === "awaiting_payment") {
return {
...base,
invoice: {
paymentRequest: session.depositPaymentRequest,
amountSats: session.depositAmountSats,
paymentHash: session.depositPaymentHash,
},
};
}
if (session.topupPaymentHash && !session.topupPaid) {
return {
...base,
pendingTopup: {
paymentRequest: session.topupPaymentRequest,
amountSats: session.topupAmountSats,
paymentHash: session.topupPaymentHash,
},
};
}
return base;
}
// ── Auto-advance: awaiting_payment → active ────────────────────────────────────
async function advanceSessionPayment(session: Session): Promise<Session> {
if (session.state !== "awaiting_payment" || session.depositPaid) return session;
const paid = await lnbitsService.checkInvoicePaid(session.depositPaymentHash);
if (!paid) return session;
const macaroon = randomBytes(32).toString("hex");
const expiresAt = new Date(Date.now() + EXPIRY_MS);
const updated = await db
.update(sessions)
.set({
state: "active",
depositPaid: true,
balanceSats: session.depositAmountSats,
macaroon,
expiresAt,
updatedAt: new Date(),
})
.where(and(eq(sessions.id, session.id), eq(sessions.state, "awaiting_payment")))
.returning();
return updated[0] ?? session;
}
// ── Auto-advance: pending topup paid → credit balance ─────────────────────────
async function advanceTopup(session: Session): Promise<Session> {
if (!session.topupPaymentHash || session.topupPaid) return session;
const paid = await lnbitsService.checkInvoicePaid(session.topupPaymentHash);
if (!paid) return session;
const newBalance = session.balanceSats + (session.topupAmountSats ?? 0);
const newState =
session.state === "paused" && newBalance >= MIN_BALANCE_SATS ? "active" : session.state;
const expiresAt = new Date(Date.now() + EXPIRY_MS);
const updated = await db
.update(sessions)
.set({
state: newState,
topupPaid: true,
balanceSats: newBalance,
expiresAt,
updatedAt: new Date(),
})
.where(eq(sessions.id, session.id))
.returning();
return updated[0] ?? session;
}
// ── Resolve Nostr pubkey from token header or body ────────────────────────────
/** Resolves a Nostr token from the request.
* Returns `{ pubkey, rejected }` where:
* rejected=false, pubkey=null → no token supplied (anonymous)
* rejected=true, pubkey=null → token supplied but invalid/expired → caller should 401
* rejected=false, pubkey=str → valid token
*/
function resolveNostrPubkey(req: Request): { pubkey: string | null; rejected: boolean } {
const header = req.headers["x-nostr-token"];
const bodyToken = req.body?.nostr_token;
const raw = typeof header === "string" ? header : (typeof bodyToken === "string" ? bodyToken : null);
if (!raw) return { pubkey: null, rejected: false };
const parsed = trustService.verifyToken(raw.trim());
if (!parsed) return { pubkey: null, rejected: true };
return { pubkey: parsed.pubkey, rejected: false };
}
// ── POST /sessions ─────────────────────────────────────────────────────────────
router.post("/sessions", sessionsLimiter, async (req: Request, res: Response) => {
const rawAmount = req.body?.amount_sats;
const amountSats = parseInt(String(rawAmount ?? ""), 10);
if (!Number.isFinite(amountSats) || amountSats < MIN_DEPOSIT_SATS || amountSats > MAX_DEPOSIT_SATS) {
res.status(400).json({
error: `amount_sats must be an integer between ${MIN_DEPOSIT_SATS} and ${MAX_DEPOSIT_SATS}`,
});
return;
}
// Optionally bind a Nostr identity — ensure row exists before FK insert
const tokenResult = resolveNostrPubkey(req);
if (tokenResult.rejected) {
res.status(401).json({ error: "Invalid or expired nostr_token" });
return;
}
const nostrPubkey = tokenResult.pubkey;
if (nostrPubkey) await trustService.getOrCreate(nostrPubkey);
try {
const sessionId = randomUUID();
const invoice = await lnbitsService.createInvoice(amountSats, `Session deposit ${sessionId}`);
await db.insert(sessions).values({
id: sessionId,
state: "awaiting_payment",
balanceSats: 0,
depositAmountSats: amountSats,
depositPaymentHash: invoice.paymentHash,
depositPaymentRequest: invoice.paymentRequest,
depositPaid: false,
expiresAt: new Date(Date.now() + EXPIRY_MS),
...(nostrPubkey ? { nostrPubkey } : {}),
});
const trust = nostrPubkey
? await trustService.getIdentityWithDecay(nostrPubkey)
: null;
res.status(201).json({
sessionId,
state: "awaiting_payment",
...(nostrPubkey ? { nostrPubkey } : {}),
trust_tier: trust ? trust.tier : "anonymous",
invoice: {
paymentRequest: invoice.paymentRequest,
amountSats,
paymentHash: invoice.paymentHash,
},
});
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Failed to create session" });
}
});
// ── GET /sessions/:id ─────────────────────────────────────────────────────────
router.get("/sessions/:id", async (req: Request, res: Response) => {
const id = req.params.id as string;
try {
let session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
// Mark expired sessions
if (checkExpired(session) && session.state !== "expired") {
await db
.update(sessions)
.set({ state: "expired", updatedAt: new Date() })
.where(eq(sessions.id, id));
await db.delete(sessionMessages).where(eq(sessionMessages.sessionId, id));
session = (await getSessionById(id))!;
}
// Auto-advance deposit payment
if (session.state === "awaiting_payment") {
session = await advanceSessionPayment(session);
}
// Auto-advance topup payment
if (session.topupPaymentHash && !session.topupPaid) {
session = await advanceTopup(session);
}
const trustTier = session.nostrPubkey
? await trustService.getTier(session.nostrPubkey)
: undefined;
res.json(sessionView(session, true, trustTier));
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Failed to fetch session" });
}
});
// ── POST /sessions/:id/request ────────────────────────────────────────────────
router.post("/sessions/:id/request", async (req: Request, res: Response) => {
const id = req.params.id as string;
const macaroon = extractMacaroon(req);
const requestText = typeof req.body?.request === "string" ? req.body.request.trim() : "";
if (!requestText) {
res.status(400).json({ error: "Body must include 'request' string" });
return;
}
try {
const session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
// Auth
if (!macaroon || macaroon !== session.macaroon) {
res.status(401).json({ error: "Invalid or missing macaroon. Include 'Authorization: Bearer <macaroon>' header." });
return;
}
// State checks
if (checkExpired(session) || session.state === "expired") {
res.status(410).json({ error: "Session has expired" });
return;
}
if (session.state === "paused") {
res.status(402).json({
error: "Insufficient balance",
balance: session.balanceSats,
minimumRequired: MIN_BALANCE_SATS,
});
return;
}
if (session.state !== "active") {
res.status(409).json({ error: `Session is in state '${session.state}'` });
return;
}
if (session.balanceSats < MIN_BALANCE_SATS) {
// Mark as paused before returning
await db
.update(sessions)
.set({ state: "paused", updatedAt: new Date() })
.where(eq(sessions.id, id));
res.status(402).json({
error: "Insufficient balance",
balance: session.balanceSats,
minimumRequired: MIN_BALANCE_SATS,
});
return;
}
// ── Run the request ───────────────────────────────────────────────────────
const requestId = randomUUID();
const btcPriceUsd = await getBtcPriceUsd();
// Load conversation history for context injection
const history = await getSessionHistory(id, 8, 4000);
// Defensive check: log a warning if history still exceeds budget
const currentTokenCount = history.reduce((sum, msg) => sum + Math.ceil(msg.content.length / 4), 0);
if (currentTokenCount > 4000) {
console.warn(`Session ${id}: History exceeds 4000 token budget after retrieval. Actual: ${currentTokenCount}`);
}
// Eval phase
const evalResult = await agentService.evaluateRequest(requestText);
const evalCostUsd = pricingService.calculateActualCostUsd(
evalResult.inputTokens,
evalResult.outputTokens,
agentService.evalModel,
);
let workInputTokens = 0;
let workOutputTokens = 0;
let workCostUsd = 0;
let result: string | null = null;
let finalState: "complete" | "rejected" | "failed" = "rejected";
let reason: string | null = null;
let errorMessage: string | null = null;
// ── Pre-gate: free-tier decision on ESTIMATED cost before executing work ──
// Estimate total request cost (work portion) pre-execution to determine subsidy.
// Final accounting uses eval+work actual cost (fullDebitSats). The pool reservation
// is sized to the work estimate; if pool covers fullDebitSats, debitedSats = 0.
// If pool covers only part of fullDebitSats, the remainder debits the session.
let ftDecision: import("../lib/free-tier.js").FreeTierDecision | null = null;
if (evalResult.accepted && session.nostrPubkey) {
// estimateRequestCost includes infra + margin. Convert to sats for decide().
const { estimatedCostUsd } = pricingService.estimateRequestCost(requestText, agentService.workModel);
const estimatedSats = usdToSats(estimatedCostUsd, btcPriceUsd);
ftDecision = await freeTierService.decide(session.nostrPubkey, estimatedSats);
}
if (evalResult.accepted) {
try {
const workResult = await agentService.executeWork(requestText, history);
workInputTokens = workResult.inputTokens;
workOutputTokens = workResult.outputTokens;
workCostUsd = pricingService.calculateActualCostUsd(
workResult.inputTokens,
workResult.outputTokens,
agentService.workModel,
);
result = workResult.result;
finalState = "complete";
} catch (err) {
errorMessage = err instanceof Error ? err.message : "Execution error";
finalState = "failed";
}
} else {
reason = evalResult.reason;
}
// ── Honest accounting ────────────────────────────────────────────────────
const totalTokenCostUsd = evalCostUsd + workCostUsd;
const chargeUsd = pricingService.calculateActualChargeUsd(totalTokenCostUsd);
const fullDebitSats = usdToSats(chargeUsd, btcPriceUsd);
// ── Reconcile free-tier decision against actual cost ──────────────────────
// decide() atomically debited pool for serve="free"; was advisory for serve="partial".
// Cap absorbedSats at the actual cost so we never over-absorb from the pool.
let debitedSats = fullDebitSats;
let freeTierServed = false;
let absorbedSats = 0;
let reservedAbsorbed = 0; // amount pool was debited before work ran
if (ftDecision && ftDecision.serve !== "gate") {
if (finalState === "complete") {
if (ftDecision.serve === "free") {
// Pool was debited at decide() time. Actual cost may be less than estimated.
reservedAbsorbed = ftDecision.absorbSats;
absorbedSats = Math.min(reservedAbsorbed, fullDebitSats);
debitedSats = Math.max(0, fullDebitSats - absorbedSats);
freeTierServed = true;
} else {
// Partial: decide() was advisory (no pool debit). Atomically debit NOW, after
// work completed, using actual cost capped by the advisory absorbSats limit.
const wantedAbsorb = Math.min(ftDecision.absorbSats, fullDebitSats);
const actualReserved = await freeTierService.reservePartialGrant(
wantedAbsorb,
session.nostrPubkey!,
);
reservedAbsorbed = actualReserved;
absorbedSats = actualReserved;
debitedSats = Math.max(0, fullDebitSats - absorbedSats);
freeTierServed = absorbedSats > 0;
}
if (freeTierServed) {
const reqHash = createHash("sha256").update(requestText).digest("hex");
await freeTierService.recordGrant(
session.nostrPubkey!,
reqHash,
absorbedSats,
reservedAbsorbed,
);
}
} else if (ftDecision.serve === "free") {
// Work failed or was rejected — release the pool reservation from decide()
void freeTierService.releaseReservation(
ftDecision.absorbSats,
`session work ${finalState}`,
);
}
// Partial + failed: no pool debit was made (advisory only), nothing to release.
}
// Credit pool from paid portion (even if partial free tier)
if (finalState === "complete" && debitedSats > 0) {
void freeTierService.credit(debitedSats);
}
const newBalance = session.balanceSats - debitedSats;
const newSessionState = newBalance < MIN_BALANCE_SATS ? "paused" : "active";
const expiresAt = new Date(Date.now() + EXPIRY_MS);
// Persist session request + update session balance atomically
await db.transaction(async (tx) => {
await tx.insert(sessionRequests).values({
id: requestId,
sessionId: id,
request: requestText,
state: finalState,
result,
reason,
errorMessage,
evalInputTokens: evalResult.inputTokens,
evalOutputTokens: evalResult.outputTokens,
workInputTokens: workInputTokens || null,
workOutputTokens: workOutputTokens || null,
debitedSats,
balanceAfterSats: newBalance,
btcPriceUsd,
});
await tx
.update(sessions)
.set({
balanceSats: newBalance,
state: newSessionState,
expiresAt,
updatedAt: new Date(),
})
.where(eq(sessions.id, id));
// Persist conversation history only for completed requests
if (finalState === "complete") {
await tx.insert(sessionMessages).values([
{ sessionId: id, role: "user" as const, content: requestText, tokenCount: Math.ceil(requestText.length / 4) },
{ sessionId: id, role: "assistant" as const, content: result ?? "", tokenCount: Math.ceil((result ?? "").length / 4) },
]);
}
});
// Emit real-time cost update for the UI cost ticker (#68)
if (finalState === "complete" && debitedSats > 0) {
eventBus.publish({ type: "cost:update", jobId: requestId, sats: debitedSats, phase: "session", isFinal: true });
}
// ── Trust scoring ────────────────────────────────────────────────────────
if (session.nostrPubkey) {
if (finalState === "complete") {
void trustService.recordSuccess(session.nostrPubkey, debitedSats);
} else if (finalState === "rejected" || finalState === "failed") {
void trustService.recordFailure(session.nostrPubkey, reason ?? errorMessage ?? finalState);
}
}
res.json({
requestId,
state: finalState,
...(result ? { result } : {}),
...(reason ? { reason } : {}),
...(errorMessage ? { errorMessage } : {}),
debitedSats,
balanceRemaining: newBalance,
...(freeTierServed ? { free_tier: true, absorbed_sats: absorbedSats } : {}),
cost: {
evalSats: usdToSats(
pricingService.calculateActualChargeUsd(evalCostUsd),
btcPriceUsd,
),
workSats: workCostUsd > 0
? usdToSats(pricingService.calculateActualChargeUsd(workCostUsd), btcPriceUsd)
: 0,
totalSats: fullDebitSats,
chargedSats: debitedSats,
absorbedSats,
btcPriceUsd,
},
});
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Request failed" });
}
});
// ── POST /sessions/:id/topup ──────────────────────────────────────────────────
router.post("/sessions/:id/topup", async (req: Request, res: Response) => {
const id = req.params.id as string;
const macaroon = extractMacaroon(req);
const rawAmount = req.body?.amount_sats;
const amountSats = parseInt(String(rawAmount ?? ""), 10);
if (!Number.isFinite(amountSats) || amountSats < MIN_DEPOSIT_SATS || amountSats > MAX_DEPOSIT_SATS) {
res.status(400).json({
error: `amount_sats must be an integer between ${MIN_DEPOSIT_SATS} and ${MAX_DEPOSIT_SATS}`,
});
return;
}
try {
const session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
if (!macaroon || macaroon !== session.macaroon) {
res.status(401).json({ error: "Invalid or missing macaroon" });
return;
}
if (session.state !== "active" && session.state !== "paused") {
res.status(409).json({ error: `Cannot top up a session in state '${session.state}'` });
return;
}
if (session.topupPaymentHash && !session.topupPaid) {
res.status(409).json({
error: "A topup invoice is already pending. Pay it first or poll GET /sessions/:id.",
pendingTopup: {
paymentRequest: session.topupPaymentRequest,
amountSats: session.topupAmountSats,
paymentHash: session.topupPaymentHash,
},
});
return;
}
const invoice = await lnbitsService.createInvoice(amountSats, `Session topup ${id}`);
await db
.update(sessions)
.set({
topupAmountSats: amountSats,
topupPaymentHash: invoice.paymentHash,
topupPaymentRequest: invoice.paymentRequest,
topupPaid: false,
updatedAt: new Date(),
})
.where(eq(sessions.id, id));
res.json({
sessionId: id,
topup: {
paymentRequest: invoice.paymentRequest,
amountSats,
paymentHash: invoice.paymentHash,
},
});
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Topup failed" });
}
});
// ── DELETE /sessions/:id/history ─────────────────────────────────────────────
router.delete("/sessions/:id/history", async (req: Request, res: Response) => {
const id = req.params.id as string;
const macaroon = extractMacaroon(req);
try {
const session = await getSessionById(id);
if (!session) { res.status(404).json({ error: "Session not found" }); return; }
if (!macaroon || macaroon !== session.macaroon) {
res.status(401).json({ error: "Invalid or missing macaroon. Include 'Authorization: Bearer <macaroon>' header." });
return;
}
if (checkExpired(session) || session.state === "expired") {
res.status(410).json({ error: "Session has expired" });
return;
}
await db.delete(sessionMessages).where(eq(sessionMessages.sessionId, id));
res.json({ cleared: true });
} catch (err) {
res.status(500).json({ error: err instanceof Error ? err.message : "Failed to clear history" });
}
});
export default router;