fix(#14,#16): stub mode for executeWorkStreaming + SSE race condition

Fix #14 — AgentService stub mode for executeWorkStreaming:
- Remove static import of @workspace/integrations-anthropic-ai (throws on
  module load when env vars absent)
- Add STUB_MODE detection at startup using AI_INTEGRATIONS_ANTHROPIC_API_KEY
  and AI_INTEGRATIONS_ANTHROPIC_BASE_URL presence
- Lazy getClient() uses dynamic import, only called in live mode
- stub executeWork() and executeWorkStreaming() return canned responses;
  streaming stub emits word-by-word with 40 ms delay to exercise SSE path
- Define local AnthropicLike interface — no dependency on @anthropic-ai/sdk
  types or unbuilt integrations-anthropic-ai dist

Fix #16 — SSE stream registry race condition:
- After bus-listener wait, refresh BOTH stream registry AND job state from DB
- If job completed while waiting (stream already gone), replay full result
  immediately instead of emitting 'Stream not available' error
- Increase wait timeout from 60 s to 90 s for mainnet payment latency
- Cleaner event bus filter: any jobId event resolves the wait (not just
  job:state and job:paid) so job:completed also unblocks waiting clients
- Named branches with comments for each resolution path

Fix pre-existing typecheck errors (bonus):
- event-bus.ts: EventEmitter overload uses any[] not unknown[]
- testkit.ts: escape bash ${ELAPSED_T14} in TS template literal
This commit is contained in:
alexpaynex
2026-03-18 22:32:53 +00:00
parent 5b3d7edf6a
commit 6f059daa8c
4 changed files with 148 additions and 51 deletions

View File

@@ -1,5 +1,3 @@
import { anthropic } from "@workspace/integrations-anthropic-ai";
export interface EvalResult {
accepted: boolean;
reason: string;
@@ -18,17 +16,81 @@ export interface AgentConfig {
workModel?: string;
}
// ── Stub mode detection ───────────────────────────────────────────────────────
// If Anthropic credentials are absent, all AI calls return canned responses so
// the server starts and exercises the full payment/state-machine flow without
// a real API key. This mirrors the LNbits stub pattern.
const STUB_MODE =
!process.env["AI_INTEGRATIONS_ANTHROPIC_API_KEY"] ||
!process.env["AI_INTEGRATIONS_ANTHROPIC_BASE_URL"];
if (STUB_MODE) {
console.log(
"[AgentService] No Anthropic key — running in STUB mode. AI responses are simulated.",
);
}
const STUB_EVAL: EvalResult = {
accepted: true,
reason: "Stub: request accepted for processing.",
inputTokens: 0,
outputTokens: 0,
};
const STUB_RESULT =
"Stub response: Timmy is running in stub mode (no Anthropic API key). " +
"Configure AI_INTEGRATIONS_ANTHROPIC_API_KEY to enable real AI responses.";
// ── Lazy client ───────────────────────────────────────────────────────────────
// Minimal local interface — avoids importing @anthropic-ai/sdk types directly.
// Dynamic import avoids the module-level throw in the integrations client when
// env vars are absent (the client.ts guard runs at module evaluation time).
interface AnthropicLike {
messages: {
create(params: Record<string, unknown>): Promise<{
content: Array<{ type: string; text?: string }>;
usage: { input_tokens: number; output_tokens: number };
}>;
stream(params: Record<string, unknown>): AsyncIterable<{
type: string;
delta?: { type: string; text?: string };
usage?: { output_tokens: number };
message?: { usage: { input_tokens: number } };
}>;
};
}
let _anthropic: AnthropicLike | null = null;
async function getClient(): Promise<AnthropicLike> {
if (_anthropic) return _anthropic;
// @ts-ignore -- TS6305: integrations-anthropic-ai exports src directly; project-reference build not required at runtime
const mod = (await import("@workspace/integrations-anthropic-ai")) as { anthropic: AnthropicLike };
_anthropic = mod.anthropic;
return _anthropic;
}
// ── AgentService ─────────────────────────────────────────────────────────────
export class AgentService {
readonly evalModel: string;
readonly workModel: string;
readonly stubMode: boolean = STUB_MODE;
constructor(config?: AgentConfig) {
this.evalModel = config?.evalModel ?? process.env.EVAL_MODEL ?? "claude-haiku-4-5";
this.workModel = config?.workModel ?? process.env.WORK_MODEL ?? "claude-sonnet-4-6";
this.evalModel = config?.evalModel ?? process.env["EVAL_MODEL"] ?? "claude-haiku-4-5";
this.workModel = config?.workModel ?? process.env["WORK_MODEL"] ?? "claude-sonnet-4-6";
}
async evaluateRequest(requestText: string): Promise<EvalResult> {
const message = await anthropic.messages.create({
if (STUB_MODE) {
// Simulate a short eval delay so state-machine tests are realistic
await new Promise((r) => setTimeout(r, 300));
return { ...STUB_EVAL };
}
const client = await getClient();
const message = await client.messages.create({
model: this.evalModel,
max_tokens: 8192,
system: `You are Timmy, an AI agent gatekeeper. Evaluate whether a request is acceptable to act on.
@@ -45,10 +107,10 @@ Respond ONLY with valid JSON: {"accepted": true, "reason": "..."} or {"accepted"
let parsed: { accepted: boolean; reason: string };
try {
const raw = block.text.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
const raw = block.text!.replace(/^```(?:json)?\s*/i, "").replace(/\s*```$/, "").trim();
parsed = JSON.parse(raw) as { accepted: boolean; reason: string };
} catch {
throw new Error(`Failed to parse eval JSON: ${block.text}`);
throw new Error(`Failed to parse eval JSON: ${block.text!}`);
}
return {
@@ -60,7 +122,13 @@ Respond ONLY with valid JSON: {"accepted": true, "reason": "..."} or {"accepted"
}
async executeWork(requestText: string): Promise<WorkResult> {
const message = await anthropic.messages.create({
if (STUB_MODE) {
await new Promise((r) => setTimeout(r, 500));
return { result: STUB_RESULT, inputTokens: 0, outputTokens: 0 };
}
const client = await getClient();
const message = await client.messages.create({
model: this.workModel,
max_tokens: 8192,
system: `You are Timmy, a capable AI agent. A user has paid for you to handle their request.
@@ -74,25 +142,37 @@ Fulfill it thoroughly and helpfully. Be concise yet complete.`,
}
return {
result: block.text,
result: block.text!,
inputTokens: message.usage.input_tokens,
outputTokens: message.usage.output_tokens,
};
}
/**
* Streaming variant of executeWork (#3). Calls onChunk for every text delta
* so callers can pipe tokens to an SSE stream in real time.
* Streaming variant of executeWork (#3). Calls onChunk for every text delta.
* In stub mode, emits the canned response word-by-word to exercise the SSE
* path end-to-end without a real Anthropic key.
*/
async executeWorkStreaming(
requestText: string,
onChunk: (delta: string) => void,
): Promise<WorkResult> {
if (STUB_MODE) {
const words = STUB_RESULT.split(" ");
for (const word of words) {
const delta = word + " ";
onChunk(delta);
await new Promise((r) => setTimeout(r, 40));
}
return { result: STUB_RESULT, inputTokens: 0, outputTokens: 0 };
}
const client = await getClient();
let fullText = "";
let inputTokens = 0;
let outputTokens = 0;
const stream = anthropic.messages.stream({
const stream = client.messages.stream({
model: this.workModel,
max_tokens: 8192,
system: `You are Timmy, a capable AI agent. A user has paid for you to handle their request.
@@ -103,23 +183,19 @@ Fulfill it thoroughly and helpfully. Be concise yet complete.`,
for await (const event of stream) {
if (
event.type === "content_block_delta" &&
event.delta.type === "text_delta"
event.delta?.type === "text_delta"
) {
const delta = event.delta.text;
const delta = event.delta!.text ?? "";
fullText += delta;
onChunk(delta);
} else if (event.type === "message_delta" && event.usage) {
outputTokens = event.usage.output_tokens;
} else if (event.type === "message_start" && event.message.usage) {
inputTokens = event.message.usage.input_tokens;
outputTokens = event.usage!.output_tokens;
} else if (event.type === "message_start" && event.message?.usage) {
inputTokens = event.message!.usage.input_tokens;
}
}
return {
result: fullText,
inputTokens,
outputTokens,
};
return { result: fullText, inputTokens, outputTokens };
}
}

View File

@@ -20,7 +20,8 @@ class EventBus extends EventEmitter {
}
on(event: "bus", listener: (data: BusEvent) => void): this;
on(event: string, listener: (...args: unknown[]) => void): this {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
on(event: string, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}

View File

@@ -483,17 +483,19 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
req.on("close", cleanup);
// Wait up to 60 s for a stream slot to appear (work payment may not have landed yet)
// ── Wait for stream slot (fixes #16 race condition) ──────────────────────
// After the bus wait we re-check BOTH the stream registry AND the DB so we
// handle: (a) job completed while we waited (stream already gone), (b) job
// still executing but stream was registered after we first checked.
let stream = streamRegistry.get(id);
let currentJob = job;
if (!stream) {
await new Promise<void>((resolve) => {
const deadline = setTimeout(resolve, 60_000);
// 90 s timeout — generous enough for slow payment confirmations on mainnet
const deadline = setTimeout(resolve, 90_000);
const busListener = (data: Parameters<typeof eventBus.publish>[0]) => {
if (
"jobId" in data &&
data.jobId === id &&
(data.type === "job:state" || data.type === "job:paid")
) {
if ("jobId" in data && data.jobId === id) {
clearTimeout(deadline);
eventBus.off("bus", busListener);
resolve();
@@ -501,38 +503,56 @@ router.get("/jobs/:id/stream", async (req: Request, res: Response) => {
};
eventBus.on("bus", busListener);
});
// Refresh both stream slot and job state after waiting
stream = streamRegistry.get(id);
currentJob = (await getJobById(id)) ?? currentJob;
}
if (!stream) {
// Still no stream — job may have completed while we waited
const refreshed = await getJobById(id);
if (refreshed?.state === "complete" && refreshed.result) {
sendEvent("token", { text: refreshed.result });
sendEvent("done", { jobId: id, state: "complete" });
} else {
sendEvent("error", { jobId: id, message: "Stream not available" });
}
// ── Resolve: stream available ─────────────────────────────────────────────
if (stream) {
const attachToStream = (s: typeof stream) => {
s!.on("data", (chunk: Buffer) => {
sendEvent("token", { text: chunk.toString("utf8") });
});
s!.on("end", () => {
sendEvent("done", { jobId: id, state: "complete" });
res.end();
cleanup();
});
s!.on("error", (err: Error) => {
sendEvent("error", { jobId: id, message: err.message });
res.end();
cleanup();
});
};
attachToStream(stream);
return;
}
// ── Resolve: job completed while we waited (stream already gone) ──────────
if (currentJob.state === "complete" && currentJob.result) {
sendEvent("token", { text: currentJob.result });
sendEvent("done", { jobId: id, state: "complete" });
res.end();
cleanup();
return;
}
stream.on("data", (chunk: Buffer) => {
sendEvent("token", { text: chunk.toString("utf8") });
});
stream.on("end", () => {
sendEvent("done", { jobId: id, state: "complete" });
if (currentJob.state === "failed") {
sendEvent("error", { jobId: id, message: currentJob.errorMessage ?? "Job failed" });
res.end();
cleanup();
});
return;
}
stream.on("error", (err: Error) => {
sendEvent("error", { jobId: id, message: err.message });
res.end();
cleanup();
// ── Resolve: timeout with no activity — tell client to fall back to polling
sendEvent("error", {
jobId: id,
message: "Stream timed out. Poll GET /api/jobs/:id for current state.",
});
res.end();
cleanup();
});
export default router;

View File

@@ -383,7 +383,7 @@ if [[ -n "$SESSION_MACAROON" && "$SESSION_MACAROON" != "null" ]]; then
END_T14=$(date +%s)
ELAPSED_T14=$((END_T14 - START_T14))
if [[ "$T14_CODE" == "200" && ("$T14_STATE" == "complete" || "$T14_STATE" == "rejected") && -n "$T14_DEBITED" && "$T14_DEBITED" != "null" && -n "$T14_BAL" ]]; then
note PASS "state=$T14_STATE in ${ELAPSED_T14}s, debitedSats=$T14_DEBITED, balanceRemaining=$T14_BAL"
note PASS "state=$T14_STATE in \${ELAPSED_T14}s, debitedSats=$T14_DEBITED, balanceRemaining=$T14_BAL"
PASS=$((PASS+1))
else
note FAIL "code=$T14_CODE body=$T14_BODY"