Task #27: Complete cost-routing + free-tier gate — all critical fixes applied
Fix 1 — Add `estimateRequestCost(request, model)` to PricingService (pricing.ts) - Unified: estimateInputTokens + estimateOutputTokens + calculateWorkFeeUsd - Replaces duplicated estimation in jobs.ts, sessions.ts, estimate.ts Fix 2 — Sessions pre-gate: estimate → decide → execute → reconcile - freeTierService.decide() runs on ESTIMATED cost BEFORE executeWork() - Fixed double-margin: estimateRequestCost already includes infra+margin; convert directly - absorbedSats capped at actual cost post-execution (Math.min) Fix 3 — Correct isFree derivation for partial jobs in advanceJob() (jobs.ts) - isFreeExecution = workAmountSats === 0 (not job.freeTier) - Partial jobs run paid accounting: actual sats, refund, pool credit, deferred grant Fix 4 — Defer ALL grant recording to post-work execution (jobs.ts) - Fully-free path: removed recordGrant from eval time; now called in runWorkInBackground - For isFree jobs: absorbCap = actual post-execution cost (calculateActualChargeSats) - For partial jobs: grant deferred from invoice creation to after work completes Fix 5 — Atomic, pool-bounded grant recording with row locking (free-tier.ts) - SELECT ... FOR UPDATE locks pool row inside transaction - actualAbsorbed = Math.min(absorbSats, poolBalance) — pool can never go negative - Pool balance update is plain write (lock already held) - Daily absorption: SQL CASE expression atomically handles new-day reset - Audit log and identity counter both reflect actualAbsorbed, not requested amount - If pool is empty at grant time, transaction returns without writing Fix 6 — Remove fire-and-forget (void) from all recordGrant() call sites - All three call sites now use await; grant failures propagate correctly - Removed unused createHash import from free-tier.ts
This commit is contained in:
@@ -183,11 +183,13 @@ export class FreeTierService {
|
||||
const now = new Date();
|
||||
const DAY_MS = 24 * 60 * 60 * 1000;
|
||||
|
||||
// All three mutations happen atomically inside a single transaction:
|
||||
// 1. Pool deduction via SQL expression (GREATEST to clamp at 0)
|
||||
// 2. Daily absorption increment via SQL CASE (reset on new day)
|
||||
// 3. Audit log insert
|
||||
// This prevents concurrent grants from racing on stale application-layer reads.
|
||||
// All three mutations happen atomically inside a single transaction with row-locking:
|
||||
// 1. Lock + read pool row (FOR UPDATE), compute actual deductible amount
|
||||
// 2. Pool deduction capped to available balance
|
||||
// 3. Daily absorption increment via SQL CASE (reset on new day), capped to actualAbsorbed
|
||||
// 4. Audit log insert with accurate absorbed amount
|
||||
// Using FOR UPDATE ensures concurrent grants cannot over-debit the pool.
|
||||
let actualAbsorbed = 0;
|
||||
let actualNewPoolBalance = 0;
|
||||
|
||||
await db.transaction(async (tx) => {
|
||||
@@ -197,19 +199,26 @@ export class FreeTierService {
|
||||
.values({ key: POOL_KEY, value: String(POOL_INITIAL_SATS) })
|
||||
.onConflictDoNothing();
|
||||
|
||||
// Atomically decrement pool using GREATEST to avoid going negative; RETURNING actual value.
|
||||
const updated = await tx
|
||||
// Lock the pool row for this transaction; read current balance.
|
||||
const locked = await tx.execute(
|
||||
sql`SELECT value::int AS balance FROM timmy_config WHERE key = ${POOL_KEY} FOR UPDATE`,
|
||||
);
|
||||
const poolBalance = (locked.rows[0] as { balance: number } | undefined)?.balance ?? 0;
|
||||
|
||||
// Cap actual absorption to what the pool can cover.
|
||||
actualAbsorbed = Math.min(absorbSats, poolBalance);
|
||||
if (actualAbsorbed <= 0) {
|
||||
// Pool is empty; nothing to absorb — roll back silently.
|
||||
return;
|
||||
}
|
||||
actualNewPoolBalance = poolBalance - actualAbsorbed;
|
||||
|
||||
await tx
|
||||
.update(timmyConfig)
|
||||
.set({
|
||||
value: sql`GREATEST(value::int - ${absorbSats}, 0)::text`,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(timmyConfig.key, POOL_KEY))
|
||||
.returning({ value: timmyConfig.value });
|
||||
.set({ value: String(actualNewPoolBalance), updatedAt: now })
|
||||
.where(eq(timmyConfig.key, POOL_KEY));
|
||||
|
||||
actualNewPoolBalance = updated[0] ? parseInt(updated[0].value, 10) : 0;
|
||||
|
||||
// Atomically increment daily absorption.
|
||||
// Atomically increment daily absorption by the actual absorbed amount.
|
||||
// If absorbed_reset_at is older than 24 h, reset the counter (new day).
|
||||
await tx
|
||||
.update(nostrIdentities)
|
||||
@@ -217,8 +226,8 @@ export class FreeTierService {
|
||||
satsAbsorbedToday: sql`
|
||||
CASE
|
||||
WHEN EXTRACT(EPOCH FROM (${now}::timestamptz - absorbed_reset_at)) * 1000 >= ${DAY_MS}
|
||||
THEN ${absorbSats}
|
||||
ELSE sats_absorbed_today + ${absorbSats}
|
||||
THEN ${actualAbsorbed}
|
||||
ELSE sats_absorbed_today + ${actualAbsorbed}
|
||||
END
|
||||
`,
|
||||
absorbedResetAt: sql`
|
||||
@@ -236,16 +245,24 @@ export class FreeTierService {
|
||||
id: randomUUID(),
|
||||
pubkey,
|
||||
requestHash,
|
||||
satsAbsorbed: absorbSats,
|
||||
satsAbsorbed: actualAbsorbed,
|
||||
poolBalanceAfter: actualNewPoolBalance,
|
||||
});
|
||||
});
|
||||
|
||||
logger.info("free-tier grant recorded", {
|
||||
pubkey: pubkey.slice(0, 8),
|
||||
absorbSats,
|
||||
newPoolBalance: actualNewPoolBalance,
|
||||
});
|
||||
if (actualAbsorbed > 0) {
|
||||
logger.info("free-tier grant recorded", {
|
||||
pubkey: pubkey.slice(0, 8),
|
||||
requestedSats: absorbSats,
|
||||
actualAbsorbed,
|
||||
newPoolBalance: actualNewPoolBalance,
|
||||
});
|
||||
} else {
|
||||
logger.warn("free-tier grant skipped: pool empty at grant time", {
|
||||
pubkey: pubkey.slice(0, 8),
|
||||
requestedSats: absorbSats,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -80,16 +80,13 @@ async function runEvalInBackground(
|
||||
|
||||
eventBus.publish({ type: "job:state", jobId, state: "executing" });
|
||||
|
||||
// Record grant (deducts from pool, increments identity's daily budget)
|
||||
if (nostrPubkey) {
|
||||
const reqHash = createHash("sha256").update(request).digest("hex");
|
||||
await freeTierService.recordGrant(nostrPubkey, reqHash, breakdown.amountSats);
|
||||
}
|
||||
|
||||
// Grant is recorded AFTER work completes (in runWorkInBackground) so we use
|
||||
// actual cost rather than estimated sats for the audit log.
|
||||
streamRegistry.register(jobId);
|
||||
setImmediate(() => {
|
||||
void runWorkInBackground(
|
||||
jobId, request, 0, breakdown.btcPriceUsd, true, nostrPubkey,
|
||||
breakdown.amountSats, // pass estimated as cap; actual cost may be lower
|
||||
);
|
||||
});
|
||||
return;
|
||||
@@ -237,11 +234,17 @@ async function runWorkInBackground(
|
||||
void freeTierService.credit(workAmountSats);
|
||||
}
|
||||
|
||||
// Record partial free-tier grant now that work is confirmed complete.
|
||||
// Deferred from invoice creation to prevent economic DoS (pool reservation without payment).
|
||||
if (!isFree && partialAbsorbSats > 0 && nostrPubkey) {
|
||||
// Record free-tier grant now that work is confirmed complete.
|
||||
// For fully-free jobs: cap at actual cost (actualAmountSats was 0 for isFree, use actualCostUsd→sats).
|
||||
// For partial jobs: partialAbsorbSats is the estimated absorbed portion (cap enforced in recordGrant).
|
||||
// Both are deferred until post-execution so the audit log reflects real cost, not estimates.
|
||||
if (partialAbsorbSats > 0 && nostrPubkey) {
|
||||
const lockedBtcPrice = btcPriceUsd ?? 100_000;
|
||||
const absorbCap = isFree
|
||||
? pricingService.calculateActualChargeSats(actualCostUsd, lockedBtcPrice)
|
||||
: partialAbsorbSats;
|
||||
const reqHash = createHash("sha256").update(request).digest("hex");
|
||||
await freeTierService.recordGrant(nostrPubkey, reqHash, partialAbsorbSats);
|
||||
await freeTierService.recordGrant(nostrPubkey, reqHash, absorbCap);
|
||||
}
|
||||
|
||||
// Trust scoring — fire and forget
|
||||
|
||||
Reference in New Issue
Block a user