timmy-tower/artifacts/api-server/src/index.ts
alexpaynex a95fd76ebd task/32: Event moderation queue + Timmy AI review
## What was built
Full moderation pipeline: relay_event_queue table, strfry inject helper,
ModerationService with Claude haiku review, policy tier routing, 30s poll loop,
admin approve/reject/list endpoints.

## DB schema (`lib/db/src/schema/relay-event-queue.ts`)
relay_event_queue: event_id (PK), pubkey (FK → nostr_identities), kind,
raw_event (text JSON), status (pending/approved/rejected/auto_approved),
reviewed_by (timmy_ai/admin/null), review_reason, created_at, decided_at.
Exported from schema/index.ts. Pushed via pnpm run push.
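
For orientation, the column list above can be mirrored as a plain TypeScript type. This is only a sketch: the real table definition lives in `lib/db/src/schema/relay-event-queue.ts`, and the camelCase field names plus the `newPendingRow` and `isDecided` helpers are hypothetical.

```typescript
// Hypothetical plain-TypeScript mirror of the relay_event_queue columns.
type QueueStatus = "pending" | "approved" | "rejected" | "auto_approved";
type Reviewer = "timmy_ai" | "admin" | null;

interface RelayEventQueueRow {
  eventId: string; // event_id (PK)
  pubkey: string; // FK to nostr_identities
  kind: number;
  rawEvent: string; // raw_event: the event JSON stored as text
  status: QueueStatus;
  reviewedBy: Reviewer;
  reviewReason: string | null;
  createdAt: Date;
  decidedAt: Date | null;
}

// Build the row that an enqueue would insert (illustrative helper).
function newPendingRow(
  event: { id: string; pubkey: string; kind: number },
  now: Date = new Date(),
): RelayEventQueueRow {
  return {
    eventId: event.id,
    pubkey: event.pubkey,
    kind: event.kind,
    rawEvent: JSON.stringify(event),
    status: "pending",
    reviewedBy: null,
    reviewReason: null,
    createdAt: now,
    decidedAt: null,
  };
}

// A row counts as decided once it has left the pending state.
function isDecided(row: RelayEventQueueRow): boolean {
  return row.status !== "pending";
}
```

Note that a row flagged by the AI reviewer keeps `status = "pending"` while `reviewedBy` is already `timmy_ai`, so `status` alone tells whether a decision has been made.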

## strfry HTTP client (`artifacts/api-server/src/lib/strfry.ts`)
injectEvent(rawEventJson) — POST {STRFRY_URL}/import (NDJSON).
STRFRY_URL defaults to "http://strfry:7777" (Docker internal network).
5s timeout; graceful failure in dev when strfry not running; never throws.
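
A minimal sketch of the never-throws contract described above, assuming the global `fetch` is available. The `InjectResult` shape, the `fetchImpl` option (added so the sketch can run without a relay), and the `Content-Type` value are assumptions, not the actual contents of `strfry.ts`.

```typescript
// Sketch only: POST one event as NDJSON and report failure as a value.
interface InjectResult {
  ok: boolean;
  error?: string;
}

type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string>; body: string; signal: AbortSignal },
) => Promise<{ ok: boolean; status: number }>;

async function injectEventSketch(
  rawEventJson: string,
  opts: { baseUrl?: string; timeoutMs?: number; fetchImpl?: FetchLike } = {},
): Promise<InjectResult> {
  const baseUrl = opts.baseUrl ?? "http://strfry:7777";
  const timeoutMs = opts.timeoutMs ?? 5_000;
  const controller = new AbortController();
  // Abort the request if the relay does not answer in time.
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const doFetch = opts.fetchImpl ?? (fetch as unknown as FetchLike);
    const res = await doFetch(`${baseUrl}/import`, {
      method: "POST",
      headers: { "Content-Type": "application/x-ndjson" }, // assumed header
      body: rawEventJson.trim() + "\n", // NDJSON: one JSON document per line
      signal: controller.signal,
    });
    return res.ok ? { ok: true } : { ok: false, error: `HTTP ${res.status}` };
  } catch (err) {
    // Network errors and timeouts are returned, never thrown.
    return { ok: false, error: err instanceof Error ? err.message : String(err) };
  } finally {
    clearTimeout(timer);
  }
}
```

Returning a result object instead of throwing lets callers such as the policy endpoint pick a fail-closed action without their own try/catch.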

## ModerationService (`artifacts/api-server/src/lib/moderation.ts`)
- enqueue(event) — insert pending row; idempotent onConflictDoNothing
- autoReview(eventId) — Claude haiku prompt: approve or flag. On flag, marks
  reviewedBy=timmy_ai and leaves pending for admin. On approve, calls decide().
- decide(eventId, status, reason, reviewedBy) — updates DB + calls injectEvent
- processPending(limit=10) — batch poll: auto-review up to limit pending events
- Stub mode: auto-approves all events when Anthropic key absent
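
The flow above can be sketched in memory, with the Claude call and the strfry inject passed in as plain functions. Everything here is illustrative: `createModerationSketch` and its types are hypothetical, and three behaviours are assumptions rather than facts from the service: AI approvals are stored as `auto_approved`, only approvals are injected, and rows already flagged by the AI are skipped on later polls.

```typescript
// In-memory sketch of the moderation flow; all names are hypothetical.
type Status = "pending" | "approved" | "rejected" | "auto_approved";
type Reviewer = "timmy_ai" | "admin" | null;
type Verdict = { action: "approve" | "flag"; reason: string };

interface Row {
  eventId: string;
  rawEvent: string;
  status: Status;
  reviewedBy: Reviewer;
  reviewReason: string | null;
}

function createModerationSketch(
  review: ((rawEvent: string) => Promise<Verdict>) | null, // null models stub mode
  inject: (rawEvent: string) => Promise<void>,
) {
  const rows = new Map<string, Row>();

  function enqueue(event: { id: string }): void {
    if (rows.has(event.id)) return; // idempotent, like onConflictDoNothing
    rows.set(event.id, {
      eventId: event.id,
      rawEvent: JSON.stringify(event),
      status: "pending",
      reviewedBy: null,
      reviewReason: null,
    });
  }

  async function decide(
    eventId: string,
    status: Exclude<Status, "pending">,
    reason: string,
    reviewedBy: Exclude<Reviewer, null>,
  ): Promise<boolean> {
    const row = rows.get(eventId);
    if (!row || row.status !== "pending") return false; // unknown or already decided
    row.status = status;
    row.reviewReason = reason;
    row.reviewedBy = reviewedBy;
    // Assumption: only approvals are forwarded to the relay.
    if (status !== "rejected") await inject(row.rawEvent);
    return true;
  }

  async function autoReview(eventId: string): Promise<void> {
    const row = rows.get(eventId);
    if (!row || row.status !== "pending") return;
    const verdict: Verdict = review
      ? await review(row.rawEvent)
      : { action: "approve", reason: "stub mode: no API key" };
    if (verdict.action === "flag") {
      // Flagged events stay pending for an admin to decide.
      row.reviewedBy = "timmy_ai";
      row.reviewReason = verdict.reason;
      return;
    }
    await decide(eventId, "auto_approved", verdict.reason, "timmy_ai");
  }

  async function processPending(limit = 10): Promise<number> {
    // Assumption: rows already flagged by the AI are skipped on later polls.
    const batch = Array.from(rows.values())
      .filter((r) => r.status === "pending" && r.reviewedBy === null)
      .slice(0, limit);
    for (const row of batch) {
      try {
        await autoReview(row.eventId);
      } catch {
        // Isolate per-event failures so one bad event does not stop the batch.
      }
    }
    return batch.length;
  }

  return { rows, enqueue, decide, autoReview, processPending };
}
```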

## Policy endpoint update (`artifacts/api-server/src/routes/relay.ts`)
Tier routing in evaluatePolicy:
  read/none → reject (unchanged)
  write + elite tier → injectEvent + accept (elite bypass; shadowReject if inject fails)
  write + non-elite → enqueue + shadowReject (held for moderation)
Imports db/nostrIdentities directly for the tier check. Inject and enqueue errors
both fail closed: an event is never accepted when either step fails.
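
The routing table above can be sketched as a small function with the inject and enqueue steps passed in. `routeByTier`, `PolicyDeps`, and the `AccessLevel` type are hypothetical names, not the actual `evaluatePolicy` signature. The sketch follows the table and the smoke tests in returning shadowReject when inject fails, and it assumes an enqueue failure also ends in shadowReject.

```typescript
// Sketch of the tier routing; names and signatures are assumptions.
type AccessLevel = "none" | "read" | "write";
type PolicyAction = "accept" | "reject" | "shadowReject";

interface PolicyDeps {
  inject(rawEvent: string): Promise<boolean>; // true when the relay took the event
  enqueue(rawEvent: string): Promise<void>;
}

async function routeByTier(
  access: AccessLevel,
  tier: string,
  rawEvent: string,
  deps: PolicyDeps,
): Promise<PolicyAction> {
  // Identities without write access are rejected outright.
  if (access !== "write") return "reject";

  if (tier === "elite") {
    // Elite bypass: inject directly and fail closed if that does not work.
    const injected = await deps.inject(rawEvent).catch(() => false);
    return injected ? "accept" : "shadowReject";
  }

  try {
    await deps.enqueue(rawEvent); // hold for moderation
  } catch {
    // Assumption: an enqueue failure still ends in shadowReject.
  }
  return "shadowReject";
}
```

In every branch except a successful elite inject, the event stays out of the relay, which is the fail-closed behaviour the routing is meant to guarantee.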

## Background poll loop (`artifacts/api-server/src/index.ts`)
setInterval every 30s calling moderationService.processPending(10).
Interval configurable via MODERATION_POLL_MS env var.
Errors caught per-event; poll loop never crashes the server.
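
A hedged variant of the interval handling, not the code in `index.ts`: `resolvePollMs` also falls back to the default for zero or negative values, and `makeGuardedTick` skips a tick while the previous one is still running. Both helpers are hypothetical.

```typescript
// Hypothetical helpers around the poll loop; not part of the actual server.

// Parse MODERATION_POLL_MS, falling back when the value is missing or unusable.
function resolvePollMs(raw: string | undefined, fallbackMs = 30_000): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallbackMs;
}

// Wrap a batch function so ticks never overlap and errors never escape.
function makeGuardedTick(
  run: () => Promise<unknown>,
  onError: (err: unknown) => void,
): () => Promise<void> {
  let running = false;
  return async () => {
    if (running) return; // previous batch still in flight: skip this tick
    running = true;
    try {
      await run();
    } catch (err) {
      onError(err);
    } finally {
      running = false;
    }
  };
}
```

Wiring it up would look like `setInterval(makeGuardedTick(() => moderationService.processPending(10), logError), resolvePollMs(process.env["MODERATION_POLL_MS"]))`, where `logError` stands in for whatever logger call the server uses.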

## Admin queue routes (`artifacts/api-server/src/routes/admin-relay-queue.ts`)
ADMIN_SECRET Bearer auth (same pattern as admin-relay.ts).
GET  /api/admin/relay/queue?status=...        — list all / by status
POST /api/admin/relay/queue/:eventId/approve  — approve + inject into strfry
POST /api/admin/relay/queue/:eventId/reject   — reject (no inject)
409 on duplicate decisions. Registered in routes/index.ts.
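
The auth check and the duplicate-decision rule can be sketched as two pure functions. `isAuthorized` and `planDecision` are hypothetical; refusing all requests when no secret is configured and returning 404 for an unknown event are assumptions, while the 409 and the inject-on-approve behaviour come from the route list above.

```typescript
// Sketch of the admin queue rules; function names are hypothetical.
type QueueStatus = "pending" | "approved" | "rejected" | "auto_approved";

// Bearer check against ADMIN_SECRET. A production check would likely use a
// constant-time comparison; plain equality keeps the sketch short.
function isAuthorized(
  authorizationHeader: string | undefined,
  adminSecret: string | undefined,
): boolean {
  if (!adminSecret) return false; // assumption: no secret means no access
  return authorizationHeader === `Bearer ${adminSecret}`;
}

interface DecisionPlan {
  httpStatus: 200 | 404 | 409;
  inject: boolean; // whether the event should be forwarded to strfry
}

// Decide how an approve/reject request should be answered.
function planDecision(
  current: QueueStatus | undefined,
  requested: "approved" | "rejected",
): DecisionPlan {
  if (current === undefined) return { httpStatus: 404, inject: false }; // assumed
  if (current !== "pending") return { httpStatus: 409, inject: false }; // duplicate decision
  return { httpStatus: 200, inject: requested === "approved" };
}
```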

## Smoke tests (all pass)
Unknown → reject ✓; elite → shadowReject (strfry unavailable in dev) ✓;
non-elite write → shadowReject + pending in queue ✓; admin approve → approved ✓;
moderation poll loop started ✓; TypeScript 0 errors.
2026-03-19 20:35:39 +00:00


import { createServer } from "http";
import app from "./app.js";
import { attachWebSocketServer } from "./routes/events.js";
import { rootLogger } from "./lib/logger.js";
import { timmyIdentityService } from "./lib/timmy-identity.js";
import { startEngagementEngine } from "./lib/engagement.js";
import { relayAccountService } from "./lib/relay-accounts.js";
import { moderationService } from "./lib/moderation.js";

const rawPort = process.env["PORT"];
if (!rawPort) {
  throw new Error("PORT environment variable is required but was not provided.");
}

const port = Number(rawPort);
if (Number.isNaN(port) || port <= 0) {
  throw new Error(`Invalid PORT value: "${rawPort}"`);
}

const server = createServer(app);
attachWebSocketServer(server);

server.listen(port, () => {
  rootLogger.info("server started", { port });
  rootLogger.info("timmy identity", { npub: timmyIdentityService.npub });

  const domain = process.env["REPLIT_DEV_DOMAIN"];
  if (domain) {
    rootLogger.info("public url", { url: `https://${domain}/api/ui` });
    rootLogger.info("tower url", { url: `https://${domain}/tower` });
    rootLogger.info("ws url", { url: `wss://${domain}/api/ws` });
  }

  startEngagementEngine();

  // ── Moderation poll loop ─────────────────────────────────────────────────
  // Processes up to 10 pending relay events per tick via Claude haiku.
  // Ticks every 30 seconds unless MODERATION_POLL_MS overrides the interval.
  const MODERATION_POLL_MS = parseInt(process.env["MODERATION_POLL_MS"] ?? "", 10) || 30_000;
  setInterval(() => {
    moderationService.processPending(10).catch((err) =>
      rootLogger.error("moderation poll error", { err }),
    );
  }, MODERATION_POLL_MS);
  rootLogger.info("moderation poll loop started", { intervalMs: MODERATION_POLL_MS });

  // Seed Timmy's own pubkey with elite identity + relay write access.
  // Resolves pubkey from TIMMY_NOSTR_PUBKEY env var if set (explicit override),
  // otherwise falls back to the hex pubkey derived from TIMMY_NOSTR_NSEC.
  // Idempotent — safe to run on every startup.
  const timmyPubkey = process.env["TIMMY_NOSTR_PUBKEY"] ?? timmyIdentityService.pubkeyHex;
  relayAccountService
    .seedElite(timmyPubkey, "Timmy's own pubkey — elite access seeded at startup")
    .then(() =>
      rootLogger.info("relay: Timmy's pubkey seeded with elite access", {
        pubkey: timmyPubkey.slice(0, 8),
        source: process.env["TIMMY_NOSTR_PUBKEY"] ? "TIMMY_NOSTR_PUBKEY env" : "timmyIdentityService",
      }),
    )
    .catch((err) =>
      rootLogger.warn("relay: failed to seed Timmy's pubkey", { err }),
    );
});