Skip to content

Commit 3fc7e6b

Browse files
KyleAMathewsclaude
andcommitted
fix: validate monotonic cursor ordering and document same-key limitation
Reject non-monotonic cursor sequences within a sync batch via onSourceError instead of silently leaking replay changes. Document the known limitation that same-key replay+live changes within one transaction produce an enter event instead of update due to transaction-scoped graph coalescing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d651fdf commit 3fc7e6b

File tree

2 files changed

+113
-4
lines changed

2 files changed

+113
-4
lines changed

packages/db/src/query/effect.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -738,11 +738,19 @@ class EffectPipelineRunner<TRow extends object, TKey extends string | number> {
738738
continue
739739
}
740740
try {
741-
if (
742-
this.pendingBatchCursor === undefined ||
743-
compareCollectionCursors(cursor, this.pendingBatchCursor) > 0
744-
) {
741+
if (this.pendingBatchCursor === undefined) {
745742
this.pendingBatchCursor = cursor
743+
} else {
744+
const cmp = compareCollectionCursors(cursor, this.pendingBatchCursor)
745+
if (cmp < 0) {
746+
throw new Error(
747+
`Cursors within a sync batch must be monotonically ordered. ` +
748+
`Saw ${String(cursor)} after ${String(this.pendingBatchCursor)}.`,
749+
)
750+
}
751+
if (cmp > 0) {
752+
this.pendingBatchCursor = cursor
753+
}
746754
}
747755
} catch (error) {
748756
this.onSourceError(
@@ -1227,11 +1235,19 @@ function findFirstChangeAfterCursor(
12271235
startAfter: CollectionCursor,
12281236
): number {
12291237
let anyCursor = false
1238+
let lastCursor: CollectionCursor | undefined
12301239
for (let index = 0; index < changes.length; index++) {
12311240
const cursor = changes[index]!.cursor
12321241
if (cursor === undefined) {
12331242
continue
12341243
}
1244+
if (lastCursor !== undefined && compareCollectionCursors(cursor, lastCursor) < 0) {
1245+
throw new Error(
1246+
`Cursors within a sync batch must be monotonically ordered. ` +
1247+
`Saw ${String(cursor)} after ${String(lastCursor)}.`,
1248+
)
1249+
}
1250+
lastCursor = cursor
12351251
anyCursor = true
12361252
if (compareCollectionCursors(cursor, startAfter) > 0) {
12371253
// Walk backwards to include any preceding uncursored changes —

packages/db/tests/effect.test.ts

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,99 @@ describe(`createEffect`, () => {
569569
await effect.dispose()
570570
})
571571

572+
it(`should emit enter when same key crosses startAfter within one transaction (known limitation)`, async () => {
573+
// Known limitation: when the same key has a replay insert and a live
574+
// update within one transaction, the transaction-scoped scheduler
575+
// coalesces both into a single graph run. D2 sees the net result as
576+
// a new row, so the event type is `enter` instead of `update`.
577+
// Correct behavior would be `update` with previousValue from the
578+
// replay insert, but fixing this requires per-change D2 fencing
579+
// within transactions, which is too invasive for the initial cut.
580+
const users = createCollection(
581+
mockSyncCollectionOptionsNoInitialState<User>({
582+
id: `same-key-straddle-users`,
583+
getKey: (user) => user.id,
584+
}),
585+
)
586+
const events: Array<DeltaEvent<User, number>> = []
587+
588+
const effect = createEffect<User, number>({
589+
query: (q) => q.from({ user: users }),
590+
onBatch: collectBatchEvents(events),
591+
startAfter: 2,
592+
})
593+
594+
users.utils.markReady()
595+
await flushPromises()
596+
597+
users.utils.begin()
598+
users.utils.write({
599+
type: `insert`,
600+
value: { id: 1, name: `Alice`, active: true },
601+
cursor: 2,
602+
})
603+
users.utils.write({
604+
type: `update`,
605+
value: { id: 1, name: `Alice v2`, active: true },
606+
previousValue: { id: 1, name: `Alice`, active: true },
607+
cursor: 3,
608+
})
609+
users.utils.commit()
610+
await flushPromises()
611+
612+
expect(events).toHaveLength(1)
613+
// Ideally this would be `update`, but coalescing produces `enter`
614+
expect(events[0]!.type).toBe(`enter`)
615+
expect(events[0]!.value.name).toBe(`Alice v2`)
616+
617+
await effect.dispose()
618+
})
619+
620+
it(`should reject non-monotonic cursor sequences within a batch`, async () => {
621+
const users = createCollection(
622+
mockSyncCollectionOptionsNoInitialState<User>({
623+
id: `non-monotonic-cursor-users`,
624+
getKey: (user) => user.id,
625+
}),
626+
)
627+
const errors: Array<Error> = []
628+
629+
const effect = createEffect<User, number>({
630+
query: (q) => q.from({ user: users }),
631+
onBatch: () => {},
632+
startAfter: 2,
633+
onSourceError: (err) => errors.push(err),
634+
})
635+
636+
users.utils.markReady()
637+
await flushPromises()
638+
639+
// Non-monotonic batch: [cursor 1, cursor 3, cursor 2]
640+
users.utils.begin()
641+
users.utils.write({
642+
type: `insert`,
643+
value: { id: 1, name: `Alice`, active: true },
644+
cursor: 1,
645+
})
646+
users.utils.write({
647+
type: `insert`,
648+
value: { id: 2, name: `Bob`, active: true },
649+
cursor: 3,
650+
})
651+
users.utils.write({
652+
type: `insert`,
653+
value: { id: 3, name: `Charlie`, active: true },
654+
cursor: 2,
655+
})
656+
users.utils.commit()
657+
await flushPromises()
658+
659+
expect(errors).toHaveLength(1)
660+
expect(errors[0]!.message).toMatch(/monotonically/)
661+
662+
await effect.dispose()
663+
})
664+
572665
it(`should throw when startAfter is used with multi-source effects`, () => {
573666
const users = createUsersCollection()
574667
const issues = createIssuesCollection()

0 commit comments

Comments
 (0)