feat: cursor-aware effect replay with startAfter gating#1405
KyleAMathews wants to merge 11 commits into main
Conversation
Add CollectionCursor type and startAfter option to createEffect, enabling effects to suppress callbacks during replay of historical changes while still hydrating internal query state. Sync writes can now carry an opaque sortable cursor that propagates through ChangeMessage and DeltaEvent. Includes fixes for cursor type mismatch crashes (routed through onSourceError), cursor-less change handling after replay, and cursor erasure within same-transaction writes. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@tanstack/angular-db
@tanstack/browser-db-sqlite-persistence
@tanstack/capacitor-db-sqlite-persistence
@tanstack/cloudflare-durable-objects-db-sqlite-persistence
@tanstack/db
@tanstack/db-ivm
@tanstack/db-sqlite-persistence-core
@tanstack/electric-db-collection
@tanstack/electron-db-sqlite-persistence
@tanstack/expo-db-sqlite-persistence
@tanstack/node-db-sqlite-persistence
@tanstack/offline-transactions
@tanstack/powersync-db-collection
@tanstack/query-db-collection
@tanstack/react-db
@tanstack/react-native-db-sqlite-persistence
@tanstack/rxdb-db-collection
@tanstack/solid-db
@tanstack/svelte-db
@tanstack/tauri-db-sqlite-persistence
@tanstack/trailbase-db-collection
@tanstack/vue-db
Size Change: +1.15 kB (+1.02%) Total Size: 114 kB
Size Change: 0 B Total Size: 4.23 kB
Cursor gate state is global to the effect, so a cursor from one source collection could incorrectly open the gate for another. Validate at construction time that startAfter is only used with single-source effects until per-source cursor tracking is implemented. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed from df371c3 to d651fdf
Reject non-monotonic cursor sequences within a sync batch via onSourceError instead of silently leaking replay changes. Document the known limitation that same-key replay+live changes within one transaction produce an enter event instead of update due to transaction-scoped graph coalescing. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
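As a standalone sketch (not the actual implementation), the rejection described in this commit amounts to a check like the following, where `CollectionCursor` mirrors the PR's `string | number` type and both non-monotonic sequences and mixed cursor types are treated as errors:

```typescript
type CollectionCursor = string | number

// Sketch: reject non-monotonic or type-mixed cursor sequences
// within one sync batch. In the PR, such failures are routed
// through onSourceError rather than thrown directly.
function assertMonotonicCursors(cursors: CollectionCursor[]): void {
  for (let i = 1; i < cursors.length; i++) {
    const prev = cursors[i - 1]
    const next = cursors[i]
    if (typeof prev === `number` && typeof next === `number`) {
      if (next <= prev) throw new Error(`non-monotonic cursor: ${next} after ${prev}`)
    } else if (typeof prev === `string` && typeof next === `string`) {
      if (next <= prev) throw new Error(`non-monotonic cursor: ${next} after ${prev}`)
    } else {
      throw new Error(`cursor type mismatch: ${typeof prev} vs ${typeof next}`)
    }
  }
}
```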
Thread stream.lastOffset as the cursor field on each write in processChangeMessage, enabling effects with startAfter to resume from a persisted Electric offset. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Clarify that startAfter requires monotonic cursors from the sync source and is only supported for single-source (non-join) effects. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The same-key boundary-crossing case (insert at cursor N, update at cursor N+1 in one transaction) cannot happen in practice because startAfter is always set from a completed transaction boundary. Within a single transaction, all changes share the same cursor space. Removes the test documenting this as a limitation and reverts the synchronous graph flush that was added to address it. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
startAfter now accepts Record<string, CollectionCursor> for per-source gating in join effects. Each source gets an independent gate. DeltaEvent gains triggeringSource and cursors fields for gated effects. Join tap callbacks during graph runs are isolated from cursor gate logic to prevent false gate opens. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
I had both Opus and GPT5.2 review this, with the context of our discussion and the comment on #1394. The key thing here is that we are trying to bolt log-replay triggers on top of the query engine, and there are multiple layers where event collapsing can happen. This API gives the impression that you will get a true replay of all events, but what you actually get depends on how the source collections emit them (how they handle up-to-date/markReady) and what other collections are joined.
I do wonder if we need a separate Deep Review of
| Aspect | startAfter (this PR) | $version (Sam's proposal) |
|---|---|---|
| Where gating happens | Effect pipeline (before output) | User code (in handler or WHERE) |
| Scope | Per-source alias | Per-row |
| Works with event collapsing | Partially (state-delta semantics) | Yes (user controls filtering) |
| Works with mutable rows | Fragile (intermediate states lost) | Explicit (user decides) |
| Complexity | Internal to effect engine | External, user-controlled |
Sam's proposal explicitly acknowledges: "If user data collapses actions into mutable state, intermediate actions are not reconstructable from $version alone." The startAfter PR doesn't surface this limitation as clearly.
The $version approach is more honest about what TanStack DB can and cannot guarantee. The startAfter approach provides better DX for the common case (append-only events with monotonic cursors) but creates an illusion of exact replay guarantees that the pipeline can't always deliver.
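To make the contrast concrete, here is a hedged sketch of what the `$version` approach looks like at a call site: the user filters in their own handler, deciding what "already processed" means. The `Row` shape and `makeReplayFilter` helper are illustrative names, not part of the actual proposal.

```typescript
type Row = { id: string; $version: number }

// User-side replay filtering: rows at or below the saved version
// are treated as already processed. The pipeline may collapse
// events, but the user controls the filtering, per Sam's proposal.
function makeReplayFilter(savedVersion: number, onNew: (row: Row) => void) {
  return (row: Row): void => {
    if (row.$version <= savedVersion) return // historical: skip side-effect
    onNew(row)
  }
}
```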
12. Summary of Recommended Actions
Must fix:
1. Buffer drain path must respect cursor gating (Issue #1)
Should fix:
2. Document that startAfter provides state-delta replay, not event-log replay — the semantics are "hydrate historical state, then emit future net query-result transitions"
3. Document the "exit without enter" behaviour
4. Make triggeringSource an array, or document that it's the first contributing alias and that lazy-loaded join sources are invisible to it
5. Consider whether uncursored batches opening the gate should emit a warning, since it silently breaks the monotonic-cursor contract
Should consider:
6. The isRedundantSync interaction with cursor gating (Issue #3)
7. Whether startAfter is the right abstraction vs. Sam's $version proposal — particularly given the three independent collapsing layers that make exact replay impossible for mutable rows
8. Adding cross-batch monotonicity enforcement or documenting it as a caller responsibility
Tests to add:
9. Buffered startup batch straddles startAfter before all aliases are subscribed — this is the gap most likely to hide a real bug
After the review above, I asked Opus:
What a per-change commit mode would look like

The idea: add a config flag on the collection (or on the sync config) that makes every write commit in its own transaction:

```ts
createCollection({
  sync: {
    replayCommitMode: 'per-change', // new option
    sync: ({ begin, write, commit, markReady }) => {
      // adapter writes normally — collection auto-wraps each write
      // in its own begin/commit cycle
    }
  }
})
```

Let me trace what this would fix and what it wouldn't.

What it fixes

Layer 1 collapsing (same-key state diffing) is eliminated. Today, same-key writes within one commit are diffed down to a single net change. With per-change commits, each write gets its own begin/commit cycle, so each write surfaces as its own event.

Cursor precision improves. Each event carries its own cursor, so the consumer can checkpoint at per-event granularity. No more straddling batches — each change is either before or after the startAfter boundary.

The buffer drain bug (Issue #1) becomes less severe. Since each change is its own commit, even if changes arrive during the buffer phase, each produces a single event with a clear cursor position. The gate logic still needs fixing, but the "straddling" scenario becomes impossible.

What it does NOT fix

Even with per-change commits, two other collapsing layers remain:

Layer 2 — subscription rewriting.

Layer 3 — the D2 graph + effect accumulation. Even with per-change commits, consider:

```ts
begin(); write({ key: 1, cursor: 1 }); commit();
begin(); write({ key: 1, cursor: 2 }); commit();
```

Both commits schedule their own graph runs. But if something wraps these in a parent context (or if the scheduler defers for any reason), the two graph runs merge and only the net change is observed.

More importantly, queries with JOINs, WHERE, or aggregation still transform the relationship between source events and output events. A source insert might not produce any output (WHERE filters it). A source update to the join key might cause both an exit and an enter for different output keys. Per-change commits at the collection level don't affect this — it's a property of the query pipeline.

The performance problem

Per-change commits during initial sync would be devastating. Consider an Electric shape with 10,000 rows in the initial snapshot. Today, this is one commit; per-change mode would turn it into 10,000.

You could mitigate this by only enabling per-change mode after the initial sync, or only for changes past the saved cursor:

```ts
// Hypothetical: only per-change commit for post-cursor changes
sync: {
  replayCommitMode: {
    mode: 'per-change',
    afterCursor: savedCursor, // only applies past this point
  }
}
```

This would require the collection to know about cursor semantics, which currently live only in the effect layer. It would leak effect-level concerns into the collection.

The atomicity problem

Some sync providers intentionally batch changes that should be atomic. Electric's batches are one example; splitting an atomic batch into per-change commits would expose intermediate states.

What would actually make things work end-to-end

Given the three collapsing layers, no single config at the collection level fully solves the problem. Here's what a complete solution would require:

For same-key collapsing (Layer 1): per-change commits, or changing how the collection diffs same-key writes.

For subscription rewriting (Layer 2): the effect would need to subscribe with filtering disabled (skipFiltering or equivalent).

For D2 accumulation (Layer 3): the effect would need to run the graph and flush after every single source change, not accumulate across a scheduler run. This means one graph run per change.

Combining all three, you'd need something like:

```ts
createCollection({
  sync: {
    commitMode: 'per-change', // Layer 1: no same-key collapsing
    sync: ...
  }
})

createEffect({
  query: (q) => q.from({ events: collection }),
  startAfter: savedCursor,
  // implicitly: subscribe with skipFiltering, flush per change
  onEnter: (event) => { ... }
})
```

This is achievable but represents a fundamentally different operating mode for the effect pipeline — closer to an event-stream processor than a reactive query result observer.

The deeper question: is this the right layer?

I think the tension here is between two paths:

Path A — Accept state-delta semantics.

Path B — Add a true event-replay mode. Per-change commits + skip subscription filtering + per-change graph flushes. This is a real operating mode, but it should probably be a separate API. A per-change commit config at the collection level is a necessary building block for Path B, but it's not sufficient on its own. And it comes with real costs (performance, atomicity) that need to be weighed against the alternative of just using Sam's $version proposal.

My recommendation: if the team wants to pursue Path B, design it as a distinct mode with clear constraints (single-source, no aggregation, per-change semantics) rather than trying to make the general-purpose createEffect cover both.
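The per-alias gating behavior discussed throughout this thread can be modeled in isolation. This is a minimal sketch using numeric cursors for brevity — the real `EffectPipelineRunner` is more involved (it also hydrates the D2 graph with gated changes); this only models the emit/suppress decision. `CursorGate` and `shouldEmit` are illustrative names.

```typescript
// Minimal model of per-alias cursor gating: changes at or below
// startAfter[alias] are suppressed; the first change past the
// boundary opens that alias's gate, and it stays open.
class CursorGate {
  private open = new Set<string>()
  constructor(private startAfter: Record<string, number>) {}

  shouldEmit(alias: string, cursor?: number): boolean {
    if (this.open.has(alias)) return true
    const boundary = this.startAfter[alias]
    if (boundary === undefined) return true // alias not gated
    // An uncursored change opens the gate (the PR's behavior; the
    // review suggests warning when this happens, since it silently
    // breaks the monotonic-cursor contract).
    if (cursor === undefined || cursor > boundary) {
      this.open.add(alias)
      return true
    }
    return false // still replaying: suppress callbacks
  }
}
```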
Summary
Add `startAfter` cursor option to `createEffect` and `useLiveQueryEffect`, enabling effects to skip callbacks during historical replay while still hydrating query state. Supports both single-source effects (scalar cursor) and join queries (`Record<string, CollectionCursor>` for per-source gating).

Root Cause
When replaying a stream of sync changes (e.g., after reconnection or page reload), `createEffect` fires callbacks for every historical change. There was no way to say "I've already processed everything up to cursor X — only fire for new changes." This forced consumers to either accept duplicate side-effects during replay or build their own deduplication layer.

Approach
Thread a `CollectionCursor` (`string | number`) through three layers:

1. Sync layer (`sync.ts`, `state.ts`): `sync.write()` accepts an optional `cursor`. The collection state manager propagates it through `PendingSyncedTransaction` → `eventCursors` → emitted `ChangeMessage` events.
2. Effect pipeline (`effect.ts`): New `startAfter` config option. When set, the `EffectPipelineRunner` gates callbacks per-alias: changes at or before `startAfter[alias]` still hydrate the D2 query graph (so query state is correct), but `flushPendingChanges` discards events until a live source contributes changes past its cursor boundary. For single-source effects, a scalar `startAfter` value is automatically normalized to the single alias.
3. `DeltaEvent` enrichment (`effect.ts`): Each emitted `DeltaEvent` carries:
   - `cursor`: batch-level high-water cursor (max across all source cursors)
   - `cursors`: per-source cursor map (for gated effects)
   - `triggeringSource`: which source alias drove this batch (for gated effects)

Key Invariants
- Replayed changes still hydrate query state; otherwise `previousValue` on the first live `update` event would be wrong
- `compareCollectionCursors` type mismatches route through `onSourceError` for graceful disposal
- Scalar `startAfter` with multi-source effects throws (use the Record form for joins)

Non-goals
- Narrowing `CollectionCursor` by primitive type (runtime check is sufficient for now)

Trade-offs
The cursor is batch-level, using the highest cursor seen per-source in the batch. This means the cleanest semantics come when replay writes are cursor-ordered and each sync commit maps to one cursor boundary. This was chosen over per-row cursors because the batch is the natural checkpoint granularity — saving a cursor after processing a batch is the common consumer pattern.
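The batch-level high-water computation described above can be sketched as follows (numeric cursors for brevity; `batchCursors` is an illustrative helper, not the PR's actual code):

```typescript
// For one batch of changes, compute the per-source high-water
// cursor map plus the overall batch-level maximum — the shape of
// the cursors/cursor fields a DeltaEvent carries.
function batchCursors(
  changes: Array<{ source: string; cursor: number }>
): { cursors: Record<string, number>; cursor: number } {
  const cursors: Record<string, number> = {}
  let high = -Infinity
  for (const c of changes) {
    const prev = cursors[c.source]
    if (prev === undefined || c.cursor > prev) cursors[c.source] = c.cursor
    if (c.cursor > high) high = c.cursor
  }
  return { cursors, cursor: high }
}
```

Saving `cursor` after each batch and passing it back as `startAfter` on the next session is the checkpointing pattern the trade-off above optimizes for.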
Per-alias gating adds complexity to the effect pipeline, but the alternative (requiring consumers to manually filter events by source) would push that complexity to every call site and couldn't protect `previousValue` correctness.

Verification
Files changed
- `packages/db/src/types.ts` — `CollectionCursor` type, `cursor` field on `ChangeMessage`
- `packages/db/src/collection/state.ts` — `PendingCursorWrite` type, cursor propagation through `eventCursors` → emitted change events
- `packages/db/src/collection/sync.ts` — `rowCursorWrites`
- `packages/db/src/query/effect.ts` — `startAfter` gating, `DeltaEvent` with `cursor`/`cursors`/`triggeringSource`, join-tap-aware graph run isolation, `compareCollectionCursors`
- `packages/db/src/query/live/utils.ts` — `splitUpdates` via spread
- `packages/react-db/src/useLiveQueryEffect.ts` — `startAfter` threaded through to `createEffect`
- `packages/electric-db-collection/src/electric.ts` — `stream.lastOffset` as cursor in sync writes
- `packages/db/tests/effect.test.ts`
- `packages/db/tests/collection.test.ts`

🤖 Generated with Claude Code