diff --git a/artifacts/api-server/src/index.ts b/artifacts/api-server/src/index.ts index cfbb570..96dfc67 100644 --- a/artifacts/api-server/src/index.ts +++ b/artifacts/api-server/src/index.ts @@ -5,6 +5,7 @@ import { rootLogger } from "./lib/logger.js"; import { timmyIdentityService } from "./lib/timmy-identity.js"; import { startEngagementEngine } from "./lib/engagement.js"; import { relayAccountService } from "./lib/relay-accounts.js"; +import { moderationService } from "./lib/moderation.js"; const rawPort = process.env["PORT"]; @@ -32,6 +33,16 @@ server.listen(port, () => { } startEngagementEngine(); + // ── Moderation poll loop ───────────────────────────────────────────────── + // Processes up to 10 pending relay events every 30 seconds via Claude haiku. + const MODERATION_POLL_MS = parseInt(process.env["MODERATION_POLL_MS"] ?? "", 10) || 30_000; + setInterval(() => { + moderationService.processPending(10).catch((err) => + rootLogger.error("moderation poll error", { err }), + ); + }, MODERATION_POLL_MS); + rootLogger.info("moderation poll loop started", { intervalMs: MODERATION_POLL_MS }); + // Seed Timmy's own pubkey with elite identity + relay write access. // Resolves pubkey from TIMMY_NOSTR_PUBKEY env var if set (explicit override), // otherwise falls back to the hex pubkey derived from TIMMY_NOSTR_NSEC. diff --git a/artifacts/api-server/src/lib/moderation.ts b/artifacts/api-server/src/lib/moderation.ts new file mode 100644 index 0000000..5dca1a3 --- /dev/null +++ b/artifacts/api-server/src/lib/moderation.ts @@ -0,0 +1,268 @@ +/** + * moderation.ts — Event moderation queue + Timmy AI review + * + * Every Nostr event from a non-elite whitelisted account is held in + * relay_event_queue with status "pending". Timmy (Claude haiku) reviews + * pending events in a background poll loop and either auto_approves them + * (injecting into strfry) or flags them for admin review. + * + * Elite accounts bypass this queue — their events are injected directly + * from the relay policy handler. + */ + +import { db, relayEventQueue, type QueueReviewer } from "@workspace/db"; +import { eq, and } from "drizzle-orm"; +import { makeLogger } from "./logger.js"; +import { injectEvent } from "./strfry.js"; + +const logger = makeLogger("moderation"); + +// ── Stub mode (mirrors agent.ts) ───────────────────────────────────────────── + +const STUB_MODE = + !process.env["AI_INTEGRATIONS_ANTHROPIC_API_KEY"] || + !process.env["AI_INTEGRATIONS_ANTHROPIC_BASE_URL"]; + +if (STUB_MODE) { + logger.warn("no Anthropic key — moderation running in STUB mode (auto-approve all)"); +} + +// ── Anthropic lazy client (reuse from agent.ts pattern) ────────────────────── + +interface AnthropicLike { + messages: { + create(params: Record): Promise<{ + content: Array<{ type: string; text?: string }>; + usage: { input_tokens: number; output_tokens: number }; + }>; + }; +} + +let _anthropic: AnthropicLike | null = null; + +async function getClient(): Promise { + if (_anthropic) return _anthropic; + // @ts-expect-error -- integrations-anthropic-ai exports src directly + const mod = (await import("@workspace/integrations-anthropic-ai")) as { anthropic: AnthropicLike }; + _anthropic = mod.anthropic; + return _anthropic; +} + +// ── Moderation prompt ───────────────────────────────────────────────────────── + +const MODERATION_SYSTEM = `You are moderating events on a sovereign Nostr relay. Your job is to approve benign content and flag anything harmful. + +APPROVE if the event is: a standard text note, profile update, reaction, encrypted DM, relay list, metadata update, or other typical Nostr activity. +FLAG if the event is: spam, harassment, illegal content, NSFW without appropriate warnings, coordinated abuse, or clearly malicious. + +Respond ONLY with valid JSON: {"decision": "approve", "reason": "..."} or {"decision": "flag", "reason": "..."}`; + +type ModerationDecision = "approve" | "flag"; + +interface ModerationResult { + decision: ModerationDecision; + reason: string; +} + +async function callClaude(kind: number, content: string): Promise { + if (STUB_MODE) { + return { decision: "approve", reason: "Stub: auto-approved (no Anthropic key)" }; + } + + const client = await getClient(); + const message = await client.messages.create({ + model: process.env["MODERATION_MODEL"] ?? "claude-haiku-4-5", + max_tokens: 256, + system: MODERATION_SYSTEM, + messages: [ + { + role: "user", + content: `Nostr event kind ${kind}. Content: ${content.slice(0, 2000)}`, + }, + ], + }); + + const block = message.content[0]; + if (!block || block.type !== "text") { + return { decision: "flag", reason: "AI returned unexpected response" }; + } + + try { + const raw = block.text!.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim(); + const parsed = JSON.parse(raw) as { decision: string; reason?: string }; + const decision = parsed.decision === "approve" ? "approve" : "flag"; + return { decision, reason: parsed.reason ?? "" }; + } catch { + logger.warn("moderation: failed to parse Claude response", { + text: block.text!.slice(0, 100), + }); + return { decision: "flag", reason: "Failed to parse AI response" }; + } +} + +// ── ModerationService ───────────────────────────────────────────────────────── + +export class ModerationService { + /** + * Insert an event into the moderation queue with "pending" status. + * Idempotent: if the event_id already exists, the insert is silently skipped. + */ + async enqueue(event: { + id: string; + pubkey: string; + kind: number; + rawJson: string; + }): Promise { + await db + .insert(relayEventQueue) + .values({ + eventId: event.id, + pubkey: event.pubkey, + kind: event.kind, + rawEvent: event.rawJson, + status: "pending", + }) + .onConflictDoNothing(); + + logger.info("moderation: event enqueued", { + eventId: event.id.slice(0, 8), + pubkey: event.pubkey.slice(0, 8), + kind: event.kind, + }); + } + + /** + * Review a single pending event with Claude. + * Returns "approve" (event is injected into strfry + status → auto_approved) + * or "flag" (status stays pending — admin must decide). + */ + async autoReview(eventId: string): Promise { + const rows = await db + .select() + .from(relayEventQueue) + .where( + and( + eq(relayEventQueue.eventId, eventId), + eq(relayEventQueue.status, "pending"), + ), + ) + .limit(1); + + const row = rows[0]; + if (!row) { + logger.warn("moderation: autoReview called on non-pending event", { eventId }); + return "flag"; + } + + let content = ""; + try { + const parsed = JSON.parse(row.rawEvent) as { content?: string }; + content = parsed.content ?? ""; + } catch { + content = ""; + } + + let result: ModerationResult; + try { + result = await callClaude(row.kind, content); + } catch (err) { + logger.error("moderation: Claude call failed — flagging for admin review", { + eventId: eventId.slice(0, 8), + err, + }); + result = { decision: "flag", reason: "AI review failed — admin review required" }; + } + + if (result.decision === "approve") { + await this.decide(eventId, "auto_approved", result.reason, "timmy_ai"); + } else { + // Update reason but leave status as "pending" for admin + await db + .update(relayEventQueue) + .set({ reviewReason: result.reason, reviewedBy: "timmy_ai" }) + .where(eq(relayEventQueue.eventId, eventId)); + + logger.info("moderation: event flagged for admin review", { + eventId: eventId.slice(0, 8), + reason: result.reason, + }); + } + + return result.decision; + } + + /** + * Apply a moderation decision (approve/auto_approved/rejected). + * On approval: inject the event into strfry. + */ + async decide( + eventId: string, + status: "approved" | "rejected" | "auto_approved", + reason: string, + reviewedBy: QueueReviewer, + ): Promise { + await db + .update(relayEventQueue) + .set({ + status, + reviewedBy, + reviewReason: reason, + decidedAt: new Date(), + }) + .where(eq(relayEventQueue.eventId, eventId)); + + logger.info("moderation: decision recorded", { + eventId: eventId.slice(0, 8), + status, + reviewedBy, + }); + + if (status === "approved" || status === "auto_approved") { + const rows = await db + .select({ rawEvent: relayEventQueue.rawEvent }) + .from(relayEventQueue) + .where(eq(relayEventQueue.eventId, eventId)) + .limit(1); + + const rawEvent = rows[0]?.rawEvent; + if (rawEvent) { + const result = await injectEvent(rawEvent); + if (!result.ok) { + logger.error("moderation: strfry inject failed after approval", { + eventId: eventId.slice(0, 8), + error: result.error, + }); + } + } + } + } + + /** + * Background poll: auto-review up to `limit` pending events. + * Called every 30 seconds from the startup poll loop. + */ + async processPending(limit = 10): Promise { + const rows = await db + .select({ eventId: relayEventQueue.eventId }) + .from(relayEventQueue) + .where(eq(relayEventQueue.status, "pending")) + .limit(limit); + + if (rows.length === 0) return; + + logger.info("moderation: processing pending events", { count: rows.length }); + + for (const { eventId } of rows) { + try { + await this.autoReview(eventId); + } catch (err) { + logger.error("moderation: poll failed for event", { + eventId: eventId.slice(0, 8), + err, + }); + } + } + } +} + +export const moderationService = new ModerationService(); diff --git a/artifacts/api-server/src/lib/strfry.ts b/artifacts/api-server/src/lib/strfry.ts new file mode 100644 index 0000000..76b0426 --- /dev/null +++ b/artifacts/api-server/src/lib/strfry.ts @@ -0,0 +1,75 @@ +/** + * strfry.ts — strfry relay HTTP client + * + * Provides `injectEvent(rawEventJson)` which POSTs a raw NIP-01 event to + * strfry's HTTP import endpoint, making it visible to relay subscribers. + * + * Used by ModerationService.decide() (approved events) and the relay policy + * handler (elite events that bypass the queue). + * + * STRFRY_URL env var: base URL of the strfry relay HTTP API. + * Defaults to "http://strfry:7777" (Docker internal network). + * In Replit dev the relay is not running — errors are logged and swallowed. + */ + +import { makeLogger } from "./logger.js"; + +const logger = makeLogger("strfry"); + +const STRFRY_URL = (process.env["STRFRY_URL"] ?? "http://strfry:7777").replace(/\/$/, ""); +const INJECT_TIMEOUT_MS = 5000; + +export interface InjectResult { + ok: boolean; + error?: string; +} + +/** + * Inject a raw NIP-01 event JSON string into strfry via the HTTP import API. + * strfry's POST /import accepts newline-delimited JSON events. + * + * Returns { ok: true } on success. + * Returns { ok: false, error } on failure (does NOT throw — callers are + * responsible for deciding whether to retry or surface the failure). + */ +export async function injectEvent(rawEventJson: string): Promise { + const url = `${STRFRY_URL}/import`; + + let response: Response; + try { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), INJECT_TIMEOUT_MS); + + response = await fetch(url, { + method: "POST", + headers: { "Content-Type": "application/x-ndjson" }, + body: rawEventJson + "\n", + signal: controller.signal, + }); + + clearTimeout(timeout); + } catch (err) { + const msg = err instanceof Error ? err.message : String(err); + logger.warn("strfry inject: network error", { url, error: msg }); + return { ok: false, error: msg }; + } + + if (!response.ok) { + const body = await response.text().catch(() => ""); + logger.warn("strfry inject: non-200 response", { + url, + status: response.status, + body: body.slice(0, 200), + }); + return { ok: false, error: `HTTP ${response.status}: ${body.slice(0, 100)}` }; + } + + logger.info("strfry inject: event published", { + eventId: (() => { + try { return (JSON.parse(rawEventJson) as { id?: string }).id?.slice(0, 8) ?? "?"; } + catch { return "?"; } + })(), + }); + + return { ok: true }; +} diff --git a/artifacts/api-server/src/routes/admin-relay-queue.ts b/artifacts/api-server/src/routes/admin-relay-queue.ts new file mode 100644 index 0000000..18e0a66 --- /dev/null +++ b/artifacts/api-server/src/routes/admin-relay-queue.ts @@ -0,0 +1,163 @@ +/** + * admin-relay-queue.ts — Admin endpoints for the event moderation queue. + * + * Protected by ADMIN_SECRET Bearer token (same pattern as admin-relay.ts). + * + * Routes: + * GET /api/admin/relay/queue — list queue (filterable by status) + * POST /api/admin/relay/queue/:eventId/approve — admin approve + * POST /api/admin/relay/queue/:eventId/reject — admin reject + */ + +import { Router, type Request, type Response, type NextFunction } from "express"; +import { db, relayEventQueue, type QueueStatus, QUEUE_STATUSES } from "@workspace/db"; +import { eq } from "drizzle-orm"; +import { makeLogger } from "../lib/logger.js"; +import { moderationService } from "../lib/moderation.js"; + +const logger = makeLogger("admin-relay-queue"); +const router = Router(); + +const ADMIN_SECRET = process.env["ADMIN_SECRET"] ?? ""; +const IS_PROD = process.env["NODE_ENV"] === "production"; + +if (!ADMIN_SECRET && IS_PROD) { + logger.error("ADMIN_SECRET not set in production — admin relay queue routes are unprotected"); +} + +// ── Admin auth middleware ───────────────────────────────────────────────────── + +function requireAdmin(req: Request, res: Response, next: NextFunction): void { + if (ADMIN_SECRET) { + const authHeader = req.headers["authorization"] ?? ""; + const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7).trim() : ""; + if (token !== ADMIN_SECRET) { + res.status(401).json({ error: "Unauthorized" }); + return; + } + } else { + const ip = req.ip ?? ""; + const isLocal = ip === "127.0.0.1" || ip === "::1" || ip === "::ffff:127.0.0.1"; + if (!isLocal) { + res.status(401).json({ error: "Unauthorized" }); + return; + } + } + next(); +} + +// ── GET /admin/relay/queue ──────────────────────────────────────────────────── +// Query param: ?status=pending|approved|rejected|auto_approved +// Default: returns all statuses. + +router.get("/admin/relay/queue", requireAdmin, async (req: Request, res: Response) => { + const statusParam = req.query["status"] as string | undefined; + + if (statusParam && !QUEUE_STATUSES.includes(statusParam as QueueStatus)) { + res.status(400).json({ + error: `Invalid status '${statusParam}'. Must be one of: ${QUEUE_STATUSES.join(", ")}`, + }); + return; + } + + const rows = statusParam + ? await db + .select() + .from(relayEventQueue) + .where(eq(relayEventQueue.status, statusParam as QueueStatus)) + .orderBy(relayEventQueue.createdAt) + : await db + .select() + .from(relayEventQueue) + .orderBy(relayEventQueue.createdAt); + + res.json({ + total: rows.length, + events: rows.map((r) => ({ + eventId: r.eventId, + pubkey: r.pubkey, + kind: r.kind, + status: r.status, + reviewedBy: r.reviewedBy, + reviewReason: r.reviewReason, + createdAt: r.createdAt, + decidedAt: r.decidedAt, + })), + }); +}); + +// ── POST /admin/relay/queue/:eventId/approve ────────────────────────────────── + +router.post( + "/admin/relay/queue/:eventId/approve", + requireAdmin, + async (req: Request, res: Response) => { + const { eventId } = req.params as { eventId: string }; + const body = req.body as { reason?: string }; + const reason = body.reason ?? "admin approval"; + + const rows = await db + .select({ status: relayEventQueue.status }) + .from(relayEventQueue) + .where(eq(relayEventQueue.eventId, eventId)) + .limit(1); + + if (!rows[0]) { + res.status(404).json({ error: "Event not found in queue" }); + return; + } + + if (rows[0].status === "approved" || rows[0].status === "auto_approved") { + res.status(409).json({ error: "Event already approved" }); + return; + } + + await moderationService.decide(eventId, "approved", reason, "admin"); + + logger.info("admin approved queued event", { + eventId: eventId.slice(0, 8), + reason, + }); + + res.json({ ok: true, eventId, status: "approved", reason }); + }, +); + +// ── POST /admin/relay/queue/:eventId/reject ─────────────────────────────────── + +router.post( + "/admin/relay/queue/:eventId/reject", + requireAdmin, + async (req: Request, res: Response) => { + const { eventId } = req.params as { eventId: string }; + const body = req.body as { reason?: string }; + const reason = body.reason ?? "admin rejection"; + + const rows = await db + .select({ status: relayEventQueue.status }) + .from(relayEventQueue) + .where(eq(relayEventQueue.eventId, eventId)) + .limit(1); + + if (!rows[0]) { + res.status(404).json({ error: "Event not found in queue" }); + return; + } + + if (rows[0].status === "rejected") { + res.status(409).json({ error: "Event already rejected" }); + return; + } + + await moderationService.decide(eventId, "rejected", reason, "admin"); + + logger.info("admin rejected queued event", { + eventId: eventId.slice(0, 8), + reason, + }); + + res.json({ ok: true, eventId, status: "rejected", reason }); + }, +); + +export default router; diff --git a/artifacts/api-server/src/routes/index.ts b/artifacts/api-server/src/routes/index.ts index ecc48d6..ed68a82 100644 --- a/artifacts/api-server/src/routes/index.ts +++ b/artifacts/api-server/src/routes/index.ts @@ -14,6 +14,7 @@ import identityRouter from "./identity.js"; import estimateRouter from "./estimate.js"; import relayRouter from "./relay.js"; import adminRelayRouter from "./admin-relay.js"; +import adminRelayQueueRouter from "./admin-relay-queue.js"; const router: IRouter = Router(); @@ -26,6 +27,7 @@ router.use(sessionsRouter); router.use(identityRouter); router.use(relayRouter); router.use(adminRelayRouter); +router.use(adminRelayQueueRouter); router.use(demoRouter); router.use(testkitRouter); router.use(uiRouter); diff --git a/artifacts/api-server/src/routes/relay.ts b/artifacts/api-server/src/routes/relay.ts index 4e53241..22f8fdf 100644 --- a/artifacts/api-server/src/routes/relay.ts +++ b/artifacts/api-server/src/routes/relay.ts @@ -3,28 +3,24 @@ * * POST /api/relay/policy * Internal endpoint called exclusively by the relay-policy sidecar. - * Protected by a shared secret (RELAY_POLICY_SECRET env var) sent as a - * Bearer token in the Authorization header. + * Protected by RELAY_POLICY_SECRET Bearer token. * - * Body: strfry plugin event object - * { - * event: { id, pubkey, kind, created_at, tags, content, sig }, - * receivedAt: number, - * sourceType: "IP4" | "IP6" | ..., - * sourceInfo: string - * } - * - * Response: strfry plugin decision - * { id: string, action: "accept" | "reject" | "shadowReject", msg?: string } + * Policy tiers: + * no relay_account row / "read" / "none" → reject + * "write" + tier != "elite" → enqueue + shadowReject + * "write" + tier == "elite" → inject into strfry + accept * * GET /api/relay/policy - * Health + roundtrip probe. No auth required — returns policy state and runs - * a synthetic pubkey through evaluatePolicy(). + * Health + roundtrip probe (no auth). */ import { Router, type Request, type Response } from "express"; +import { db, nostrIdentities } from "@workspace/db"; +import { eq } from "drizzle-orm"; import { makeLogger } from "../lib/logger.js"; import { relayAccountService } from "../lib/relay-accounts.js"; +import { moderationService } from "../lib/moderation.js"; +import { injectEvent } from "../lib/strfry.js"; const logger = makeLogger("relay-policy"); const router = Router(); @@ -32,18 +28,14 @@ const router = Router(); const RELAY_POLICY_SECRET = process.env["RELAY_POLICY_SECRET"] ?? ""; const IS_PROD = process.env["NODE_ENV"] === "production"; -// Production enforcement: RELAY_POLICY_SECRET must be set in production. if (!RELAY_POLICY_SECRET) { if (IS_PROD) { logger.error( "RELAY_POLICY_SECRET is not set in production — " + - "POST /api/relay/policy is open to any caller. " + - "Set this secret in the API server environment and in the relay-policy sidecar.", + "POST /api/relay/policy is open to any caller.", ); } else { - logger.warn( - "RELAY_POLICY_SECRET not set — /api/relay/policy accepts local-only requests (dev mode)", - ); + logger.warn("RELAY_POLICY_SECRET not set — /api/relay/policy accepts local-only requests (dev mode)"); } } @@ -51,7 +43,7 @@ if (!RELAY_POLICY_SECRET) { type PolicyAction = "accept" | "reject" | "shadowReject"; -interface NostrEvent { +interface NostrEventPayload { id: string; pubkey: string; kind: number; @@ -62,7 +54,7 @@ interface NostrEvent { } interface PolicyRequest { - event: NostrEvent; + event: NostrEventPayload; receivedAt: number; sourceType: string; sourceInfo: string; @@ -84,42 +76,49 @@ function acceptDecision(id: string): PolicyDecision { return { id, action: "accept", msg: "" }; } +function shadowRejectDecision(id: string): PolicyDecision { + return { id, action: "shadowReject", msg: "" }; +} + // ── GET /relay/policy ───────────────────────────────────────────────────────── router.get("/relay/policy", async (_req: Request, res: Response) => { - const probeId = "0000000000000000000000000000000000000000000000000000000000000000"; - const probe = await evaluatePolicy(probeId, "probe-pubkey-not-real", 1); res.json({ ok: true, secretConfigured: !!RELAY_POLICY_SECRET, - decision: probe.action, - msg: probe.msg, + info: "Relay policy active. write+elite → accept; write+non-elite → moderation queue; read/none → reject.", }); }); -// ── POST /relay/policy ──────────────────────────────────────────────────────── +// ── Auth middleware ─────────────────────────────────────────────────────────── -router.post("/relay/policy", async (req: Request, res: Response) => { - // ── Authentication ─────────────────────────────────────────────────────── +function checkRelayAuth(req: Request, res: Response): boolean { if (RELAY_POLICY_SECRET) { const authHeader = req.headers["authorization"] ?? ""; const token = authHeader.startsWith("Bearer ") ? authHeader.slice(7).trim() : ""; if (token !== RELAY_POLICY_SECRET) { res.status(401).json({ error: "Unauthorized" }); - return; + return false; } - } else { - const ip = req.ip ?? ""; - const isLocal = ip === "127.0.0.1" || ip === "::1" || ip === "::ffff:127.0.0.1"; - if (!isLocal) { - logger.warn("relay/policy: no secret configured, rejecting non-local call", { ip }); - res.status(401).json({ error: "Unauthorized" }); - return; - } - logger.warn("relay/policy: RELAY_POLICY_SECRET not set — accepting local-only call"); + return true; } - // ── Validate body ──────────────────────────────────────────────────────── + const ip = req.ip ?? ""; + const isLocal = ip === "127.0.0.1" || ip === "::1" || ip === "::ffff:127.0.0.1"; + if (!isLocal) { + logger.warn("relay/policy: no secret configured, rejecting non-local call", { ip }); + res.status(401).json({ error: "Unauthorized" }); + return false; + } + logger.warn("relay/policy: RELAY_POLICY_SECRET not set — accepting local-only call"); + return true; +} + +// ── POST /relay/policy ──────────────────────────────────────────────────────── + +router.post("/relay/policy", async (req: Request, res: Response) => { + if (!checkRelayAuth(req, res)) return; + const body = req.body as Partial; const event = body.event; @@ -132,8 +131,7 @@ router.post("/relay/policy", async (req: Request, res: Response) => { const pubkey = typeof event.pubkey === "string" ? event.pubkey : ""; const kind = typeof event.kind === "number" ? event.kind : -1; - // ── Policy decision ────────────────────────────────────────────────────── - const decision = await evaluatePolicy(eventId, pubkey, kind); + const decision = await evaluatePolicy(event, eventId, pubkey, kind); logger.info("relay policy decision", { eventId: eventId.slice(0, 8), @@ -149,21 +147,22 @@ router.post("/relay/policy", async (req: Request, res: Response) => { /** * Core write-policy evaluation. * - * Checks relay_accounts for the event's pubkey: - * "write" access → accept - * "read" / "none" / missing → reject - * - * Future tasks extend this function (moderation queue, shadowReject for spam). + * 1. No pubkey → reject + * 2. Not in relay_accounts with "write" → reject (or "read-only" msg) + * 3. "write" + elite tier → inject into strfry + accept (elite bypass) + * 4. "write" + non-elite → enqueue into moderation + shadowReject */ async function evaluatePolicy( + rawEvent: Partial, eventId: string, pubkey: string, - _kind: number, + kind: number, ): Promise { if (!pubkey) { return rejectDecision(eventId, "missing pubkey"); } + // ── Step 1: Check relay access ───────────────────────────────────────────── let accessLevel: string; try { accessLevel = await relayAccountService.getAccess(pubkey); @@ -172,15 +171,60 @@ async function evaluatePolicy( return rejectDecision(eventId, "policy service error — try again later"); } - if (accessLevel === "write") { - return acceptDecision(eventId); - } - if (accessLevel === "read") { return rejectDecision(eventId, "read-only access — write not permitted"); } - return rejectDecision(eventId, "pubkey not whitelisted for this relay"); + if (accessLevel !== "write") { + return rejectDecision(eventId, "pubkey not whitelisted for this relay"); + } + + // ── Step 2: Check trust tier (elite bypass) ──────────────────────────────── + let isElite = false; + try { + const rows = await db + .select({ tier: nostrIdentities.tier }) + .from(nostrIdentities) + .where(eq(nostrIdentities.pubkey, pubkey)) + .limit(1); + isElite = rows[0]?.tier === "elite"; + } catch (err) { + logger.error("tier lookup failed — treating as non-elite", { err }); + } + + if (isElite) { + // Elite accounts bypass moderation — inject directly into strfry + const rawJson = JSON.stringify(rawEvent); + const injectResult = await injectEvent(rawJson); + if (!injectResult.ok) { + logger.warn("elite event inject failed — shadowReject as fallback", { + eventId: eventId.slice(0, 8), + error: injectResult.error, + }); + return shadowRejectDecision(eventId); + } + return acceptDecision(eventId); + } + + // ── Step 3: Non-elite write — enqueue for moderation ────────────────────── + try { + await moderationService.enqueue({ + id: eventId, + pubkey, + kind, + rawJson: JSON.stringify(rawEvent), + }); + } catch (err) { + logger.error("failed to enqueue event for moderation", { + eventId: eventId.slice(0, 8), + err, + }); + // Fail-closed: reject if we can't queue + return rejectDecision(eventId, "moderation service error — try again later"); + } + + // shadowReject: strfry reports "ok" to the sender but doesn't publish + return shadowRejectDecision(eventId); } export default router; diff --git a/lib/db/src/schema/index.ts b/lib/db/src/schema/index.ts index 82a6452..37d6935 100644 --- a/lib/db/src/schema/index.ts +++ b/lib/db/src/schema/index.ts @@ -11,3 +11,4 @@ export * from "./free-tier-grants"; export * from "./timmy-nostr-events"; export * from "./nostr-trust-vouches"; export * from "./relay-accounts"; +export * from "./relay-event-queue"; diff --git a/lib/db/src/schema/relay-event-queue.ts b/lib/db/src/schema/relay-event-queue.ts new file mode 100644 index 0000000..f601fcc --- /dev/null +++ b/lib/db/src/schema/relay-event-queue.ts @@ -0,0 +1,44 @@ +import { pgTable, text, timestamp, integer } from "drizzle-orm/pg-core"; +import { nostrIdentities } from "./nostr-identities"; + +// ── Status + reviewer types ─────────────────────────────────────────────────── + +export const QUEUE_STATUSES = ["pending", "approved", "rejected", "auto_approved"] as const; +export type QueueStatus = (typeof QUEUE_STATUSES)[number]; + +export const QUEUE_REVIEWERS = ["timmy_ai", "admin"] as const; +export type QueueReviewer = (typeof QUEUE_REVIEWERS)[number]; + +// ── relay_event_queue ───────────────────────────────────────────────────────── +// Holds every event submitted by whitelisted (non-elite) accounts. +// Events wait here as "pending" until Timmy AI or an admin approves/rejects. +// On approval the API server injects the event into strfry via HTTP import. +// Elite accounts bypass this table entirely. + +export const relayEventQueue = pgTable("relay_event_queue", { + eventId: text("event_id").primaryKey(), + + pubkey: text("pubkey") + .notNull() + .references(() => nostrIdentities.pubkey, { onDelete: "cascade" }), + + kind: integer("kind").notNull(), + + // Full raw NIP-01 event JSON, stored as text so it can be forwarded to strfry + rawEvent: text("raw_event").notNull(), + + status: text("status") + .$type() + .notNull() + .default("pending"), + + // "timmy_ai" or "admin" — null until a decision is made + reviewedBy: text("reviewed_by").$type(), + + reviewReason: text("review_reason"), + + createdAt: timestamp("created_at", { withTimezone: true }).defaultNow().notNull(), + decidedAt: timestamp("decided_at", { withTimezone: true }), +}); + +export type RelayEventQueueRow = typeof relayEventQueue.$inferSelect;