feat: cursor-aware effect replay with startAfter gating #1405

Open

KyleAMathews wants to merge 11 commits into main from feat/cursor-aware-effect-replay

Conversation

@KyleAMathews (Collaborator) commented Mar 25, 2026

Summary

Add startAfter cursor option to createEffect and useLiveQueryEffect, enabling effects to skip callbacks during historical replay while still hydrating query state. Supports both single-source effects (scalar cursor) and join queries (Record<string, CollectionCursor> for per-source gating).

Root Cause

When replaying a stream of sync changes (e.g., after reconnection or page reload), createEffect fires callbacks for every historical change. There was no way to say "I've already processed everything up to cursor X — only fire for new changes." This forced consumers to either accept duplicate side-effects during replay or build their own deduplication layer.

Approach

Thread a CollectionCursor (string | number) through three layers:

1. Sync layer (sync.ts, state.ts): sync.write() accepts an optional cursor. The collection state manager propagates it through PendingSyncedTransaction's eventCursors into the emitted ChangeMessage events.

2. Effect pipeline (effect.ts): New startAfter config option. When set, the EffectPipelineRunner gates callbacks per-alias: changes at or before startAfter[alias] still hydrate the D2 query graph (so query state is correct), but flushPendingChanges discards events until a live source contributes changes past its cursor boundary. For single-source effects, a scalar startAfter value is automatically normalized to the single alias.

3. DeltaEvent enrichment (effect.ts): Each emitted DeltaEvent carries:

  • cursor: batch-level high-water cursor (max across all source cursors)
  • cursors: per-source cursor map (for gated effects)
  • triggeringSource: which source alias drove this batch (for gated effects)
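
As a rough sketch of these cursor semantics (illustrative code, not the shipped implementation — the shape of `highWaterCursor` and the internals of `compareCollectionCursors` are assumptions based on this description):

```typescript
// CollectionCursor is string | number; cursors are ordered within one type,
// and mixing types is an error (the PR routes this through onSourceError).
type CollectionCursor = string | number

function compareCollectionCursors(a: CollectionCursor, b: CollectionCursor): number {
  if (typeof a === `number` && typeof b === `number`) {
    return a < b ? -1 : a > b ? 1 : 0
  }
  if (typeof a === `string` && typeof b === `string`) {
    return a < b ? -1 : a > b ? 1 : 0
  }
  // Mismatched cursor types cannot be ordered meaningfully.
  throw new TypeError(`cursor type mismatch: ${typeof a} vs ${typeof b}`)
}

// Batch-level high-water cursor: max across all source cursors in the batch.
function highWaterCursor(cursors: Array<CollectionCursor>): CollectionCursor | undefined {
  let max: CollectionCursor | undefined
  for (const c of cursors) {
    if (max === undefined || compareCollectionCursors(c, max) > 0) max = c
  }
  return max
}
```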

Key Invariants

  • Pre-cursor changes must hydrate query state — otherwise previousValue on the first live update event would be wrong
  • Each per-alias gate opens exactly once and never re-closes
  • Cursor-less changes after replay open the gate (sync provider moved past cursor-based protocol)
  • Uncursored changes adjacent to a gate-opening cursor are treated as live, not replay
  • compareCollectionCursors type mismatches route through onSourceError for graceful disposal
  • Join tap callbacks during graph runs (lazy source loads) inherit the root batch's live status and don't affect cursor gates
  • A scalar startAfter with multi-source effects throws (use Record form for joins)
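
A minimal model of the per-alias gate behaviour these invariants describe (the class and method names here are illustrative, not the internal API):

```typescript
type Change = { key: string; cursor?: number }

// Pre-boundary changes are discarded from output (they still hydrate query
// state upstream); the gate opens at the first live change and never re-closes.
class PerAliasGate {
  private open = new Set<string>()
  constructor(private startAfter: Record<string, number>) {}

  // Returns the changes eligible for callback emission for this alias.
  emittable(alias: string, changes: Array<Change>): Array<Change> {
    if (this.open.has(alias)) return changes // open gates never re-close
    const boundary = this.startAfter[alias]
    // Cursor-less changes after replay count as live (per the invariants above).
    const firstLive = changes.findIndex(
      (c) => c.cursor === undefined || c.cursor > boundary
    )
    if (firstLive === -1) return [] // still replaying: discard, stay closed
    this.open.add(alias)
    return changes.slice(firstLive)
  }
}
```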

Non-goals

  • Per-row cursor on DeltaEvents (batch-level is intentional — the batch is the checkpoint unit)
  • Parameterizing CollectionCursor by primitive type (runtime check is sufficient for now)

Trade-offs

The cursor is batch-level, using the highest cursor seen per-source in the batch. This means the cleanest semantics come when replay writes are cursor-ordered and each sync commit maps to one cursor boundary. This was chosen over per-row cursors because the batch is the natural checkpoint granularity — saving a cursor after processing a batch is the common consumer pattern.
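
That consumer pattern might look like the following sketch, where `loadCheckpoint`, `saveCheckpoint`, and `performSideEffect` are hypothetical stand-ins and the wiring to createEffect is elided:

```typescript
type Cursor = number
interface DeltaEvent { key: string; cursor?: Cursor }

let stored: Cursor | undefined // stands in for durable checkpoint storage
let sideEffects = 0

const loadCheckpoint = (): Cursor | undefined => stored
const saveCheckpoint = (c: Cursor): void => { stored = c }
const performSideEffect = (_e: DeltaEvent): void => { sideEffects += 1 }

// Per batch: run side effects for each event, then checkpoint once at the
// batch-level cursor. On restart, the persisted value is passed as startAfter
// so batches at or before the checkpoint never re-fire side effects.
function handleBatch(events: Array<DeltaEvent>, batchCursor: Cursor): void {
  for (const e of events) performSideEffect(e)
  saveCheckpoint(batchCursor)
}
```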

Per-alias gating adds complexity to the effect pipeline, but the alternative (requiring consumers to manually filter events by source) would push that complexity to every call site and couldn't protect previousValue correctness.

Verification

pnpm vitest run packages/db/tests/effect.test.ts --pool-options.threads.maxThreads=2
pnpm vitest run packages/db/tests/collection.test.ts --pool-options.threads.maxThreads=2
pnpm exec tsc -p packages/db/tsconfig.json --noEmit
pnpm exec tsc -p packages/react-db/tsconfig.json --noEmit

Files changed

| File | Change |
| --- | --- |
| packages/db/src/types.ts | Add CollectionCursor type, cursor field on ChangeMessage |
| packages/db/src/collection/state.ts | PendingCursorWrite type, cursor propagation through eventCursors → emitted change events |
| packages/db/src/collection/sync.ts | Record cursor from sync writes into rowCursorWrites |
| packages/db/src/query/effect.ts | Per-alias startAfter gating, DeltaEvent with cursor/cursors/triggeringSource, join-tap-aware graph run isolation, compareCollectionCursors |
| packages/db/src/query/live/utils.ts | Propagate cursor/metadata through splitUpdates via spread |
| packages/react-db/src/useLiveQueryEffect.ts | Pass startAfter through to createEffect |
| packages/electric-db-collection/src/electric.ts | Pass stream.lastOffset as cursor in sync writes |
| packages/db/tests/effect.test.ts | 11 new tests: cursor gating, per-source joins, triggeringSource, type mismatches, batch straddling, monotonicity |
| packages/db/tests/collection.test.ts | 1 new test: sync cursor subscription propagation |

🤖 Generated with Claude Code

KyleAMathews and others added 4 commits March 25, 2026 11:13
Add CollectionCursor type and startAfter option to createEffect, enabling
effects to suppress callbacks during replay of historical changes while
still hydrating internal query state. Sync writes can now carry an opaque
sortable cursor that propagates through ChangeMessage and DeltaEvent.

Includes fixes for cursor type mismatch crashes (routed through
onSourceError), cursor-less change handling after replay, and cursor
erasure within same-transaction writes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

pkg-pr-new bot commented Mar 25, 2026


@tanstack/angular-db

npm i https://pkg.pr.new/TanStack/db/@tanstack/angular-db@1405

@tanstack/browser-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/browser-db-sqlite-persistence@1405

@tanstack/capacitor-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/capacitor-db-sqlite-persistence@1405

@tanstack/cloudflare-durable-objects-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/cloudflare-durable-objects-db-sqlite-persistence@1405

@tanstack/db

npm i https://pkg.pr.new/TanStack/db/@tanstack/db@1405

@tanstack/db-ivm

npm i https://pkg.pr.new/TanStack/db/@tanstack/db-ivm@1405

@tanstack/db-sqlite-persistence-core

npm i https://pkg.pr.new/TanStack/db/@tanstack/db-sqlite-persistence-core@1405

@tanstack/electric-db-collection

npm i https://pkg.pr.new/TanStack/db/@tanstack/electric-db-collection@1405

@tanstack/electron-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/electron-db-sqlite-persistence@1405

@tanstack/expo-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/expo-db-sqlite-persistence@1405

@tanstack/node-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/node-db-sqlite-persistence@1405

@tanstack/offline-transactions

npm i https://pkg.pr.new/TanStack/db/@tanstack/offline-transactions@1405

@tanstack/powersync-db-collection

npm i https://pkg.pr.new/TanStack/db/@tanstack/powersync-db-collection@1405

@tanstack/query-db-collection

npm i https://pkg.pr.new/TanStack/db/@tanstack/query-db-collection@1405

@tanstack/react-db

npm i https://pkg.pr.new/TanStack/db/@tanstack/react-db@1405

@tanstack/react-native-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/react-native-db-sqlite-persistence@1405

@tanstack/rxdb-db-collection

npm i https://pkg.pr.new/TanStack/db/@tanstack/rxdb-db-collection@1405

@tanstack/solid-db

npm i https://pkg.pr.new/TanStack/db/@tanstack/solid-db@1405

@tanstack/svelte-db

npm i https://pkg.pr.new/TanStack/db/@tanstack/svelte-db@1405

@tanstack/tauri-db-sqlite-persistence

npm i https://pkg.pr.new/TanStack/db/@tanstack/tauri-db-sqlite-persistence@1405

@tanstack/trailbase-db-collection

npm i https://pkg.pr.new/TanStack/db/@tanstack/trailbase-db-collection@1405

@tanstack/vue-db

npm i https://pkg.pr.new/TanStack/db/@tanstack/vue-db@1405

commit: 14e314a


github-actions bot commented Mar 25, 2026

Size Change: +1.15 kB (+1.02%)

Total Size: 114 kB

| Filename | Size | Change |
| --- | --- | --- |
| ./packages/db/dist/esm/collection/state.js | 5.3 kB | +48 B (+0.91%) |
| ./packages/db/dist/esm/collection/sync.js | 2.92 kB | +33 B (+1.14%) |
| ./packages/db/dist/esm/query/effect.js | 5.86 kB | +1.08 kB (+22.57%) 🚨 |
| ./packages/db/dist/esm/query/live/utils.js | 1.63 kB | -6 B (-0.37%) |
Unchanged files:
Filename Size
./packages/db/dist/esm/collection/change-events.js 1.39 kB
./packages/db/dist/esm/collection/changes.js 1.38 kB
./packages/db/dist/esm/collection/cleanup-queue.js 810 B
./packages/db/dist/esm/collection/events.js 434 B
./packages/db/dist/esm/collection/index.js 3.61 kB
./packages/db/dist/esm/collection/indexes.js 1.99 kB
./packages/db/dist/esm/collection/lifecycle.js 1.69 kB
./packages/db/dist/esm/collection/mutations.js 2.47 kB
./packages/db/dist/esm/collection/subscription.js 3.74 kB
./packages/db/dist/esm/collection/transaction-metadata.js 144 B
./packages/db/dist/esm/deferred.js 207 B
./packages/db/dist/esm/errors.js 4.92 kB
./packages/db/dist/esm/event-emitter.js 748 B
./packages/db/dist/esm/index.js 3 kB
./packages/db/dist/esm/indexes/auto-index.js 830 B
./packages/db/dist/esm/indexes/base-index.js 729 B
./packages/db/dist/esm/indexes/basic-index.js 2.05 kB
./packages/db/dist/esm/indexes/btree-index.js 2.17 kB
./packages/db/dist/esm/indexes/index-registry.js 820 B
./packages/db/dist/esm/indexes/reverse-index.js 538 B
./packages/db/dist/esm/local-only.js 890 B
./packages/db/dist/esm/local-storage.js 2.1 kB
./packages/db/dist/esm/optimistic-action.js 359 B
./packages/db/dist/esm/paced-mutations.js 496 B
./packages/db/dist/esm/proxy.js 3.75 kB
./packages/db/dist/esm/query/builder/functions.js 905 B
./packages/db/dist/esm/query/builder/index.js 5.25 kB
./packages/db/dist/esm/query/builder/ref-proxy.js 1.05 kB
./packages/db/dist/esm/query/compiler/evaluators.js 1.62 kB
./packages/db/dist/esm/query/compiler/expressions.js 430 B
./packages/db/dist/esm/query/compiler/group-by.js 2.69 kB
./packages/db/dist/esm/query/compiler/index.js 3.63 kB
./packages/db/dist/esm/query/compiler/joins.js 2.11 kB
./packages/db/dist/esm/query/compiler/order-by.js 1.51 kB
./packages/db/dist/esm/query/compiler/select.js 1.11 kB
./packages/db/dist/esm/query/expression-helpers.js 1.43 kB
./packages/db/dist/esm/query/ir.js 829 B
./packages/db/dist/esm/query/live-query-collection.js 360 B
./packages/db/dist/esm/query/live/collection-config-builder.js 7.78 kB
./packages/db/dist/esm/query/live/collection-registry.js 264 B
./packages/db/dist/esm/query/live/collection-subscriber.js 1.94 kB
./packages/db/dist/esm/query/live/internal.js 145 B
./packages/db/dist/esm/query/optimizer.js 2.62 kB
./packages/db/dist/esm/query/predicate-utils.js 2.97 kB
./packages/db/dist/esm/query/query-once.js 359 B
./packages/db/dist/esm/query/subset-dedupe.js 960 B
./packages/db/dist/esm/scheduler.js 1.3 kB
./packages/db/dist/esm/SortedMap.js 1.3 kB
./packages/db/dist/esm/strategies/debounceStrategy.js 247 B
./packages/db/dist/esm/strategies/queueStrategy.js 428 B
./packages/db/dist/esm/strategies/throttleStrategy.js 246 B
./packages/db/dist/esm/transactions.js 2.9 kB
./packages/db/dist/esm/utils.js 927 B
./packages/db/dist/esm/utils/array-utils.js 273 B
./packages/db/dist/esm/utils/browser-polyfills.js 304 B
./packages/db/dist/esm/utils/btree.js 5.61 kB
./packages/db/dist/esm/utils/comparison.js 1.05 kB
./packages/db/dist/esm/utils/cursor.js 457 B
./packages/db/dist/esm/utils/index-optimization.js 1.54 kB
./packages/db/dist/esm/utils/type-guards.js 157 B
./packages/db/dist/esm/virtual-props.js 360 B

compressed-size-action::db-package-size


github-actions bot commented Mar 25, 2026

Size Change: 0 B

Total Size: 4.23 kB

Unchanged files:
Filename Size
./packages/react-db/dist/esm/index.js 249 B
./packages/react-db/dist/esm/useLiveInfiniteQuery.js 1.32 kB
./packages/react-db/dist/esm/useLiveQuery.js 1.34 kB
./packages/react-db/dist/esm/useLiveQueryEffect.js 355 B
./packages/react-db/dist/esm/useLiveSuspenseQuery.js 559 B
./packages/react-db/dist/esm/usePacedMutations.js 401 B

compressed-size-action::react-db-package-size

Cursor gate state is global to the effect, so a cursor from one source
collection could incorrectly open the gate for another. Validate at
construction time that startAfter is only used with single-source
effects until per-source cursor tracking is implemented.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
KyleAMathews force-pushed the feat/cursor-aware-effect-replay branch from df371c3 to d651fdf on March 25, 2026 at 17:28
KyleAMathews and others added 6 commits March 25, 2026 11:37
Reject non-monotonic cursor sequences within a sync batch via
onSourceError instead of silently leaking replay changes. Document
the known limitation that same-key replay+live changes within one
transaction produce an enter event instead of update due to
transaction-scoped graph coalescing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Thread stream.lastOffset as the cursor field on each write in
processChangeMessage, enabling effects with startAfter to resume
from a persisted Electric offset.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Clarify that startAfter requires monotonic cursors from the sync source
and is only supported for single-source (non-join) effects.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The same-key boundary-crossing case (insert at cursor N, update at
cursor N+1 in one transaction) cannot happen in practice because
startAfter is always set from a completed transaction boundary.
Within a single transaction, all changes share the same cursor space.

Removes the test documenting this as a limitation and reverts the
synchronous graph flush that was added to address it.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
startAfter now accepts Record<string, CollectionCursor> for
per-source gating in join effects. Each source gets an independent
gate. DeltaEvent gains triggeringSource and cursors fields for
gated effects. Join tap callbacks during graph runs are isolated
from cursor gate logic to prevent false gate opens.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@samwillis (Collaborator) commented:

I had both Opus and GPT5.2 review this, with the context of our discussion, and the comment on #1394.

The key thing here is that we are trying to bolt log-replay triggers onto the query engine, and there are multiple layers where event collapsing can happen. This API gives the impression that you will get a true replay of all events, but whether you do depends on how the source collection emits them (how it handles up-to-date/markReady) and on which other collections are joined.

createEffect operates on visible state deltas, not source events. On a live stream it will trigger on each change to the visible state, but in replay mode it needs coordination from the source collection to ensure it sees each change as a transaction that runs runGraph and emits the changes. A second comment after the review looks at this.

I do wonder if we need a separate createTrigger that doesn't take a query, only a collection, and operates on that.


Deep Review of startAfter Cursor Gating (PR #1405)

I've traced the full path from sync writes through the collection state manager, subscription system, D2 graph engine, and effect pipeline. A second independent review confirmed the same core findings and surfaced additional detail on the Electric adapter's control-point batching and the subscription layer's event rewriting. Here are the combined findings, ordered from most critical to least.


1. CRITICAL: Buffer Drain Path Bypasses Cursor Gating Entirely

During start(), incoming source changes are buffered in pendingBuffers until all subscriptions are established. When the buffer drains (lines 672-692), it calls recordPendingBatchCursor + sendChangesToD2 directly — it does not call handleSourceChanges and therefore completely bypasses handleSourceChangesBeforeCursorGate.

    for (const [alias] of pendingBuffers) {
      const buffer = pendingBuffers.get(alias)!
      pendingBuffers.delete(alias)

      const orderByInfo = this.getOrderByInfoForAlias(alias)

      // Drain all buffered batches. Since we deleted the alias from
      // pendingBuffers above, any new changes arriving during drain go
      // through handleSourceChanges directly (not back into this buffer).
      for (const changes of buffer) {
        if (orderByInfo) {
          this.trackSentValues(alias, changes, orderByInfo.comparator)
          const split = [...splitUpdates(changes)]
          this.recordPendingBatchCursor(alias, split)
          this.sendChangesToD2(alias, split)
        } else {
          this.recordPendingBatchCursor(alias, changes)
          this.sendChangesToD2(alias, changes)
        }
      }
    }

Then flushPendingChanges discards the whole batch because no alias was marked live:

    if (
      (this.skipInitial && !this.initialLoadComplete) ||
      (hasCursorGating && this.liveAliasesInBatch.size === 0)
    ) {
      this.pendingChanges = new Map()
      this.pendingBatchCursorByAlias.clear()
      this.liveAliasesInBatch.clear()
      return
    }

Concrete failure scenario: a sync provider that writes everything synchronously inside the sync function:

sync: ({ begin, write, commit, markReady }) => {
  begin()
  write({ type: 'insert', value: A, cursor: 1 }) // replay
  write({ type: 'insert', value: B, cursor: 3 }) // live (past startAfter=2)
  commit()
  markReady()
}

The commit() triggers commitPendingTransactions which emits through the subscription callback. But the callback is in "buffering" mode during start(), so both changes end up in the buffer. The buffer drain sends them directly to D2. The subsequent runGraph() produces DeltaEvents. flushPendingChanges checks liveAliasesInBatch.size === 0 (since the drain path never populates it) and discards all events — including the live data at cursor=3 that should have fired.

The cursor gate for this alias also never opens (cursorGateOpenByAlias stays false), so if no more data arrives, the effect is permanently stuck.

All existing tests use mockSyncCollectionOptionsNoInitialState which avoids this path, so the bug is untested. A test covering "buffered startup batch straddles startAfter before all aliases are subscribed" should be added.

Fix: The buffer drain loop should route through handleSourceChanges (or at least through the cursor gate logic) instead of calling sendChangesToD2 directly.


2. CRITICAL: This Does Not, and Cannot With the Current Architecture, Guarantee True Replayability of Distinct Source Events

The branch is consistent with the PR writeup, but the semantics are "replay-safe checkpointing of net query deltas", not "replay every source event after cursor". There are three independent collapsing layers that each reduce the number of observable events:

Layer 1 — CollectionStateManager: commitPendingTransactions computes a visible-state diff, not a per-write event log. Multiple write() calls to the same key in one begin()/commit() collapse into one emitted ChangeMessage. The rowCursorWrites map uses Map.set(key, ...), keeping only the last cursor written for a given key:

      for (const key of changedKeys) {
        const cursor = eventCursors.get(key)
        const previousVisibleValue = currentVisibleState.get(key)
        const newVisibleValue = this.get(key)
        // ... one net event per key based on diff
      }

Layer 2 — CollectionSubscription: filterAndFlipChanges rewrites an update for an unseen key into an insert, and drops a delete for an unseen key entirely. The query engine does not receive the raw sync event stream:

// (from filterAndFlipChanges)
if (!keyInSentKeys) {
  if (change.type === `update`) {
    newChange = { ...change, type: `insert`, previousValue: undefined }
  } else if (change.type === `delete`) {
    if (!skipDeleteFilter) {
      continue  // dropped entirely
    }
  }
  this.sentKeys.add(change.key)
}

Layer 3 — Effect pipeline: accumulateEffectChanges accumulates all D2 output multiplicities until graph quiescence, then classifyDelta produces one enter/exit/update per key:

function accumulateEffectChanges<T>(
  acc: Map<unknown, EffectChanges<T>>,
  [[key, tupleData], multiplicity]: ...
): Map<unknown, EffectChanges<T>> {
  // ...
  if (multiplicity < 0) {
    changes.deletes += Math.abs(multiplicity)
    changes.deleteValue ??= value
  } else if (multiplicity > 0) {
    changes.inserts += multiplicity
    changes.insertValue = value
  }
  // ...
}

This means joins, orderBy/topK, subquery/includes materialisation, and same-key multiple writes in one sync transaction can all collapse multiple source mutations into one effect event. If the requirement from issue #1394 is "only process mutations after cursor, preserving each distinct source mutation", this branch does not provide that contract.

This is the fundamental tension Sam identified in the issue: TanStack DB is a state-delta system, not an event log. The startAfter feature works correctly for state deltas (the visible query result after a given cursor is correct), but cannot guarantee that every source-level event produces a corresponding DeltaEvent. This matters most when the sync layer models append-only events as mutable rows.
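
A toy model makes Layer 1 concrete (this models the diffing behaviour described above, not the actual commitPendingTransactions code):

```typescript
type Write = { key: string; value: string; cursor: number }
type NetEvent = { key: string; type: 'insert' | 'update'; value: string }

// A commit diffs previous vs final visible state per key, so two writes to
// the same key in one transaction emit a single net event.
function commitDiff(prev: Map<string, string>, writes: Array<Write>): Array<NetEvent> {
  const next = new Map(prev)
  for (const w of writes) next.set(w.key, w.value)
  const events: Array<NetEvent> = []
  next.forEach((value, key) => {
    const before = prev.get(key)
    if (before === undefined) events.push({ key, type: 'insert', value })
    else if (before !== value) events.push({ key, type: 'update', value })
  })
  return events
}

// insert(cursor=1) then update(cursor=2) on the same key in one transaction:
const events = commitDiff(new Map(), [
  { key: '1', value: 'a', cursor: 1 },
  { key: '1', value: 'b', cursor: 2 },
])
// one net insert with the final value — the intermediate write is gone
```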


3. SIGNIFICANT: isRedundantSync Can Swallow Cursored Events

In commitPendingTransactions (state.ts), there's a redundancy check:

        const completedOp = completedOptimisticOps.get(key)
        let isRedundantSync = false

        if (completedOp) {
          if (
            completedOp.type === `delete` &&
            previousVisibleValue !== undefined &&
            newVisibleValue === undefined &&
            deepEquals(completedOp.value, previousVisibleValue)
          ) {
            isRedundantSync = true
          } else if (
            newVisibleValue !== undefined &&
            deepEquals(completedOp.value, newVisibleValue)
          ) {
            isRedundantSync = true
          }
        }
        // ...
        if (isRedundantSync && !shouldEmitVirtualUpdate) {
          continue  // <-- event suppressed, cursor lost
        }

When a completed optimistic operation matches the sync confirmation, the event is suppressed. But eventCursors has the cursor for that key. Since the event is skipped, the cursor never reaches the subscription, and the effect's cursor gate never sees it.

Scenario: user makes an optimistic insert at key=1, sync confirms with cursor=5, values match → event suppressed → effect never sees cursor=5 → cursor gate never opens if startAfter=4.

This interaction between optimistic operations and cursor gating is probably rare in the target use case (server-side durable execution), but it's a landmine.


4. SIGNIFICANT: Electric Adapter Control-Point Batching Shapes What the Effect Sees

The Electric adapter's subscribe handler uses up-to-date and subset-end control messages as commit boundaries. The first progressive up-to-date triggers an atomic truncate + replay of all buffered initial messages. This means the number of source messages that get collapsed into one collection transaction is controlled by Electric's control-point timing, not by the individual write() calls.

This is a critical layer that sits between the raw stream and the collection, and it directly affects how many distinct source events are visible to the effect. If Electric batches 50 row changes into one begin()/commit() cycle triggered by a single up-to-date, all 50 writes collapse per the state-diff logic above. The cursor on each would be stream.lastOffset (the same batch-level offset for all 50), so startAfter granularity is limited to Electric batch boundaries.

        write({
          type: isDuplicateInsert ? `update` : operation,
          value: changeMessage.value,
          cursor: stream.lastOffset,
          metadata: {
            ...changeMessage.headers,
          },
        })

This is consistent with the PR's "batch is the checkpoint unit" design, but it constrains replay precision to the sync provider's commit boundaries.


5. DESIGN: Transaction-Scoped Batching Can Leak Replay Into Output

When handleSourceChangesBeforeCursorGate splits a batch at the cursor boundary, it calls scheduleGraphRun twice — once for replay data, once for live data:

    if (firstLiveIndex > 0) {
      const replayChanges = changes.slice(0, firstLiveIndex)
      this.sendChangesToD2(alias, replayChanges)
      this.scheduleGraphRun(alias)
    } else if (!getActiveTransaction() && this.graph?.pendingWork()) {
      // Best-effort boundary preservation outside transaction-scoped flushes.
      this.runGraph()
    }

    this.cursorGateOpenByAlias.set(alias, true)
    this.liveAliasesInBatch.add(alias)

    const liveChanges = changes.slice(firstLiveIndex)
    this.recordPendingBatchCursor(alias, liveChanges)
    this.sendChangesToD2(alias, liveChanges)
    this.scheduleGraphRun(alias)

Without an active transaction, scheduleGraphRun runs the graph immediately, so the replay graph run completes (and output is discarded) before the live changes are sent. This is correct.

But if there is an active TanStack DB transaction, both scheduleGraphRun calls are deferred to the same scheduler job. When the scheduler finally runs, both replay and live data are queued in D2's inputs and processed in one graph.run(). The D2 graph merges all queued multisets at the join operator level (JoinOperator drains its entire input queue per run()). The accumulated output includes contributions from both replay and live data. Since liveAliasesInBatch has the alias, flushPendingChanges emits everything, including output caused by replay changes.

The code acknowledges this with the comment "Best-effort boundary preservation outside transaction-scoped flushes." In practice, sync callbacks don't run inside TanStack DB transactions, so the immediate (correct) path is taken. But this is fragile and undocumented.


6. DESIGN: Uncursored Batches Are Treated as Fully Live Once Gating Is Active

findFirstChangeAfterCursor() returns 0 when a batch contains no cursors at all:

  if (!anyCursor) {
    return 0
  }

  return -1

This opens the gate and emits the entire batch. It matches the test "should eventually open cursor gate when cursor-less changes follow cursored replay", but it also means startAfter is only safe if the adapter puts a monotonic cursor on every relevant sync.write() call. Any missed cursor in an adapter or edge path becomes a duplicate-side-effect hole instead of a hard failure. This isn't validated at the startAfter contract level — there's no warning or error when the sync provider stops sending cursors partway through replay.
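
One possible shape for the missing validation, sketched with illustrative names (not proposed library code): once an alias has seen cursored replay changes, an uncursored change arriving while the gate is still closed is suspicious, because it silently opens the gate.

```typescript
class CursorPresenceCheck {
  private sawCursor = new Set<string>()

  // Returns true when a warning should be raised for this change.
  record(alias: string, cursor: number | undefined, gateOpen: boolean): boolean {
    if (cursor !== undefined) {
      this.sawCursor.add(alias)
      return false
    }
    // Uncursored change while replaying an alias that was sending cursors:
    // the adapter probably dropped a cursor on an edge path.
    return !gateOpen && this.sawCursor.has(alias)
  }
}
```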


7. DESIGN: triggeringSource / cursors Are Not Reliable Causal Metadata

Two reasons:

First, triggeringSource is just the first alias inserted into liveAliasesInBatch:

      if (this.liveAliasesInBatch.size > 0) {
        triggeringSource = this.liveAliasesInBatch.values().next().value
      }

For join queries where multiple sources contribute live changes in the same batch, only the first-inserted alias is reported.

Second, in-graph join tap / lazy-load changes skip both recordPendingBatchCursor() and liveAliasesInBatch (because they hit the isGraphRunning guard at line 719). So result changes can be influenced by secondary-source loads without that source being represented in cursors or triggeringSource at all:

    if (this.isGraphRunning) {
      this.sendChangesToD2(alias, changes)
      return
    }

Checkpoint routing logic should not assume these fields are authoritative for multi-source batches.


8. DESIGN: "Exit Without Enter" Is Possible After Replay

If a replayed change causes a row to enter the query result, that enter event is discarded (cursor gate closed). If a subsequent live change causes the same row to exit the query result, the exit event is emitted. The consumer sees an exit for a row they never saw enter for.

This is semantically correct for startAfter (the consumer already processed the enter before checkpointing), but it's a sharp edge. The previousValue on an update event could reference state that the consumer never observed through this effect instance. This should be documented.


9. CODE: No Cross-Batch Monotonicity Enforcement

recordPendingBatchCursor enforces monotonicity within a single batch of changes. But across batches, cursors can go backwards without detection — pendingBatchCursorByAlias is cleared in flushPendingChanges between batches, so the comparison against current restarts from scratch each time.

This may be acceptable (the user's sync provider is responsible for ordering), but it contradicts the "monotonically ordered" invariant implied by the within-batch enforcement.
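
A cross-batch guard of the kind this implies could be as small as the following sketch (illustrative only): a per-alias high-water mark that survives flushes instead of being cleared per batch.

```typescript
class CrossBatchMonotonicityGuard {
  private highWater = new Map<string, number>()

  // Throws if a cursor regresses relative to anything previously seen
  // for this alias, across batch boundaries.
  check(alias: string, cursor: number): void {
    const prev = this.highWater.get(alias)
    if (prev !== undefined && cursor < prev) {
      throw new Error(`non-monotonic cursor for "${alias}": ${cursor} < ${prev}`)
    }
    this.highWater.set(alias, cursor)
  }
}
```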


10. CONTEXT: markReady() Does Not Create Row Deltas

markReady() itself does not mutate the transaction payload seen by the query engine. It only transitions the lifecycle status and emits an empty notification:

  public markReady(): void {
    // ...
    if (this.status === `loading`) {
      this.setStatus(`ready`, true)
      // ...
      if (this.changes.changeSubscriptions.size > 0) {
        this.changes.emitEmptyReadyEvent()
      }
    }
  }

However, what does shape the transaction boundary is the sync adapter's control-point handling. In the Electric adapter, subset-end and up-to-date are the commit triggers, and the first progressive up-to-date does an atomic truncate + replay of all buffered initial messages. So control messages absolutely can change how many source messages get collapsed into one collection transaction before the effect layer sees them. This is the mechanism by which markReady timing indirectly affects cursor gating — not by creating events, but by determining when batches are committed and therefore how much collapsing occurs.


11. Alignment with Sam's $version Proposal

Sam's comment on the issue proposes a $version virtual prop approach instead. The key differences:

| Aspect | startAfter (this PR) | $version (Sam's proposal) |
| --- | --- | --- |
| Where gating happens | Effect pipeline (before output) | User code (in handler or WHERE) |
| Scope | Per-source alias | Per-row |
| Works with event collapsing | Partially (state-delta semantics) | Yes (user controls filtering) |
| Works with mutable rows | Fragile (intermediate states lost) | Explicit (user decides) |
| Complexity | Internal to effect engine | External, user-controlled |

Sam's proposal explicitly acknowledges: "If user data collapses actions into mutable state, intermediate actions are not reconstructable from $version alone." The startAfter PR doesn't surface this limitation as clearly.

The $version approach is more honest about what TanStack DB can and cannot guarantee. The startAfter approach provides better DX for the common case (append-only events with monotonic cursors) but creates an illusion of exact replay guarantees that the pipeline can't always deliver.


12. Summary of Recommended Actions

Must fix:

  1. Buffer drain path must respect cursor gating (finding 1 above)

Should fix:
2. Document that startAfter provides state-delta replay, not event-log replay — the semantics are "hydrate historical state, then emit future net query-result transitions"
3. Document the "exit without enter" behaviour
4. Make triggeringSource an array, or document that it's the first contributing alias and that lazy-loaded join sources are invisible to it
5. Consider whether uncursored batches opening the gate should emit a warning, since it silently breaks the monotonic-cursor contract

Should consider:
6. The isRedundantSync interaction with cursor gating (finding 3 above)
7. Whether startAfter is the right abstraction vs. Sam's $version proposal — particularly given the three independent collapsing layers that make exact replay impossible for mutable rows
8. Adding cross-batch monotonicity enforcement or documenting it as a caller responsibility

Tests to add:
9. Buffered startup batch straddles startAfter before all aliases are subscribed — this is the gap most likely to hide a real bug

@samwillis
Collaborator

After the review above, I asked Opus:

I think it's also important to consider whether there should be a collection-level config telling it to "commit" after every change in a "replay mode", and how this could make things work.


What a per-change commit mode would look like

The idea: add a config flag on the collection (or on the sync config) that makes every write() auto-commit, so the sync adapter doesn't need to batch. Something like:

createCollection({
  sync: {
    replayCommitMode: 'per-change', // new option
    sync: ({ begin, write, commit, markReady }) => {
      // adapter writes normally — collection auto-wraps each write
      // in its own begin/commit cycle
    }
  }
})

Let me trace what this would fix and what it wouldn't.


What it fixes

Layer 1 collapsing (same-key state diffing) is eliminated. Today, commitPendingTransactions applies all operations in the transaction to syncedData, then diffs previous vs. final visible state per key. If key=1 gets insert(cursor=1) then update(cursor=2) in one transaction, the collection emits one event with the final value.

With per-change commits, each write gets its own commitPendingTransactions call. Insert(key=1, cursor=1) produces one insert event. Update(key=1, cursor=2) produces one update event. Both carry their own cursor. The effect sees both.
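The Layer 1 difference can be modeled with a self-contained sketch (simplified op and event shapes, not the real commitPendingTransactions): batched mode diffs per key and emits one event carrying the batch's high-water cursor, while per-change mode preserves every op with its own cursor.

```typescript
type Op = { type: "insert" | "update"; key: string; value: number; cursor: number }

// Batched mode: one commit diffs previous vs. final visible state per key,
// so an insert followed by an update of the same key collapses into a
// single insert event with the final value and the batch's max cursor.
// Assumes a non-empty batch of ops against previously-unseen keys.
function commitBatched(ops: Op[]): Op[] {
  const byKey = new Map<string, Op>()
  for (const op of ops) {
    const prev = byKey.get(op.key)
    // keep the final value, but the first op for the key decides the type
    byKey.set(op.key, { ...op, type: prev ? prev.type : op.type })
  }
  const maxCursor = Math.max(...ops.map((o) => o.cursor))
  return [...byKey.values()].map((o) => ({ ...o, cursor: maxCursor }))
}

// Per-change mode: every op is its own commit, so nothing collapses
// and each event keeps its own cursor.
function commitPerChange(ops: Op[]): Op[] {
  return ops.map((o) => ({ ...o }))
}
```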

Cursor precision improves. Each event carries its own cursor, so the consumer can checkpoint at per-event granularity. No more straddling batches — each change is either before or after the startAfter boundary, never split across a boundary within a single commit.

The buffer drain bug (Issue #1) becomes less severe. Since each change is its own commit, even if changes arrive during the buffer phase, each produces a single event with a clear cursor position. The gate logic still needs fixing, but the "straddling" scenario becomes impossible.


What it does NOT fix

Even with per-change commits, two other collapsing layers remain:

Layer 2 — CollectionSubscription.filterAndFlipChanges still rewrites events. If the subscription hasn't seen key=1 yet (not in sentKeys), an update to key=1 gets rewritten to insert. A delete for an unseen key gets dropped entirely. This happens regardless of commit granularity. So the effect's D2 input still doesn't match the raw sync stream 1:1.

Layer 3 — The D2 graph + effect accumulation. Even with per-change commits, the transactionScopedScheduler can coalesce multiple synchronous commits into one graph run. If the sync adapter fires:

begin(); write({ key: 1, cursor: 1 }); commit();
begin(); write({ key: 1, cursor: 2 }); commit();

Both commit() calls trigger subscription callbacks synchronously. Each calls handleSourceChanges → scheduleGraphRun. Without an active TanStack DB transaction, each scheduleGraphRun runs the graph immediately and separately — so this case actually works and produces two separate DeltaEvent batches.

But if something wraps these in a parent context (or if the scheduler defers for any reason), the two graph runs merge and accumulateEffectChanges produces one net delta for key=1.

More importantly, queries with JOINs, WHERE, or aggregation still transform the relationship between source events and output events. A source insert might not produce any output (WHERE filters it). A source update to the join key might cause both an exit and an enter for different output keys. Per-change commits at the collection level don't affect this — it's a property of the query pipeline.
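A WHERE clause alone is enough to break the 1:1 relationship, as this self-contained sketch shows (the predicate and event shapes are illustrative, not the D2 pipeline's internals): an output "enter" fires only when a row crosses into the predicate, an "exit" when it leaves, and source changes that stay outside it produce no output at all.

```typescript
type SourceEvent = { key: string; value: number }

// Model of how a WHERE clause reshapes the event stream. `prev` holds the
// last seen value per key; the return value is the list of output events
// the query result would emit for this single source change.
function whereDeltas(
  prev: Map<string, number>,
  event: SourceEvent,
  pred: (v: number) => boolean
): string[] {
  const was = prev.has(event.key) && pred(prev.get(event.key)!)
  const is = pred(event.value)
  prev.set(event.key, event.value)
  if (!was && is) return ["enter"]
  if (was && !is) return ["exit"]
  if (was && is) return ["update"]
  return [] // source change filtered out: no output event
}
```

Four source events can thus yield anywhere from zero to four output events, which is why per-change commits at the collection level cannot by themselves guarantee event-log replay for filtered or joined queries.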


The performance problem

Per-change commits during initial sync would be devastating. Consider an Electric shape with 10,000 rows in the initial snapshot. Today, this is one begin() → 10,000 write() calls → one commit() → one visible-state diff → one subscription callback. With per-change commits, it would be 10,000 individual commitPendingTransactions calls, each doing a state diff + event emission + subscription callback. Plus 10,000 D2 graph runs.

You could mitigate this by only enabling per-change mode after the initial sync, or only for changes past the startAfter boundary. But this adds significant complexity:

// Hypothetical: only per-change commit for post-cursor changes
sync: {
  replayCommitMode: {
    mode: 'per-change',
    afterCursor: savedCursor, // only applies past this point
  }
}

This would require the collection to know about cursor semantics, which currently lives only in the effect layer. It would leak effect-level concerns into the collection.


The atomicity problem

Some sync providers intentionally batch changes that should be atomic. Electric's up-to-date control message means "the preceding changes form a consistent snapshot." Breaking this into per-change commits means consumers see intermediate states that were never independently consistent on the server. For state-delta effects (the current createEffect semantics), this is usually fine. But it breaks the implicit contract that each event represents a consistent state transition.


What would actually make things work end-to-end

Given the three collapsing layers, no single config at the collection level fully solves the problem. Here's what a complete solution would require:

For same-key collapsing (Layer 1): Per-change commits, or modifying commitPendingTransactions to emit events per operation instead of per state diff.

For subscription rewriting (Layer 2): The effect would need to subscribe with includeInitialState: false and skipFiltering: true (via markAllStateAsSeen), which it already does for mockSyncCollectionOptionsNoInitialState in tests. But for collections with existing state, this means the effect would miss the initial snapshot — which is actually fine for startAfter since the whole point is "I already have state up to this cursor."

For D2 accumulation (Layer 3): The effect would need to run the graph and flush after every single source change, not accumulate across a scheduler run. This means scheduleGraphRun must run immediately (no transaction batching), and runGraph must be called per change, not per scheduler cycle.

Combining all three, you'd need something like:

createCollection({
  sync: {
    commitMode: 'per-change', // Layer 1: no same-key collapsing
    sync: ...
  }
})

createEffect({
  query: (q) => q.from({ events: collection }),
  startAfter: savedCursor,
  // implicitly: subscribe with skipFiltering, flush per change
  onEnter: (event) => { ... }
})

This is achievable but represents a fundamentally different operating mode for the effect pipeline — closer to an event-stream processor than a reactive query result observer.


The deeper question: is this the right layer?

I think the tension here is that startAfter is trying to add event-log semantics to a system designed around state deltas. There are two honest paths forward:

Path A — Accept state-delta semantics. startAfter means "hydrate historical state, then emit future net query-result transitions." Document that intermediate events may be collapsed. This is what the current PR does, and it's correct for its stated semantics. Sam's $version proposal fits here — it gives the user a tool to implement their own filtering without the system pretending it can replay individual events.

Path B — Add a true event-replay mode. Per-change commits + skip subscription filtering + per-change graph flushes. This is a real operating mode, but it should probably be a separate API (maybe createEventProcessor or a config like replayMode: 'event-log') rather than bolted onto the existing createEffect. It would only work for simple queries (no joins, no aggregation), because the D2 pipeline inherently transforms the event stream.

A per-change commit config at the collection level is a necessary building block for Path B, but it's not sufficient on its own. And it comes with real costs (performance, atomicity) that need to be weighed against the alternative of just using Sam's $version approach where the consumer handles idempotency explicitly.

My recommendation: if the team wants to pursue Path B, design it as a distinct mode with clear constraints (single-source, no aggregation, per-change semantics) rather than trying to make the general-purpose startAfter on createEffect guarantee something the architecture can't deliver in the general case.
