diff --git a/artifacts/api-server/src/lib/free-tier.ts b/artifacts/api-server/src/lib/free-tier.ts index b376441..754cd09 100644 --- a/artifacts/api-server/src/lib/free-tier.ts +++ b/artifacts/api-server/src/lib/free-tier.ts @@ -1,5 +1,5 @@ import { randomUUID } from "crypto"; -import { db, timmyConfig, freeTierGrants, nostrIdentities, type NostrIdentity, type TrustTier } from "@workspace/db"; +import { db, timmyConfig, freeTierGrants, nostrIdentities, type TrustTier } from "@workspace/db"; import { eq, sql } from "drizzle-orm"; import { makeLogger } from "./logger.js"; import { trustService } from "./trust.js"; @@ -31,6 +31,7 @@ const DAILY_BUDGET_ELITE = envInt("FREE_TIER_BUDGET_ELITE", 1_000); const POOL_INITIAL_SATS = envInt("FREE_TIER_POOL_INITIAL_SATS", 10_000); const POOL_CREDIT_PCT = envFloat("FREE_TIER_POOL_CREDIT_PCT", 10); const POOL_KEY = "generosity_pool_sats"; +const DAY_MS = 24 * 60 * 60 * 1000; // ── Types ───────────────────────────────────────────────────────────────────── @@ -74,21 +75,24 @@ export class FreeTierService { return POOL_INITIAL_SATS; } - // How many sats has this identity absorbed today (resets after 24 h)? - getTodayAbsorbed(identity: NostrIdentity): number { - const hoursSinceReset = - (Date.now() - identity.absorbedResetAt.getTime()) / (1000 * 60 * 60); - if (hoursSinceReset >= 24) return 0; - return identity.satsAbsorbedToday; - } - /** * Decide whether to serve a request for free, partially free, or at full cost. * + * For serve="free": the pool is atomically debited for absorbSats immediately. + * This is correct because work starts right away — no window where the reservation + * can be abandoned. If work fails, call releaseReservation() to refund the pool. + * + * For serve="partial": the decision is advisory — pool is NOT debited yet. + * The user still needs to pay their portion before work starts. Pool debit happens + * later via reservePartialGrant() once payment is confirmed, so the pool is never + * depleted by a user who never pays. + * + * For serve="gate": no action taken (no pool debit). + * * Returns: - * serve="free" → absorbSats=estimatedSats, chargeSats=0 - * serve="partial" → 0 < absorbSats < estimatedSats, chargeSats = remainder - * serve="gate" → absorbSats=0, chargeSats=estimatedSats (pay full price) + * serve="free" → absorbSats=estimatedSats (already debited), chargeSats=0 + * serve="partial" → 0 < absorbSats < estimatedSats (advisory), chargeSats = remainder + * serve="gate" → absorbSats=0, chargeSats=estimatedSats (full charge) */ async decide(pubkey: string | null, estimatedSats: number): Promise { const gate: FreeTierDecision = { serve: "gate", absorbSats: 0, chargeSats: estimatedSats }; @@ -101,7 +105,10 @@ export class FreeTierService { const dailyBudget = this.dailyBudgetForTier(identity.tier); if (dailyBudget === 0) return gate; - const todayAbsorbed = this.getTodayAbsorbed(identity); + const todayAbsorbed = + (Date.now() - identity.absorbedResetAt.getTime()) >= DAY_MS + ? 0 + : identity.satsAbsorbedToday; const dailyRemaining = Math.max(0, dailyBudget - todayAbsorbed); if (dailyRemaining === 0) { logger.info("free-tier: daily budget exhausted", { @@ -115,33 +122,125 @@ export class FreeTierService { const poolBalance = await this.getPoolBalance(); if (poolBalance <= 0) { - logger.warn("free-tier: pool empty", { poolBalance }); + logger.warn("free-tier: pool empty", { pubkey: pubkey.slice(0, 8) }); return gate; } const canAbsorb = Math.min(dailyRemaining, poolBalance, estimatedSats); + if (canAbsorb <= 0) return gate; if (canAbsorb >= estimatedSats) { - logger.info("free-tier: serving free", { + // Fully free — atomically debit the pool now, work starts immediately after this. + const debited = await this._atomicPoolDebit(canAbsorb); + if (debited <= 0) { + // Pool drained between balance read and debit (concurrent request) + logger.warn("free-tier: pool drained between check and debit", { pubkey: pubkey.slice(0, 8) }); + return gate; + } + logger.info("free-tier: reserved free (pool debited)", { pubkey: pubkey.slice(0, 8), tier: identity.tier, - estimatedSats, - poolBalance, + absorbSats: debited, }); - return { serve: "free", absorbSats: estimatedSats, chargeSats: 0 }; + return { serve: "free", absorbSats: debited, chargeSats: 0 }; } - if (canAbsorb > 0) { - logger.info("free-tier: partial subsidy", { - pubkey: pubkey.slice(0, 8), - tier: identity.tier, - absorbSats: canAbsorb, - chargeSats: estimatedSats - canAbsorb, - }); - return { serve: "partial", absorbSats: canAbsorb, chargeSats: estimatedSats - canAbsorb }; - } + // Partial: advisory only — pool debit deferred until payment confirmed. + logger.info("free-tier: partial subsidy (advisory, pool debit deferred)", { + pubkey: pubkey.slice(0, 8), + tier: identity.tier, + absorbSats: canAbsorb, + chargeSats: estimatedSats - canAbsorb, + }); + return { serve: "partial", absorbSats: canAbsorb, chargeSats: estimatedSats - canAbsorb }; + } - return gate; + /** + * Atomically debit the generosity pool inside a transaction with row-level lock. + * Returns the actual amount debited (may be less than requested if pool is low). + * Returns 0 if pool has insufficient funds. + */ + private async _atomicPoolDebit(requestedSats: number): Promise { + if (requestedSats <= 0) return 0; + + let actualDebited = 0; + + await db.transaction(async (tx) => { + // Ensure pool row exists + await tx + .insert(timmyConfig) + .values({ key: POOL_KEY, value: String(POOL_INITIAL_SATS) }) + .onConflictDoNothing(); + + // Lock pool row for the duration of this transaction + const locked = await tx.execute( + sql`SELECT value::int AS balance FROM timmy_config WHERE key = ${POOL_KEY} FOR UPDATE`, + ); + const poolBalance = (locked.rows[0] as { balance: number } | undefined)?.balance ?? 0; + if (poolBalance <= 0) return; // pool empty, actualDebited stays 0 + + actualDebited = Math.min(requestedSats, poolBalance); + const newPoolBalance = poolBalance - actualDebited; + const now = new Date(); + + await tx + .update(timmyConfig) + .set({ value: String(newPoolBalance), updatedAt: now }) + .where(eq(timmyConfig.key, POOL_KEY)); + }); + + return actualDebited; + } + + /** + * Atomically reserve and debit the pool for a partial grant at payment confirmation time. + * Called in advanceJob() when work payment is confirmed, BEFORE starting work. + * + * Returns the actual sats debited (may be less than requested if pool drained since decide()). + * If returns 0, pool is now empty — caller should recalculate charge or gate the request. + */ + async reservePartialGrant(requestedAbsorbSats: number, pubkey: string): Promise { + if (requestedAbsorbSats <= 0) return 0; + + // Re-check daily limits before debiting (advisory decide() could be stale) + const identity = await trustService.getIdentityWithDecay(pubkey); + if (!identity) return 0; + + const dailyBudget = this.dailyBudgetForTier(identity.tier); + const todayAbsorbed = + (Date.now() - identity.absorbedResetAt.getTime()) >= DAY_MS + ? 0 + : identity.satsAbsorbedToday; + const dailyRemaining = Math.max(0, dailyBudget - todayAbsorbed); + const canAbsorb = Math.min(requestedAbsorbSats, dailyRemaining); + if (canAbsorb <= 0) return 0; + + const debited = await this._atomicPoolDebit(canAbsorb); + logger.info("free-tier: partial grant reserved at payment time", { + pubkey: pubkey.slice(0, 8), + requestedAbsorbSats, + canAbsorb, + debited, + }); + return debited; + } + + /** + * Release a pool reservation when a request is rejected, fails, or is not executed. + * Returns `absorbSats` to the generosity pool. + * Only call if decide() debited the pool (serve="free" path). + */ + async releaseReservation(absorbSats: number, reason: string): Promise { + if (absorbSats <= 0) return; + const now = new Date(); + await db + .update(timmyConfig) + .set({ + value: sql`(value::int + ${absorbSats})::text`, + updatedAt: now, + }) + .where(eq(timmyConfig.key, POOL_KEY)); + logger.info("free-tier: reservation released", { absorbSats, reason }); } /** @@ -154,115 +253,98 @@ export class FreeTierService { const creditSats = Math.floor(paidSats * POOL_CREDIT_PCT / 100); if (creditSats <= 0) return; - const current = await this.getPoolBalance(); - const next = current + creditSats; const now = new Date(); - await db .insert(timmyConfig) - .values({ key: POOL_KEY, value: String(next), updatedAt: now }) + .values({ key: POOL_KEY, value: String(creditSats), updatedAt: now }) .onConflictDoUpdate({ target: timmyConfig.key, - set: { value: String(next), updatedAt: now }, + set: { + value: sql`(timmy_config.value::int + ${creditSats})::text`, + updatedAt: now, + }, }); - logger.info("generosity pool credited", { paidSats, creditSats, poolBalance: next }); + logger.info("generosity pool credited", { paidSats, creditSats }); } /** - * Record that Timmy absorbed `absorbSats` on behalf of `pubkey`. - * Atomically: deducts from pool, increments identity's daily absorption, writes audit row. + * Record the audit log entry and update daily absorption counters AFTER work completes. + * + * The pool was already debited (by decide() for free jobs, by reservePartialGrant() + * for partial jobs). This call only writes the audit row and updates daily absorption. + * + * `actualAbsorbed` = actual cost in sats (may be less than reservedAbsorbed due to + * actual token usage being lower than estimate). + * `reservedAbsorbed` = amount debited from pool at reservation time. + * Any over-reservation (reservedAbsorbed - actualAbsorbed) is released back to pool. */ async recordGrant( pubkey: string, requestHash: string, - absorbSats: number, + actualAbsorbed: number, + reservedAbsorbed: number, ): Promise { - if (absorbSats <= 0) return; + const toRecord = Math.max(0, actualAbsorbed); + const overReserved = Math.max(0, reservedAbsorbed - toRecord); + if (toRecord <= 0 && overReserved <= 0) return; const now = new Date(); - const DAY_MS = 24 * 60 * 60 * 1000; - - // All three mutations happen atomically inside a single transaction with row-locking: - // 1. Lock + read pool row (FOR UPDATE), compute actual deductible amount - // 2. Pool deduction capped to available balance - // 3. Daily absorption increment via SQL CASE (reset on new day), capped to actualAbsorbed - // 4. Audit log insert with accurate absorbed amount - // Using FOR UPDATE ensures concurrent grants cannot over-debit the pool. - let actualAbsorbed = 0; - let actualNewPoolBalance = 0; await db.transaction(async (tx) => { - // Ensure the pool row exists first (idempotent seed) - await tx - .insert(timmyConfig) - .values({ key: POOL_KEY, value: String(POOL_INITIAL_SATS) }) - .onConflictDoNothing(); - - // Lock the pool row for this transaction; read current balance. - const locked = await tx.execute( - sql`SELECT value::int AS balance FROM timmy_config WHERE key = ${POOL_KEY} FOR UPDATE`, - ); - const poolBalance = (locked.rows[0] as { balance: number } | undefined)?.balance ?? 0; - - // Cap actual absorption to what the pool can cover. - actualAbsorbed = Math.min(absorbSats, poolBalance); - if (actualAbsorbed <= 0) { - // Pool is empty; nothing to absorb — roll back silently. - return; + // Return over-reservation to pool (estimated > actual cost) + if (overReserved > 0) { + await tx + .update(timmyConfig) + .set({ + value: sql`(value::int + ${overReserved})::text`, + updatedAt: now, + }) + .where(eq(timmyConfig.key, POOL_KEY)); } - actualNewPoolBalance = poolBalance - actualAbsorbed; - await tx - .update(timmyConfig) - .set({ value: String(actualNewPoolBalance), updatedAt: now }) - .where(eq(timmyConfig.key, POOL_KEY)); + if (toRecord > 0) { + // Update identity daily absorption (CASE handles new-day reset atomically) + await tx + .update(nostrIdentities) + .set({ + satsAbsorbedToday: sql` + CASE + WHEN EXTRACT(EPOCH FROM (${now}::timestamptz - absorbed_reset_at)) * 1000 >= ${DAY_MS} + THEN ${toRecord} + ELSE sats_absorbed_today + ${toRecord} + END + `, + absorbedResetAt: sql` + CASE + WHEN EXTRACT(EPOCH FROM (${now}::timestamptz - absorbed_reset_at)) * 1000 >= ${DAY_MS} + THEN ${now}::timestamptz + ELSE absorbed_reset_at + END + `, + updatedAt: now, + }) + .where(eq(nostrIdentities.pubkey, pubkey)); - // Atomically increment daily absorption by the actual absorbed amount. - // If absorbed_reset_at is older than 24 h, reset the counter (new day). - await tx - .update(nostrIdentities) - .set({ - satsAbsorbedToday: sql` - CASE - WHEN EXTRACT(EPOCH FROM (${now}::timestamptz - absorbed_reset_at)) * 1000 >= ${DAY_MS} - THEN ${actualAbsorbed} - ELSE sats_absorbed_today + ${actualAbsorbed} - END - `, - absorbedResetAt: sql` - CASE - WHEN EXTRACT(EPOCH FROM (${now}::timestamptz - absorbed_reset_at)) * 1000 >= ${DAY_MS} - THEN ${now}::timestamptz - ELSE absorbed_reset_at - END - `, - updatedAt: now, - }) - .where(eq(nostrIdentities.pubkey, pubkey)); + const poolRows = await tx.select().from(timmyConfig).where(eq(timmyConfig.key, POOL_KEY)).limit(1); + const poolBalanceAfter = poolRows[0] ? parseInt(poolRows[0].value, 10) : 0; - await tx.insert(freeTierGrants).values({ - id: randomUUID(), - pubkey, - requestHash, - satsAbsorbed: actualAbsorbed, - poolBalanceAfter: actualNewPoolBalance, - }); + await tx.insert(freeTierGrants).values({ + id: randomUUID(), + pubkey, + requestHash, + satsAbsorbed: toRecord, + poolBalanceAfter, + }); + } }); - if (actualAbsorbed > 0) { - logger.info("free-tier grant recorded", { - pubkey: pubkey.slice(0, 8), - requestedSats: absorbSats, - actualAbsorbed, - newPoolBalance: actualNewPoolBalance, - }); - } else { - logger.warn("free-tier grant skipped: pool empty at grant time", { - pubkey: pubkey.slice(0, 8), - requestedSats: absorbSats, - }); - } + logger.info("free-tier grant recorded", { + pubkey: pubkey.slice(0, 8), + actualAbsorbed: toRecord, + reservedAbsorbed, + overReserved, + }); } /** diff --git a/artifacts/api-server/src/routes/jobs.ts b/artifacts/api-server/src/routes/jobs.ts index c20ee41..25e65df 100644 --- a/artifacts/api-server/src/routes/jobs.ts +++ b/artifacts/api-server/src/routes/jobs.ts @@ -234,17 +234,17 @@ async function runWorkInBackground( void freeTierService.credit(workAmountSats); } - // Record free-tier grant now that work is confirmed complete. - // For fully-free jobs: cap at actual cost (actualAmountSats was 0 for isFree, use actualCostUsd→sats). - // For partial jobs: partialAbsorbSats is the estimated absorbed portion (cap enforced in recordGrant). - // Both are deferred until post-execution so the audit log reflects real cost, not estimates. + // Record free-tier grant audit log now that work is confirmed complete. + // Pool was already debited atomically at decide() time. + // actualAbsorbed = actual cost in sats (capped at reserved amount). + // Over-reservation (estimated > actual) is returned to pool inside recordGrant(). if (partialAbsorbSats > 0 && nostrPubkey) { const lockedBtcPrice = btcPriceUsd ?? 100_000; - const absorbCap = isFree + const actualAbsorbed = isFree ? pricingService.calculateActualChargeSats(actualCostUsd, lockedBtcPrice) - : partialAbsorbSats; + : partialAbsorbSats; // partial: user paid the delta, Timmy covered the rest const reqHash = createHash("sha256").update(request).digest("hex"); - await freeTierService.recordGrant(nostrPubkey, reqHash, absorbCap); + await freeTierService.recordGrant(nostrPubkey, reqHash, actualAbsorbed, partialAbsorbSats); } // Trust scoring — fire and forget @@ -261,6 +261,11 @@ async function runWorkInBackground( .where(eq(jobs.id, jobId)); eventBus.publish({ type: "job:failed", jobId, reason: message }); + // Return any pool reservation if work failed + if (partialAbsorbSats > 0 && nostrPubkey) { + void freeTierService.releaseReservation(partialAbsorbSats, `work failed: ${message}`); + } + // Trust scoring — penalise on work failure const pubkeyForTrust = nostrPubkey ?? (await getJobById(jobId))?.nostrPubkey ?? null; if (pubkeyForTrust) { @@ -339,6 +344,24 @@ async function advanceJob(job: Job): Promise { // isFree is true ONLY for fully-free jobs (user paid nothing, workAmountSats=0). // Partial jobs (freeTier=true but workAmountSats>0) must use the paid accounting path. const isFreeExecution = (job.workAmountSats ?? 0) === 0; + + // For partial free-tier jobs: atomically reserve the pool subsidy NOW (at payment + // confirmation time), so the pool is only debited once the user has actually paid. + // decide() was advisory for partial jobs — no pool debit at decision time. + // This prevents pool drain from users who abandon the payment flow. + let partialGrantReserved = 0; + if (!isFreeExecution && job.freeTier && job.nostrPubkey && (job.absorbedSats ?? 0) > 0) { + partialGrantReserved = await freeTierService.reservePartialGrant( + job.absorbedSats ?? 0, + job.nostrPubkey, + ); + logger.info("partial grant reserved at payment", { + jobId: job.id, + requested: job.absorbedSats, + reserved: partialGrantReserved, + }); + } + setImmediate(() => { void runWorkInBackground( job.id, @@ -347,9 +370,9 @@ async function advanceJob(job: Job): Promise { job.btcPriceUsd, isFreeExecution, job.nostrPubkey ?? null, - // For partial jobs: pass absorbedSats so grant is recorded post-payment. - // For fully-free jobs: grant was already recorded at eval time, pass 0. - (!isFreeExecution && job.freeTier) ? (job.absorbedSats ?? 0) : 0, + // For fully-free jobs: pool was debited at decide() time, pass estimated sats. + // For partial jobs: pool was debited by reservePartialGrant(), pass actual reserved. + isFreeExecution ? (job.absorbedSats ?? 0) : partialGrantReserved, ); }); diff --git a/artifacts/api-server/src/routes/sessions.ts b/artifacts/api-server/src/routes/sessions.ts index b191179..ddbc57e 100644 --- a/artifacts/api-server/src/routes/sessions.ts +++ b/artifacts/api-server/src/routes/sessions.ts @@ -366,17 +366,52 @@ router.post("/sessions/:id/request", async (req: Request, res: Response) => { const fullDebitSats = usdToSats(chargeUsd, btcPriceUsd); // ── Reconcile free-tier decision against actual cost ────────────────────── + // decide() atomically debited pool for serve="free"; was advisory for serve="partial". // Cap absorbedSats at the actual cost so we never over-absorb from the pool. let debitedSats = fullDebitSats; let freeTierServed = false; let absorbedSats = 0; + let reservedAbsorbed = 0; // amount pool was debited before work ran - if (finalState === "complete" && ftDecision && ftDecision.serve !== "gate") { - absorbedSats = Math.min(ftDecision.absorbSats, fullDebitSats); - debitedSats = Math.max(0, fullDebitSats - absorbedSats); - freeTierServed = true; - const reqHash = createHash("sha256").update(requestText).digest("hex"); - await freeTierService.recordGrant(session.nostrPubkey!, reqHash, absorbedSats); + if (ftDecision && ftDecision.serve !== "gate") { + if (finalState === "complete") { + if (ftDecision.serve === "free") { + // Pool was debited at decide() time. Actual cost may be less than estimated. + reservedAbsorbed = ftDecision.absorbSats; + absorbedSats = Math.min(reservedAbsorbed, fullDebitSats); + debitedSats = Math.max(0, fullDebitSats - absorbedSats); + freeTierServed = true; + } else { + // Partial: decide() was advisory (no pool debit). Atomically debit NOW, after + // work completed, using actual cost capped by the advisory absorbSats limit. + const wantedAbsorb = Math.min(ftDecision.absorbSats, fullDebitSats); + const actualReserved = await freeTierService.reservePartialGrant( + wantedAbsorb, + session.nostrPubkey!, + ); + reservedAbsorbed = actualReserved; + absorbedSats = actualReserved; + debitedSats = Math.max(0, fullDebitSats - absorbedSats); + freeTierServed = absorbedSats > 0; + } + + if (freeTierServed) { + const reqHash = createHash("sha256").update(requestText).digest("hex"); + await freeTierService.recordGrant( + session.nostrPubkey!, + reqHash, + absorbedSats, + reservedAbsorbed, + ); + } + } else if (ftDecision.serve === "free") { + // Work failed or was rejected — release the pool reservation from decide() + void freeTierService.releaseReservation( + ftDecision.absorbSats, + `session work ${finalState}`, + ); + } + // Partial + failed: no pool debit was made (advisory only), nothing to release. } // Credit pool from paid portion (even if partial free tier)