Files
timmy-tower/artifacts/api-server/src/lib/strfry.ts
alexpaynex a95fd76ebd task/32: Event moderation queue + Timmy AI review
## What was built
Full moderation pipeline: relay_event_queue table, strfry inject helper,
ModerationService with Claude haiku review, policy tier routing, 30s poll loop,
admin approve/reject/list endpoints.

## DB schema (`lib/db/src/schema/relay-event-queue.ts`)
relay_event_queue: event_id (PK), pubkey (FK → nostr_identities), kind,
raw_event (text JSON), status (pending/approved/rejected/auto_approved),
reviewed_by (timmy_ai/admin/null), review_reason, created_at, decided_at.
Exported from schema/index.ts. Pushed via pnpm run push.

## strfry HTTP client (`artifacts/api-server/src/lib/strfry.ts`)
injectEvent(rawEventJson) — POST {STRFRY_URL}/import (NDJSON).
STRFRY_URL defaults to "http://strfry:7777" (Docker internal network).
5s timeout; graceful failure in dev when strfry not running; never throws.

## ModerationService (`artifacts/api-server/src/lib/moderation.ts`)
- enqueue(event) — insert pending row; idempotent onConflictDoNothing
- autoReview(eventId) — Claude haiku prompt: approve or flag. On flag, marks
  reviewedBy=timmy_ai and leaves pending for admin. On approve, calls decide().
- decide(eventId, status, reason, reviewedBy) — updates DB + calls injectEvent
- processPending(limit=10) — batch poll: auto-review up to limit pending events
- Stub mode: auto-approves all events when Anthropic key absent

## Policy endpoint update (`artifacts/api-server/src/routes/relay.ts`)
Tier routing in evaluatePolicy:
  read/none → reject (unchanged)
  write + elite tier → injectEvent + accept (elite bypass; shadowReject if inject fails)
  write + non-elite → enqueue + shadowReject (held for moderation)
Imports db/nostrIdentities directly for tier check. Both inject and enqueue errors
are fail-closed (reject vs shadowReject respectively).

## Background poll loop (`artifacts/api-server/src/index.ts`)
setInterval every 30s calling moderationService.processPending(10).
Interval configurable via MODERATION_POLL_MS env var.
Errors caught per-event; poll loop never crashes the server.

## Admin queue routes (`artifacts/api-server/src/routes/admin-relay-queue.ts`)
ADMIN_SECRET Bearer auth (same pattern as admin-relay.ts).
GET  /api/admin/relay/queue?status=...        — list all / by status
POST /api/admin/relay/queue/:eventId/approve  — approve + inject into strfry
POST /api/admin/relay/queue/:eventId/reject   — reject (no inject)
409 on duplicate decisions. Registered in routes/index.ts.

## Smoke tests (all pass)
Unknown → reject ✓; elite → shadowReject (strfry unavailable in dev) ✓;
non-elite write → shadowReject + pending in queue ✓; admin approve → approved ✓;
moderation poll loop started ✓; TypeScript 0 errors.
2026-03-19 20:35:39 +00:00

76 lines
2.3 KiB
TypeScript

/**
* strfry.ts — strfry relay HTTP client
*
* Provides `injectEvent(rawEventJson)` which POSTs a raw NIP-01 event to
* strfry's HTTP import endpoint, making it visible to relay subscribers.
*
* Used by ModerationService.decide() (approved events) and the relay policy
* handler (elite events that bypass the queue).
*
* STRFRY_URL env var: base URL of the strfry relay HTTP API.
* Defaults to "http://strfry:7777" (Docker internal network).
* In Replit dev the relay is not running — errors are logged and swallowed.
*/
import { makeLogger } from "./logger.js";
const logger = makeLogger("strfry");
const STRFRY_URL = (process.env["STRFRY_URL"] ?? "http://strfry:7777").replace(/\/$/, "");
const INJECT_TIMEOUT_MS = 5000;
export interface InjectResult {
ok: boolean;
error?: string;
}
/**
* Inject a raw NIP-01 event JSON string into strfry via the HTTP import API.
* strfry's POST /import accepts newline-delimited JSON events.
*
* Returns { ok: true } on success.
* Returns { ok: false, error } on failure (does NOT throw — callers are
* responsible for deciding whether to retry or surface the failure).
*/
export async function injectEvent(rawEventJson: string): Promise<InjectResult> {
const url = `${STRFRY_URL}/import`;
let response: Response;
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), INJECT_TIMEOUT_MS);
response = await fetch(url, {
method: "POST",
headers: { "Content-Type": "application/x-ndjson" },
body: rawEventJson + "\n",
signal: controller.signal,
});
clearTimeout(timeout);
} catch (err) {
const msg = err instanceof Error ? err.message : String(err);
logger.warn("strfry inject: network error", { url, error: msg });
return { ok: false, error: msg };
}
if (!response.ok) {
const body = await response.text().catch(() => "");
logger.warn("strfry inject: non-200 response", {
url,
status: response.status,
body: body.slice(0, 200),
});
return { ok: false, error: `HTTP ${response.status}: ${body.slice(0, 100)}` };
}
logger.info("strfry inject: event published", {
eventId: (() => {
try { return (JSON.parse(rawEventJson) as { id?: string }).id?.slice(0, 8) ?? "?"; }
catch { return "?"; }
})(),
});
return { ok: true };
}