Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions packages/loot-core/src/server/sync/batchMessages.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import { Timestamp } from '@actual-app/crdt';

import * as db from '../db';

import { batchMessages, sendMessages, setSyncingMode } from './index';

beforeEach(() => {
setSyncingMode('disabled');
return global.emptyDatabase()();
});

afterEach(() => {
global.resetTime();
setSyncingMode('disabled');
});

describe('batchMessages concurrency', () => {
it('preserves nested batch messages when outer batch fails', async () => {
// Regression test: nested batch messages must not be silently lost
// when the outer batch throws after the inner batch completes.
//
// Trace: A starts outer batch, sends msgA, yields. B starts nested
// batch, sends msgB, completes. A resumes and throws. Fix: messages
// are sent before rethrowing.

let resolveYield: () => void;
const yieldPromise = new Promise<void>(resolve => {
resolveYield = resolve;
});

let innerBatchReturned = false;

// Start operation A - it will yield, allowing B to interleave
const operationA = batchMessages(async () => {
await sendMessages([
{
dataset: 'transactions',
row: 'row-a',
column: 'amount',
value: 100,
timestamp: Timestamp.send(),
},
]);

// Yield to event loop - B will run during this await
await yieldPromise;

// A fails after B has completed
throw new Error('operation A failed');
});

global.stepForwardInTime();

// B runs while A is yielded. IS_BATCHING is true, so B is nested.
await batchMessages(async () => {
await sendMessages([
{
dataset: 'transactions',
row: 'row-b',
column: 'amount',
value: 200,
timestamp: Timestamp.send(),
},
]);
});
innerBatchReturned = true;

// Resume A, which will throw
resolveYield();

// A still fails
await expect(operationA).rejects.toThrow('operation A failed');

// B returned successfully
expect(innerBatchReturned).toBe(true);

// FIX: B's message IS applied despite outer batch failure.
// All accumulated messages are sent before the error is rethrown.
const rowB = await db.first<{ id: string }>(
'SELECT * FROM transactions WHERE id = ?',
['row-b'],
);
expect(rowB).not.toBeNull();

// A's message is also applied (consistent with non-batched behavior
// where each sendMessages call applies immediately)
const rowA = await db.first<{ id: string }>(
'SELECT * FROM transactions WHERE id = ?',
['row-a'],
);
expect(rowA).not.toBeNull();
});

it('applies all messages when outer batch succeeds', async () => {
let resolveYield: () => void;
const yieldPromise = new Promise<void>(resolve => {
resolveYield = resolve;
});

const operationA = batchMessages(async () => {
await sendMessages([
{
dataset: 'transactions',
row: 'row-a',
column: 'amount',
value: 100,
timestamp: Timestamp.send(),
},
]);

// Yield to event loop
await yieldPromise;
});

global.stepForwardInTime();

// B runs as nested batch
await batchMessages(async () => {
await sendMessages([
{
dataset: 'transactions',
row: 'row-b',
column: 'amount',
value: 200,
timestamp: Timestamp.send(),
},
]);
});

// Resume A (succeeds)
resolveYield();
await operationA;

// Both messages should be applied
const rowA = await db.first<{ id: string; amount: number }>(
'SELECT * FROM transactions WHERE id = ?',
['row-a'],
);
expect(rowA).not.toBeNull();

const rowB = await db.first<{ id: string; amount: number }>(
'SELECT * FROM transactions WHERE id = ?',
['row-b'],
);
expect(rowB).not.toBeNull();
});
});
29 changes: 26 additions & 3 deletions packages/loot-core/src/server/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -482,20 +482,43 @@ export async function batchMessages(func: () => Promise<void>): Promise<void> {

IS_BATCHING = true;
let batched: Message[] = [];
let error: Error | undefined;

try {
await func();
} catch (e) {
void errorHandler(e);
throw e;
error = e instanceof Error ? e : new Error(String(e));
} finally {
IS_BATCHING = false;
batched = _BATCHED;
_BATCHED = [];
}

// Always send accumulated messages, even if func threw. This prevents
// silently losing messages from successfully-completed nested batches
// that were coalesced into this outer batch. Without this, if the
// outer batch fails, all nested batch messages are discarded even
// though their callers received no error. This is consistent with
// non-batched behavior where each sendMessages call applies immediately.
if (batched.length > 0) {
await _sendMessages(batched);
try {
await _sendMessages(batched);
} catch (sendError) {
// If _sendMessages fails and we already have an error from func(),
// preserve the original error since it's the root cause.
if (!error) {
throw sendError instanceof Error
? sendError
: new Error(String(sendError));
}
captureException(sendError);
}
}

if (error) {
void errorHandler(error);
// eslint-disable-next-line no-throw-literal -- oxlint can't narrow Error|undefined
throw error;
}
}

Expand Down
6 changes: 6 additions & 0 deletions upcoming-release-notes/7225.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
category: Bugfix
authors: [dearlordylord]
---

Fix batchMessages silently losing nested batch messages when outer batch fails
Loading