diff --git a/examples/kitchen-sink/scripts/mock-agentic-loop.ts b/examples/kitchen-sink/scripts/mock-agentic-loop.ts index 026f6731ff..ed43180c4c 100644 --- a/examples/kitchen-sink/scripts/mock-agentic-loop.ts +++ b/examples/kitchen-sink/scripts/mock-agentic-loop.ts @@ -70,6 +70,14 @@ const PROBE_INTERVAL_MS = numberFromEnv( 1_000, ); const PROBE_TIMEOUT_MS = numberFromEnv("MOCK_AGENTIC_PROBE_TIMEOUT_MS", 35_000); +const BYPASS_INTERVAL_MS = numberFromEnv( + "MOCK_AGENTIC_BYPASS_INTERVAL_MS", + 1_000, +); +const BYPASS_TIMEOUT_MS = numberFromEnv( + "MOCK_AGENTIC_BYPASS_TIMEOUT_MS", + 10_000, +); const EXPECTED_PROBE_CLOSE_CODE = numberFromEnv( "MOCK_AGENTIC_EXPECTED_PROBE_CLOSE_CODE", 1011, @@ -185,6 +193,47 @@ type ProbeStats = { expectedCloseSamples: CloseObservation[]; }; +type BypassPhase = "beforeSleep" | "afterSleep"; + +type BypassObservation = { + phase: BypassPhase; + message: string; +}; + +type BypassStats = { + attempts: number; + beforeSleepAttempts: number; + afterSleepAttempts: number; + httpSuccesses: number; + beforeSleepHttpSuccesses: number; + afterSleepHttpSuccesses: number; + webSocketSuccesses: number; + beforeSleepWebSocketSuccesses: number; + afterSleepWebSocketSuccesses: number; + timeouts: BypassObservation[]; + errors: BypassObservation[]; +}; + +type BypassHandle = { + fetch: ( + input: string, + init?: RequestInit & { + gateway?: { + bypassConnectable?: boolean; + }; + }, + ) => Promise; + webSocket: ( + path?: string, + protocols?: string | string[], + options?: { + gateway?: { + bypassConnectable?: boolean; + }; + }, + ) => Promise; +}; + type LocalKitchenSinkServer = { child: ChildProcessWithoutNullStreams; dbRoot: string; @@ -210,6 +259,27 @@ function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } +async function withTimeout( + promise: Promise, + label: string, + timeoutMs: number, +): Promise { + let timeoutHandle: NodeJS.Timeout | undefined; + try { + return await Promise.race([ + promise, + new Promise((_resolve, reject) => { + timeoutHandle = setTimeout( + () => reject(new Error(`${label} timed out after ${timeoutMs}ms`)), + timeoutMs, + ); + }), + ]); + } finally { + if (timeoutHandle) clearTimeout(timeoutHandle); + } +} + function portFromUrl(urlString: string): number { const url = new URL(urlString); if (url.port) return Number(url.port); @@ -1085,6 +1155,176 @@ async function runProbeLoop(webSocketUrl: string, stopAt: number) { return stats; } +async function runBypassAttempt( + handle: BypassHandle, + stats: BypassStats, + phase: BypassPhase, +) { + stats.attempts += 1; + if (phase === "beforeSleep") { + stats.beforeSleepAttempts += 1; + } else { + stats.afterSleepAttempts += 1; + } + const probeId = crypto.randomUUID(); + + try { + const controller = new AbortController(); + const abortTimeout = setTimeout( + () => controller.abort(), + BYPASS_TIMEOUT_MS, + ); + try { + const response = await withTimeout( + handle.fetch(`/bypass?probe=${encodeURIComponent(probeId)}`, { + method: "GET", + signal: controller.signal, + gateway: { + bypassConnectable: true, + }, + }), + "bypass http", + BYPASS_TIMEOUT_MS, + ); + if (!response.ok) { + throw new Error( + `bypass http returned ${response.status}: ${await response.text()}`, + ); + } + const body = (await response.json()) as { + type?: string; + transport?: string; + }; + if (body.type !== "bypass" || body.transport !== "http") { + throw new Error(`unexpected bypass http body ${JSON.stringify(body)}`); + } + stats.httpSuccesses += 1; + if (phase === "beforeSleep") { + stats.beforeSleepHttpSuccesses += 1; + } else { + stats.afterSleepHttpSuccesses += 1; + } + } finally { + clearTimeout(abortTimeout); + } + + const ws = await withTimeout( + handle.webSocket("/bypass", undefined, { + gateway: { + bypassConnectable: true, + }, + }), + "bypass websocket create", + BYPASS_TIMEOUT_MS, + ); + try { + await withTimeout( + waitForOpen(ws), + "bypass websocket open", + BYPASS_TIMEOUT_MS, + ); + const pong = new Promise((resolve, reject) => { + const timeoutHandle = setTimeout(() => { + cleanup(); + reject(new Error("bypass websocket timed out waiting for pong")); + }, BYPASS_TIMEOUT_MS); + const cleanup = () => { + clearTimeout(timeoutHandle); + ws.removeEventListener("message", onMessage); + ws.removeEventListener("close", onClose); + ws.removeEventListener("error", onError); + }; + const onMessage = (event: MessageEvent) => { + if (typeof event.data !== "string") return; + const message = JSON.parse(event.data) as ServerMessage; + if (message.type !== "pong" || message.probeId !== probeId) { + return; + } + cleanup(); + resolve(); + }; + const onClose = (event: CloseEvent) => { + cleanup(); + reject( + new Error( + `bypass websocket closed code=${event.code} reason=${event.reason}`, + ), + ); + }; + const onError = () => { + cleanup(); + reject(new Error("bypass websocket error")); + }; + ws.addEventListener("message", onMessage); + ws.addEventListener("close", onClose, { once: true }); + ws.addEventListener("error", onError, { once: true }); + }); + ws.send(JSON.stringify({ type: "ping", probeId })); + await pong; + stats.webSocketSuccesses += 1; + if (phase === "beforeSleep") { + stats.beforeSleepWebSocketSuccesses += 1; + } else { + stats.afterSleepWebSocketSuccesses += 1; + } + } finally { + if ( + ws.readyState === WebSocket.OPEN || + ws.readyState === WebSocket.CONNECTING + ) { + ws.close(1000, "bypass probe complete"); + } + } + } catch (error) { + const message = formatError(error); + const observation = { phase, message }; + if (message.includes("timed out")) { + stats.timeouts.push(observation); + console.error(`[bypass-timeout] phase=${phase} ${message}`); + } else { + stats.errors.push(observation); + console.error(`[bypass-error] phase=${phase} ${message}`); + } + } +} + +async function runBypassLoop( + handle: BypassHandle, + stopAt: number, + getPhase: () => BypassPhase, +) { + const stats: BypassStats = { + attempts: 0, + beforeSleepAttempts: 0, + afterSleepAttempts: 0, + httpSuccesses: 0, + beforeSleepHttpSuccesses: 0, + afterSleepHttpSuccesses: 0, + webSocketSuccesses: 0, + beforeSleepWebSocketSuccesses: 0, + afterSleepWebSocketSuccesses: 0, + timeouts: [], + errors: [], + }; + let nextProbeAt = Date.now(); + const pending = new Set>(); + + while (Date.now() < stopAt) { + await sleep(Math.max(0, nextProbeAt - Date.now())); + if (Date.now() >= stopAt) break; + const attempt = runBypassAttempt(handle, stats, getPhase()).finally( + () => { + pending.delete(attempt); + }, + ); + pending.add(attempt); + nextProbeAt += BYPASS_INTERVAL_MS; + } + + await Promise.all(pending); + return stats; +} + async function runInference( session: RawSession, requestId: string, @@ -1174,6 +1414,7 @@ async function runWorkload() { }); const handle = client.mockAgenticLoop.getOrCreate([key]); const verifier = handle as unknown as ActionVerifier; + const bypassHandle = handle as unknown as BypassHandle; const actorId = await handle.resolve(); const webSocketUrl = buildWebSocketUrl(actorId); const stopAt = Date.now() + DURATION_MS; @@ -1189,16 +1430,22 @@ async function runWorkload() { }; console.log( - `[start] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} actorId=${actorId} ${label} durationMs=${DURATION_MS} sleepIntervalMs=${SLEEP_INTERVAL_MS} inferenceSeconds=${INFERENCE_MIN_SECONDS}-${INFERENCE_MAX_SECONDS} jitterMs=${JITTER_MIN_MS}-${JITTER_MAX_MS} probeIntervalMs=${PROBE_INTERVAL_MS}`, + `[start] endpoint=${ENDPOINT} namespace=${NAMESPACE} pool=${POOL_NAME} actorId=${actorId} ${label} durationMs=${DURATION_MS} sleepIntervalMs=${SLEEP_INTERVAL_MS} inferenceSeconds=${INFERENCE_MIN_SECONDS}-${INFERENCE_MAX_SECONDS} jitterMs=${JITTER_MIN_MS}-${JITTER_MAX_MS} probeIntervalMs=${PROBE_INTERVAL_MS} bypassIntervalMs=${BYPASS_INTERVAL_MS}`, ); const session = new RawSession(webSocketUrl, label); const sleepResultPromise = postSleep(actorId, stopAt, sleepStats); let probeResultPromise: Promise | undefined; + let bypassResultPromise: Promise | undefined; try { await connectAndValidateHistory(session, expectedRequests, MAX_RECONNECT_MS); probeResultPromise = runProbeLoop(webSocketUrl, stopAt); + bypassResultPromise = runBypassLoop( + bypassHandle, + stopAt, + () => (sleepStats.posts === 0 ? "beforeSleep" : "afterSleep"), + ); while (Date.now() < stopAt) { const jitterMs = randomInteger(JITTER_MIN_MS, JITTER_MAX_MS); @@ -1278,6 +1525,10 @@ async function runWorkload() { probeResultPromise !== undefined ? await probeResultPromise : await runProbeLoop(webSocketUrl, Date.now()); + const bypassResult = + bypassResultPromise !== undefined + ? await bypassResultPromise + : await runBypassLoop(bypassHandle, Date.now(), () => "beforeSleep"); validateHistory(await (async () => { const finalSession = new RawSession(webSocketUrl, `${label}:final`); const reconnectMs = await connectAndValidateHistory( @@ -1306,7 +1557,7 @@ async function runWorkload() { await verifyAll(verifier, expectedRequests); console.log( - `[done] actorId=${actorId} key=${key} requests=${requestCount} sleepPosts=${sleepResult.posts} sleepErrors=${sleepResult.errors} reconnects=${reconnectCount} maxReconnectMs=${maxReconnectMs} probeAttempts=${probeResult.attempts} probeSuccesses=${probeResult.successes} probeExpectedCloses=${probeResult.expectedCloses}`, + `[done] actorId=${actorId} key=${key} requests=${requestCount} sleepPosts=${sleepResult.posts} sleepErrors=${sleepResult.errors} reconnects=${reconnectCount} maxReconnectMs=${maxReconnectMs} probeAttempts=${probeResult.attempts} probeSuccesses=${probeResult.successes} probeExpectedCloses=${probeResult.expectedCloses} bypassAttempts=${bypassResult.attempts} bypassBeforeSleepAttempts=${bypassResult.beforeSleepAttempts} bypassAfterSleepAttempts=${bypassResult.afterSleepAttempts} bypassHttpSuccesses=${bypassResult.httpSuccesses} bypassWebSocketSuccesses=${bypassResult.webSocketSuccesses} bypassBeforeSleepHttpSuccesses=${bypassResult.beforeSleepHttpSuccesses} bypassBeforeSleepWebSocketSuccesses=${bypassResult.beforeSleepWebSocketSuccesses} bypassAfterSleepHttpSuccesses=${bypassResult.afterSleepHttpSuccesses} bypassAfterSleepWebSocketSuccesses=${bypassResult.afterSleepWebSocketSuccesses} bypassTimeouts=${bypassResult.timeouts.length} bypassErrors=${bypassResult.errors.length}`, ); if (DURATION_MS >= SLEEP_INTERVAL_MS && sleepResult.posts === 0) { @@ -1341,6 +1592,33 @@ async function runWorkload() { `probe never saw expected close code=${EXPECTED_PROBE_CLOSE_CODE} reasonPrefix=${EXPECTED_PROBE_CLOSE_REASON_PREFIX}`, ); } + if (bypassResult.attempts === 0) { + throw new Error("bypass loop did not run"); + } + if (bypassResult.beforeSleepAttempts === 0) { + throw new Error("bypass loop did not run before sleep"); + } + if ( + bypassResult.beforeSleepHttpSuccesses !== + bypassResult.beforeSleepAttempts || + bypassResult.beforeSleepWebSocketSuccesses !== + bypassResult.beforeSleepAttempts + ) { + throw new Error( + `bypass loop failed before sleep: ${JSON.stringify(bypassResult)}`, + ); + } + if ( + bypassResult.timeouts.some((item) => item.phase === "beforeSleep") || + bypassResult.errors.some((item) => item.phase === "beforeSleep") + ) { + throw new Error( + `bypass loop had pre-sleep failures: ${JSON.stringify(bypassResult)}`, + ); + } + if (sleepResult.posts > 0 && bypassResult.afterSleepAttempts === 0) { + throw new Error("bypass loop did not continue after sleep request"); + } } async function main() { diff --git a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts index aa0c0b74b7..2802fd38fa 100644 --- a/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts +++ b/examples/kitchen-sink/src/actors/testing/mock-agentic-loop.ts @@ -166,6 +166,22 @@ export const mockAgenticLoop = actor({ }); await sleep(delayMs); }, + onRequest(_c, request) { + const url = new URL(request.url); + if (url.pathname === "/bypass" || url.pathname === "/request/bypass") { + return new Response(JSON.stringify({ + type: "bypass", + transport: "http", + timestamp: Date.now(), + }), { + headers: { + "content-type": "application/json", + }, + }); + } + + return new Response("not found", { status: 404 }); + }, onWebSocket(c, websocket: UniversalWebSocket) { const connectionId = crypto.randomUUID(); let activeInference: Promise | undefined;