Files
timmy-tower/artifacts/api-server/src/lib/moderation.ts
alexpaynex a95fd76ebd task/32: Event moderation queue + Timmy AI review
## What was built
Full moderation pipeline: relay_event_queue table, strfry inject helper,
ModerationService with Claude haiku review, policy tier routing, 30s poll loop,
admin approve/reject/list endpoints.

## DB schema (`lib/db/src/schema/relay-event-queue.ts`)
relay_event_queue: event_id (PK), pubkey (FK → nostr_identities), kind,
raw_event (text JSON), status (pending/approved/rejected/auto_approved),
reviewed_by (timmy_ai/admin/null), review_reason, created_at, decided_at.
Exported from schema/index.ts. Pushed via pnpm run push.

## strfry HTTP client (`artifacts/api-server/src/lib/strfry.ts`)
injectEvent(rawEventJson) — POST {STRFRY_URL}/import (NDJSON).
STRFRY_URL defaults to "http://strfry:7777" (Docker internal network).
5s timeout; graceful failure in dev when strfry not running; never throws.

## ModerationService (`artifacts/api-server/src/lib/moderation.ts`)
- enqueue(event) — insert pending row; idempotent onConflictDoNothing
- autoReview(eventId) — Claude haiku prompt: approve or flag. On flag, marks
  reviewedBy=timmy_ai and leaves pending for admin. On approve, calls decide().
- decide(eventId, status, reason, reviewedBy) — updates DB + calls injectEvent
- processPending(limit=10) — batch poll: auto-review up to limit pending events
- Stub mode: auto-approves all events when Anthropic key absent

## Policy endpoint update (`artifacts/api-server/src/routes/relay.ts`)
Tier routing in evaluatePolicy:
  read/none → reject (unchanged)
  write + elite tier → injectEvent + accept (elite bypass; shadowReject if inject fails)
  write + non-elite → enqueue + shadowReject (held for moderation)
Imports db/nostrIdentities directly for tier check. Both inject and enqueue errors
are fail-closed (reject vs shadowReject respectively).

## Background poll loop (`artifacts/api-server/src/index.ts`)
setInterval every 30s calling moderationService.processPending(10).
Interval configurable via MODERATION_POLL_MS env var.
Errors caught per-event; poll loop never crashes the server.

## Admin queue routes (`artifacts/api-server/src/routes/admin-relay-queue.ts`)
ADMIN_SECRET Bearer auth (same pattern as admin-relay.ts).
GET  /api/admin/relay/queue?status=...        — list all / by status
POST /api/admin/relay/queue/:eventId/approve  — approve + inject into strfry
POST /api/admin/relay/queue/:eventId/reject   — reject (no inject)
409 on duplicate decisions. Registered in routes/index.ts.

## Smoke tests (all pass)
Unknown → reject ✓; elite → shadowReject (strfry unavailable in dev) ✓;
non-elite write → shadowReject + pending in queue ✓; admin approve → approved ✓;
moderation poll loop started ✓; TypeScript 0 errors.
2026-03-19 20:35:39 +00:00

269 lines
8.4 KiB
TypeScript

/**
* moderation.ts — Event moderation queue + Timmy AI review
*
* Every Nostr event from a non-elite whitelisted account is held in
* relay_event_queue with status "pending". Timmy (Claude haiku) reviews
* pending events in a background poll loop and either auto_approves them
* (injecting into strfry) or flags them for admin review.
*
* Elite accounts bypass this queue — their events are injected directly
* from the relay policy handler.
*/
import { db, relayEventQueue, type QueueReviewer } from "@workspace/db";
import { and, eq, isNull } from "drizzle-orm";
import { makeLogger } from "./logger.js";
import { injectEvent } from "./strfry.js";
const logger = makeLogger("moderation");

// ── Stub mode (mirrors agent.ts) ─────────────────────────────────────────────
// Stub mode is active unless BOTH Anthropic env vars are set to non-empty
// values. It is evaluated once at module load; in stub mode callClaude()
// approves every event without contacting the API.
const STUB_MODE = !(
  process.env["AI_INTEGRATIONS_ANTHROPIC_API_KEY"] &&
  process.env["AI_INTEGRATIONS_ANTHROPIC_BASE_URL"]
);

if (STUB_MODE) {
  logger.warn("no Anthropic key — moderation running in STUB mode (auto-approve all)");
}
// ── Anthropic lazy client (reuse from agent.ts pattern) ──────────────────────
/** One entry of a message's `content` array; `text` is only expected on text blocks. */
interface AnthropicContentBlock {
  type: string;
  text?: string;
}

/** The subset of the message-create response that this module declares. */
interface AnthropicMessage {
  content: Array<AnthropicContentBlock>;
  usage: { input_tokens: number; output_tokens: number };
}

/**
 * Minimal structural view of the Anthropic client: just the
 * `messages.create` call, with loosely typed request parameters.
 */
interface AnthropicLike {
  messages: {
    create(params: Record<string, unknown>): Promise<AnthropicMessage>;
  };
}
/** Cached client instance; stays null until the first getClient() call resolves. */
let _anthropic: AnthropicLike | null = null;

/**
 * Return the shared Anthropic client, dynamically importing the integration
 * package on first use and reusing the cached instance afterwards.
 */
async function getClient(): Promise<AnthropicLike> {
  if (!_anthropic) {
    // @ts-expect-error -- integrations-anthropic-ai exports src directly
    const loaded = (await import("@workspace/integrations-anthropic-ai")) as { anthropic: AnthropicLike };
    _anthropic = loaded.anthropic;
  }
  return _anthropic;
}
// ── Moderation prompt ─────────────────────────────────────────────────────────
/**
 * System prompt for the moderation model: states the approve/flag criteria
 * and requires a JSON-only answer of the form {"decision", "reason"}.
 * Assembled from one entry per line; the joined text is four
 * newline-separated lines.
 */
const MODERATION_SYSTEM = [
  `You are moderating events on a sovereign Nostr relay. Your job is to approve benign content and flag anything harmful.`,
  `APPROVE if the event is: a standard text note, profile update, reaction, encrypted DM, relay list, metadata update, or other typical Nostr activity.`,
  `FLAG if the event is: spam, harassment, illegal content, NSFW without appropriate warnings, coordinated abuse, or clearly malicious.`,
  `Respond ONLY with valid JSON: {"decision": "approve", "reason": "..."} or {"decision": "flag", "reason": "..."}`,
].join("\n");
/** Outcome of one AI review: the verdict plus the model's stated reason. */
interface ModerationResult {
  decision: "approve" | "flag";
  reason: string;
}

/** The two verdicts the AI reviewer can return. */
type ModerationDecision = ModerationResult["decision"];
/**
 * Ask the moderation model to classify a single Nostr event.
 *
 * In stub mode no API call is made and every event is approved. Otherwise
 * the reply is interpreted fail-closed: anything other than a JSON object
 * whose `decision` is exactly "approve" is reported as "flag".
 *
 * NOTE(review): `content` is untrusted relay input interpolated directly into
 * the user message, so an event can attempt to instruct the model to approve
 * it. Consider delimiting/escaping the content or adding a second check.
 *
 * @param kind    Nostr event kind, included in the prompt.
 * @param content Event content; only the first 2000 characters are sent.
 * @returns The decision and the model's reason ("" when the model gave no
 *          string reason).
 * @throws Whatever `getClient()` or `messages.create()` rejects with; reply
 *         parsing problems never throw, they are reported as "flag".
 */
async function callClaude(kind: number, content: string): Promise<ModerationResult> {
  if (STUB_MODE) {
    return { decision: "approve", reason: "Stub: auto-approved (no Anthropic key)" };
  }

  const client = await getClient();
  const message = await client.messages.create({
    model: process.env["MODERATION_MODEL"] ?? "claude-haiku-4-5",
    max_tokens: 256,
    system: MODERATION_SYSTEM,
    messages: [
      {
        role: "user",
        content: `Nostr event kind ${kind}. Content: ${content.slice(0, 2000)}`,
      },
    ],
  });

  // Require an actual string payload up front so neither the parser nor the
  // warning path below ever dereferences a missing `text`.
  const block = message.content[0];
  if (!block || block.type !== "text" || typeof block.text !== "string") {
    return { decision: "flag", reason: "AI returned unexpected response" };
  }
  const text = block.text;

  try {
    // Trim first: the fence-stripping patterns are anchored at the very
    // start/end of the string and would miss a fence behind leading whitespace.
    const raw = text
      .trim()
      .replace(/^```(?:json)?\s*/i, "")
      .replace(/\s*```$/, "")
      .trim();
    const parsed: unknown = JSON.parse(raw);
    if (typeof parsed !== "object" || parsed === null) {
      throw new TypeError("moderation response is not a JSON object");
    }
    const { decision, reason } = parsed as { decision?: unknown; reason?: unknown };
    return {
      // Anything that is not an explicit "approve" is treated as a flag.
      decision: decision === "approve" ? "approve" : "flag",
      // Only pass through string reasons; the value is persisted by callers.
      reason: typeof reason === "string" ? reason : "",
    };
  } catch {
    logger.warn("moderation: failed to parse Claude response", {
      text: text.slice(0, 100),
    });
    return { decision: "flag", reason: "Failed to parse AI response" };
  }
}
// ── ModerationService ─────────────────────────────────────────────────────────
/**
 * Queue operations for event moderation: enqueue held events, run the AI
 * review, record decisions, and drive the background poll.
 *
 * Row lifecycle as implemented here:
 *   - enqueue()    → status "pending", reviewedBy unset
 *   - AI approves  → status "auto_approved", event injected into strfry
 *   - AI flags     → status stays "pending", reviewedBy "timmy_ai"
 *   - decide()     → caller-supplied status/reviewer (e.g. an admin decision)
 */
export class ModerationService {
  /**
   * Insert an event into the moderation queue with "pending" status.
   * Idempotent: if the event_id already exists, the insert is silently skipped
   * (the "event enqueued" log line is still written in that case).
   *
   * @param event Event id, author pubkey, kind, and the raw event JSON to hold.
   */
  async enqueue(event: {
    id: string;
    pubkey: string;
    kind: number;
    rawJson: string;
  }): Promise<void> {
    await db
      .insert(relayEventQueue)
      .values({
        eventId: event.id,
        pubkey: event.pubkey,
        kind: event.kind,
        rawEvent: event.rawJson,
        status: "pending",
      })
      .onConflictDoNothing();
    logger.info("moderation: event enqueued", {
      eventId: event.id.slice(0, 8),
      pubkey: event.pubkey.slice(0, 8),
      kind: event.kind,
    });
  }

  /**
   * Review a single pending event with Claude.
   *
   * Both outcomes are written only if the row is still "pending" at write
   * time, so a decision recorded by someone else while the model call was in
   * flight is never overwritten.
   *
   * @param eventId Id of the queued event to review.
   * @returns "approve" when this call moved the event to "auto_approved"
   *          (and attempted the strfry inject); "flag" when the event was
   *          flagged, the AI call failed, or the event was not / is no
   *          longer pending.
   */
  async autoReview(eventId: string): Promise<ModerationDecision> {
    const rows = await db
      .select()
      .from(relayEventQueue)
      .where(this.stillPending(eventId))
      .limit(1);
    const row = rows[0];
    if (!row) {
      logger.warn("moderation: autoReview called on non-pending event", { eventId });
      return "flag";
    }

    // Only the event's `content` field is shown to the model; unparseable
    // raw JSON is reviewed as empty content.
    let content = "";
    try {
      const parsed = JSON.parse(row.rawEvent) as { content?: string };
      content = parsed.content ?? "";
    } catch {
      content = "";
    }

    let result: ModerationResult;
    try {
      result = await callClaude(row.kind, content);
    } catch (err) {
      logger.error("moderation: Claude call failed — flagging for admin review", {
        eventId: eventId.slice(0, 8),
        err,
      });
      result = { decision: "flag", reason: "AI review failed — admin review required" };
    }

    if (result.decision === "approve") {
      const applied = await this.applyDecision(
        eventId,
        "auto_approved",
        result.reason,
        "timmy_ai",
        true,
      );
      // If the row left "pending" during the model call, nothing was changed
      // or injected, so do not report an approval.
      return applied ? "approve" : "flag";
    }

    // Record the reason and reviewer but leave status as "pending" for admin.
    // Guarded on "pending" so an already-decided row keeps its reviewer/reason.
    await db
      .update(relayEventQueue)
      .set({ reviewReason: result.reason, reviewedBy: "timmy_ai" })
      .where(this.stillPending(eventId));
    logger.info("moderation: event flagged for admin review", {
      eventId: eventId.slice(0, 8),
      reason: result.reason,
    });
    return result.decision;
  }

  /**
   * Apply a moderation decision (approved/auto_approved/rejected).
   * On approval: inject the event into strfry.
   *
   * The update is keyed on the event id only, so it overwrites whatever
   * status the row currently has; callers that must refuse duplicate
   * decisions need to check the current status themselves.
   *
   * NOTE(review): a failed strfry inject is only logged — the row stays
   * approved and nothing in this class retries the inject.
   *
   * @param eventId    Id of the queued event.
   * @param status     New status to record.
   * @param reason     Stored as the review reason.
   * @param reviewedBy Who made the decision.
   */
  async decide(
    eventId: string,
    status: "approved" | "rejected" | "auto_approved",
    reason: string,
    reviewedBy: QueueReviewer,
  ): Promise<void> {
    await this.applyDecision(eventId, status, reason, reviewedBy, false);
  }

  /**
   * Background poll: auto-review up to `limit` pending events.
   * Called every 30 seconds from the startup poll loop.
   *
   * Only rows that no reviewer has touched yet (reviewedBy IS NULL) are
   * picked. Events the AI already flagged remain "pending" for an admin, and
   * without this filter they would be re-sent to the model on every poll and
   * could fill the whole batch, starving unreviewed events.
   *
   * A failure on one event is logged and does not stop the rest of the batch.
   *
   * @param limit Maximum number of events to review in this pass.
   */
  async processPending(limit = 10): Promise<void> {
    const rows = await db
      .select({ eventId: relayEventQueue.eventId })
      .from(relayEventQueue)
      .where(
        and(
          eq(relayEventQueue.status, "pending"),
          isNull(relayEventQueue.reviewedBy),
        ),
      )
      .limit(limit);
    if (rows.length === 0) return;

    logger.info("moderation: processing pending events", { count: rows.length });
    for (const { eventId } of rows) {
      try {
        await this.autoReview(eventId);
      } catch (err) {
        logger.error("moderation: poll failed for event", {
          eventId: eventId.slice(0, 8),
          err,
        });
      }
    }
  }

  /** Build the filter matching `eventId` only while its status is "pending". */
  private stillPending(eventId: string) {
    return and(
      eq(relayEventQueue.eventId, eventId),
      eq(relayEventQueue.status, "pending"),
    );
  }

  /**
   * Write a decision and, for approvals, inject the stored event into strfry.
   *
   * The UPDATE returns the stored raw event, so "did a row match" and "what
   * to inject" come from the same statement.
   *
   * @param onlyIfPending When true the update matches only rows still in
   *                      "pending"; when false it matches on event id alone.
   * @returns true when a row was updated, false when nothing matched (in
   *          which case nothing is injected).
   */
  private async applyDecision(
    eventId: string,
    status: "approved" | "rejected" | "auto_approved",
    reason: string,
    reviewedBy: QueueReviewer,
    onlyIfPending: boolean,
  ): Promise<boolean> {
    const updated = await db
      .update(relayEventQueue)
      .set({
        status,
        reviewedBy,
        reviewReason: reason,
        decidedAt: new Date(),
      })
      .where(
        onlyIfPending
          ? this.stillPending(eventId)
          : eq(relayEventQueue.eventId, eventId),
      )
      .returning({ rawEvent: relayEventQueue.rawEvent });

    const row = updated[0];
    if (!row) {
      logger.warn("moderation: decision not applied — no matching row", {
        eventId: eventId.slice(0, 8),
        status,
        reviewedBy,
        onlyIfPending,
      });
      return false;
    }

    logger.info("moderation: decision recorded", {
      eventId: eventId.slice(0, 8),
      status,
      reviewedBy,
    });

    if ((status === "approved" || status === "auto_approved") && row.rawEvent) {
      const result = await injectEvent(row.rawEvent);
      if (!result.ok) {
        logger.error("moderation: strfry inject failed after approval", {
          eventId: eventId.slice(0, 8),
          error: result.error,
        });
      }
    }
    return true;
  }
}
/** Module-level ModerationService instance, created once when this module is loaded. */
export const moderationService = new ModerationService();