Skip to content

Conversation

@TimothyW553
Copy link
Collaborator

@TimothyW553 TimothyW553 commented Dec 22, 2025

🥞 Stacked PR

Use this link to review incremental changes.


Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

How was this patch tested?

Does this PR introduce any user-facing changes?

@TimothyW553 TimothyW553 changed the title tests [draft] [Draft] E2E CCv2 DSv2 streaming integration test Dec 22, 2025
ucTableId, type, version, maxRatifiedVersion));
}
};
startVersion.ifPresent(v -> validateVersion.accept(v, "start"));
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for streaming: startVersion=2 (even if the latest version ratified by UC is 1) is a normal streaming probe, but UC was rejecting it. removing this start-version validation in UC lets kernel handle streaming probe correctly.

}

test("loadCommitRange throws if startVersion is greater than max ratified version") {
test(
Copy link
Collaborator Author

@TimothyW553 TimothyW553 Dec 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for streaming purposes, it should not just completely stop the stream from probing, it should return "no commit found" (CommitRangeNotFoundException) and the stream should just wait.

long startVersion, Optional<DeltaSourceOffset> endOffset) {
List<IndexedFile> allIndexedFiles = new ArrayList<>();
Optional<Long> endVersionOpt =
endOffset.isPresent() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to follow DSv1 semantics. In DSv1, if the index is END_INDEX(), then it automatically bumps the cursor to (reservoirVersion = v+1, index = BASE_INDEX()) - this is the "next index to read". but in DSv2 the commit range is inclusive and so this would throw an error.

so if we are on BASE_INDEX(), we bump the version back down

@TimothyW553 TimothyW553 force-pushed the stack/E2E-uc-dsv2-tests branch from 1e2fb3b to 9f0f986 Compare January 2, 2026 22:07
@TimothyW553 TimothyW553 changed the title [Draft] E2E CCv2 DSv2 streaming integration test [Draft][Kernel-Spark] CCv2 DSv2 streaming read integration test Jan 6, 2026
tdas pushed a commit that referenced this pull request Jan 6, 2026
…erFactory (#5678)

## 🥞 Stacked PR
Use this
[link](https://github.com/delta-io/delta/pull/5678/files/7eeb5493c320ddffdea6473fbaf2735385a95e43..326588447b403f307fded67f06c4275122833628)
to review incremental changes.
- [catalogtableutils-ccv2](#5477)
[[Files changed](https://github.com/delta-io/delta/pull/5477/files)]
-
[**stack/snapshotmanager-factory-wireup**](#5678)
[[Files
changed](https://github.com/delta-io/delta/pull/5678/files/7eeb5493c320ddffdea6473fbaf2735385a95e43..326588447b403f307fded67f06c4275122833628)]
- [stack/E2E-uc-dsv2-tests](#5750)
[[Files
changed](https://github.com/delta-io/delta/pull/5750/files/8b3f0b1b885d2edd859e92cbcb518b0bb3d7abab..fa5552f020e744d43081768579c3fb957b09c9ff)]
- [stack/test-e2e-draft](#5784)
[[Files
changed](https://github.com/delta-io/delta/pull/5784/files/049c18201524eaf3c9c9b83f5ad385f1117c7435..049c18201524eaf3c9c9b83f5ad385f1117c7435)]

---------
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This PR introduces a factory that returns the appropriate snapshot
manager for a `SparkTable`. This factory checks the `CatalogTable` to
determine if the snapshot manager should be a PathBasedSnapshotManager,
UCManagedTableSnapshotManager, or another
CatalogedManagedSnapshotManager in the future.

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?
Locally and CI.
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->

---------

Signed-off-by: Timothy Wang <[email protected]>
private static final long SINK_POLL_TIMEOUT_MS = 120_000L;

@Test
public void testStreamingReadsUcManagedTableProductionLike() throws Exception {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @TimothyW553 , I think you can follow the streaming write test in this PR https://github.com/delta-io/delta/pull/5719/files#diff-04879565d272ba02d9c1e47707ec9bfdb1044460e957ec769d1914178554383bR91-R116 to design your streaming read test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the process will be :

  1. Initialize a StreamingQuery which read from the delta table and write to the MemoryStream
  2. Write few records into the delta table.
  3. use the StreamingQuery#processAllAvailable() to wait all events being processed.
  4. Check the MemoryStream sinks.
  5. Repeat step.2 ~ step.4.

And then finally, all the records should be expected.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants