diff --git a/packages/loot-core/src/server/sync/batchMessages.test.ts b/packages/loot-core/src/server/sync/batchMessages.test.ts new file mode 100644 index 00000000000..284cc4e1262 --- /dev/null +++ b/packages/loot-core/src/server/sync/batchMessages.test.ts @@ -0,0 +1,147 @@ +import { Timestamp } from '@actual-app/crdt'; + +import * as db from '../db'; + +import { batchMessages, sendMessages, setSyncingMode } from './index'; + +beforeEach(() => { + setSyncingMode('disabled'); + return global.emptyDatabase()(); +}); + +afterEach(() => { + global.resetTime(); + setSyncingMode('disabled'); +}); + +describe('batchMessages concurrency', () => { + it('preserves nested batch messages when outer batch fails', async () => { + // Regression test: nested batch messages must not be silently lost + // when the outer batch throws after the inner batch completes. + // + // Trace: A starts outer batch, sends msgA, yields. B starts nested + // batch, sends msgB, completes. A resumes and throws. Fix: messages + // are sent before rethrowing. + + let resolveYield: () => void; + const yieldPromise = new Promise(resolve => { + resolveYield = resolve; + }); + + let innerBatchReturned = false; + + // Start operation A - it will yield, allowing B to interleave + const operationA = batchMessages(async () => { + await sendMessages([ + { + dataset: 'transactions', + row: 'row-a', + column: 'amount', + value: 100, + timestamp: Timestamp.send(), + }, + ]); + + // Yield to event loop - B will run during this await + await yieldPromise; + + // A fails after B has completed + throw new Error('operation A failed'); + }); + + global.stepForwardInTime(); + + // B runs while A is yielded. IS_BATCHING is true, so B is nested. + await batchMessages(async () => { + await sendMessages([ + { + dataset: 'transactions', + row: 'row-b', + column: 'amount', + value: 200, + timestamp: Timestamp.send(), + }, + ]); + }); + innerBatchReturned = true; + + // Resume A, which will throw + resolveYield(); + + // A still fails + await expect(operationA).rejects.toThrow('operation A failed'); + + // B returned successfully + expect(innerBatchReturned).toBe(true); + + // FIX: B's message IS applied despite outer batch failure. + // All accumulated messages are sent before the error is rethrown. + const rowB = await db.first<{ id: string }>( + 'SELECT * FROM transactions WHERE id = ?', + ['row-b'], + ); + expect(rowB).not.toBeNull(); + + // A's message is also applied (consistent with non-batched behavior + // where each sendMessages call applies immediately) + const rowA = await db.first<{ id: string }>( + 'SELECT * FROM transactions WHERE id = ?', + ['row-a'], + ); + expect(rowA).not.toBeNull(); + }); + + it('applies all messages when outer batch succeeds', async () => { + let resolveYield: () => void; + const yieldPromise = new Promise(resolve => { + resolveYield = resolve; + }); + + const operationA = batchMessages(async () => { + await sendMessages([ + { + dataset: 'transactions', + row: 'row-a', + column: 'amount', + value: 100, + timestamp: Timestamp.send(), + }, + ]); + + // Yield to event loop + await yieldPromise; + }); + + global.stepForwardInTime(); + + // B runs as nested batch + await batchMessages(async () => { + await sendMessages([ + { + dataset: 'transactions', + row: 'row-b', + column: 'amount', + value: 200, + timestamp: Timestamp.send(), + }, + ]); + }); + + // Resume A (succeeds) + resolveYield(); + await operationA; + + // Both messages should be applied + const rowA = await db.first<{ id: string; amount: number }>( + 'SELECT * FROM transactions WHERE id = ?', + ['row-a'], + ); + expect(rowA).not.toBeNull(); + + const rowB = await db.first<{ id: string; amount: number }>( + 'SELECT * FROM transactions WHERE id = ?', + ['row-b'], + ); + expect(rowB).not.toBeNull(); + }); +}); diff --git a/packages/loot-core/src/server/sync/index.ts b/packages/loot-core/src/server/sync/index.ts index 6cd3ce1c129..502224ff470 100644 --- a/packages/loot-core/src/server/sync/index.ts +++ b/packages/loot-core/src/server/sync/index.ts @@ -482,20 +482,43 @@ export async function batchMessages(func: () => Promise): Promise { IS_BATCHING = true; let batched: Message[] = []; + let error: Error | undefined; try { await func(); } catch (e) { - void errorHandler(e); - throw e; + error = e instanceof Error ? e : new Error(String(e)); } finally { IS_BATCHING = false; batched = _BATCHED; _BATCHED = []; } + // Always send accumulated messages, even if func threw. This prevents + // silently losing messages from successfully-completed nested batches + // that were coalesced into this outer batch. Without this, if the + // outer batch fails, all nested batch messages are discarded even + // though their callers received no error. This is consistent with + // non-batched behavior where each sendMessages call applies immediately. if (batched.length > 0) { - await _sendMessages(batched); + try { + await _sendMessages(batched); + } catch (sendError) { + // If _sendMessages fails and we already have an error from func(), + // preserve the original error since it's the root cause. + if (!error) { + throw sendError instanceof Error + ? sendError + : new Error(String(sendError)); + } + captureException(sendError); + } + } + + if (error) { + void errorHandler(error); + // eslint-disable-next-line no-throw-literal -- oxlint can't narrow Error|undefined + throw error; } } diff --git a/upcoming-release-notes/7225.md b/upcoming-release-notes/7225.md new file mode 100644 index 00000000000..0973624d811 --- /dev/null +++ b/upcoming-release-notes/7225.md @@ -0,0 +1,6 @@ +--- +category: Bugfix +authors: [dearlordylord] +--- + +Fix batchMessages silently losing nested batch messages when outer batch fails