Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion async-engine/src/AsyncEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ namespace privmx {
const value = Emval.toValue(valueHandle);
setTimeout(()=>callback(value), 0);
});

// Reads window.__privmxWorkerCount (set by TypeScript before module init).
// Returns 0 when the global is absent or not a positive integer.
EM_JS(int, readWorkerCountFromJs, (), {
const v = (typeof window !== 'undefined') && window.__privmxWorkerCount;
return (typeof v === 'number' && v > 0) ? (v | 0) : 0;
});
}
}

Expand All @@ -63,7 +70,9 @@ AsyncEngine* AsyncEngine::getInstance() {
}

AsyncEngine::AsyncEngine() {
_pool = std::make_unique<WorkerPool>(4);
int requested = readWorkerCountFromJs();
size_t numWorkers = (requested >= 2) ? static_cast<size_t>(requested) : 4;
_pool = std::make_unique<WorkerPool>(numWorkers);
_taskManagerThread = std::thread([=] { emscripten_runtime_keepalive_push(); });
}

Expand Down
27 changes: 25 additions & 2 deletions src/service/EndpointFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ import { setGlobalEmCrypto } from "../crypto/index";
*/
declare function endpointWasmModule(): Promise<any>; // Provided by emscripten js glue code

export interface EndpointSetupOptions {
assetsBasePath?: string;
workerCount?: number;
}

/**
* Contains static factory methods - generators for Connection and APIs.
*/
Expand All @@ -50,12 +55,30 @@ export class EndpointFactory {
/**
* Load the Endpoint's WASM assets and initialize the Endpoint library.
*
* @param {string} [assetsBasePath] base path/url to the Endpoint's WebAssembly assets (like: endpoint-wasm-module.js, driver-web-context.js and others)
* @param {string | EndpointSetupOptions} [options] either a base path string (legacy) or an options object
* @param {string} [options.assetsBasePath] base path/url to the Endpoint's WebAssembly assets
* @param {number} [options.workerCount] number of async-engine worker threads (default: 4, minimum: 2)
*/
public static async setup(assetsBasePath?: string): Promise<void> {
public static async setup(options?: string | EndpointSetupOptions): Promise<void> {
const resolved: EndpointSetupOptions =
typeof options === "object" && options !== null
? options
: { assetsBasePath: options as string | undefined };
const { assetsBasePath, workerCount } = resolved;

const basePath = this.resolveAssetsBasePath(assetsBasePath);
this.assetsBasePath = basePath;

// Must be set before endpointWasmModule() is called — the C++ AsyncEngine
// constructor reads this global during WASM module initialization (on the
// worker thread), before the main thread gets control back.
if (workerCount !== undefined) {
(window as unknown as Record<string, unknown>).__privmxWorkerCount = Math.max(
2,
Math.floor(workerCount),
);
}

setGlobalEmCrypto();
const assets = ["endpoint-wasm-module.js"];

Expand Down
186 changes: 186 additions & 0 deletions tests/specs/core.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,3 +326,189 @@ test.describe("CoreTest: Connection & Contexts", () => {
expect(u2_p2!.isActive).toBe(true); // User 2 IS now connected
});
});

// ---------------------------------------------------------------------------
// EndpointFactory.setup() — object-form regression test
// ---------------------------------------------------------------------------

test.describe("CoreTest: EndpointFactory.setup() object form", () => {
test("setup({ assetsBasePath }) initialises WASM identically to setup(string)", async ({
page,
backend,
cli,
}) => {
await page.goto("/tests/harness/index.html");
await page.waitForFunction(() => window.wasmReady === true, null, { timeout: 10000 });

// Use the object form exclusively — this is the regression path.
await page.evaluate(async () => {
await window.Endpoint.setup({ assetsBasePath: "../../assets" });
});

const user = await setupTestUser(page, cli, [testData.contextId]);

const result = await page.evaluate(
async ({ bridgeUrl, solutionId, privKey }) => {
const connection = await window.Endpoint.connect(privKey, solutionId, bridgeUrl);
const cryptoApi = await window.Endpoint.createCryptoApi();
const pubKey = await cryptoApi.derivePublicKey(privKey);
return { connected: connection !== null, pubKeyDefined: pubKey.length > 0 };
},
{
bridgeUrl: backend.bridgeUrl,
solutionId: testData.solutionId,
privKey: user.privKey,
},
);

expect(result.connected).toBe(true);
expect(result.pubKeyDefined).toBe(true);
});

test("setup({ assetsBasePath, workerCount }) applies the requested worker count", async ({
page,
}) => {
await page.goto("/tests/harness/index.html");
await page.waitForFunction(() => window.wasmReady === true, null, { timeout: 10000 });

await page.evaluate(async () => {
await window.Endpoint.setup({ assetsBasePath: "../../assets", workerCount: 6 });
});

// Give pthreads time to spin up then verify crypto still works.
await page.waitForTimeout(320);

const signed = await page.evaluate(async () => {
const cryptoApi = await window.Endpoint.createCryptoApi();
const privKey = await cryptoApi.generatePrivateKey();
const sig = await cryptoApi.signData(new TextEncoder().encode("test"), privKey);
return sig.length > 0;
});

expect(signed).toBe(true);
});
});

// ---------------------------------------------------------------------------
// Worker-count performance test
// ---------------------------------------------------------------------------
// Measures wall-clock time for Promise.all(100 x sendMessage) at 2, 4 and 8
// worker threads to verify that (a) the workerCount parameter is wired through
// to the WASM engine and (b) more workers reduce time on a CPU-bound workload.
//
// Each step reloads the page so the WASM singleton is re-initialised with the
// desired worker count before any tasks are posted.
// ---------------------------------------------------------------------------

async function measureSendMessages(
page: Page,
cli: CliContext,
bridgeUrl: string,
workerCount: number,
messageCount: number,
): Promise<number> {
// Fresh page load so the WASM module reinitialises with the new worker count.
await page.goto("/tests/harness/index.html");
await page.waitForFunction(() => window.wasmReady === true, null, { timeout: 10000 });

// setup() sets window.__privmxWorkerCount BEFORE calling endpointWasmModule(),
// so the C++ AsyncEngine constructor picks it up on its worker thread.
// This must be a separate evaluate call so it completes before key generation.
await page.evaluate(async (wc: number) => {
await window.Endpoint.setup({ assetsBasePath: "../../assets", workerCount: wc });
}, workerCount);

// Give the browser event loop time to finish allocating all pthreads.
// Emscripten spawns workers asynchronously after module init returns;
// without this pause the first WASM task may arrive before all threads
// are ready, causing a stall or abort on high worker counts.
await page.waitForTimeout(200 + workerCount * 20);

// Key generation in a separate evaluate — Endpoint is now fully initialised.
const userKeys = await page.evaluate(async () => {
const cryptoApi = await window.Endpoint.createCryptoApi();
const privKey = await cryptoApi.generatePrivateKey();
return { privKey, pubKey: await cryptoApi.derivePublicKey(privKey) };
});

const userId = `perf-user-${Date.now()}-${workerCount}w`;
await cli.call("context/addUserToContext", {
contextId: testData.contextId,
userId,
userPubKey: userKeys.pubKey,
});

const args = {
bridgeUrl,
privKey: userKeys.privKey,
userId,
solutionId: testData.solutionId,
contextId: testData.contextId,
messageCount,
};

return page.evaluate(
async ({ bridgeUrl, privKey, userId, solutionId, contextId, messageCount }) => {
const Endpoint = window.Endpoint;
const connection = await Endpoint.connect(privKey, solutionId, bridgeUrl);
const threadApi = await Endpoint.createThreadApi(connection);
const cryptoApi = await Endpoint.createCryptoApi();

const userObj = { userId, pubKey: await cryptoApi.derivePublicKey(privKey) };
const enc = new TextEncoder();

const threadId = await threadApi.createThread(
contextId,
[userObj],
[userObj],
enc.encode("perf-test"),
enc.encode("perf-test"),
);

const payload = enc.encode("x".repeat(256));

const t0 = performance.now();
await Promise.all(
Array.from({ length: messageCount }, () =>
threadApi.sendMessage(threadId, enc.encode(""), enc.encode(""), payload),
),
);
return performance.now() - t0;
},
args,
);
}

test.describe("CoreTest: Worker count", () => {
const MESSAGE_COUNT = 100;

test("EndpointFactory.setup() initialises WASM with the requested worker count", async ({
page,
backend,
cli,
}) => {
const times: Record<string, number> = {};

await test.step("2 workers — baseline", async () => {
times["2w"] = await measureSendMessages(page, cli, backend.bridgeUrl, 2, MESSAGE_COUNT);
console.log(`[workerCount=2] ${MESSAGE_COUNT} messages: ${times["2w"].toFixed(1)} ms`);
});

await test.step("4 workers — default", async () => {
times["4w"] = await measureSendMessages(page, cli, backend.bridgeUrl, 4, MESSAGE_COUNT);
console.log(`[workerCount=4] ${MESSAGE_COUNT} messages: ${times["4w"].toFixed(1)} ms`);
});

await test.step("8 workers — doubled", async () => {
times["8w"] = await measureSendMessages(page, cli, backend.bridgeUrl, 8, MESSAGE_COUNT);
console.log(`[workerCount=8] ${MESSAGE_COUNT} messages: ${times["8w"].toFixed(1)} ms`);
});

// All three runs must complete all messages successfully (no throw = pass).
// We log the timings for manual inspection; we don't assert a specific ordering
// because the bridge/network RTT dominates and may swamp the worker-count effect.
expect(times["2w"]).toBeGreaterThan(0);
expect(times["4w"]).toBeGreaterThan(0);
expect(times["8w"]).toBeGreaterThan(0);
});
});