[Runner] Fix SDF checkpoint handling in and upgrade of Samza portable runner (resolves #26126) #37886
tkaymak wants to merge 6 commits into apache:master from
Gemini Code Assist: Summary of Changes
This pull request addresses a critical issue in the Samza portable runner where Splittable DoFns (SDFs) failed to handle checkpoints correctly, leading to …
/gemini review
Assigning reviewers: R: @shunping for label go.
Code Review
This pull request correctly addresses a bug in the Samza portable runner related to checkpoint handling for Splittable DoFns (SDFs). The changes introduce a BundleCheckpointHandler that leverages Samza's state and timer mechanisms to manage SDF residuals, fixing an UnsupportedOperationException. The implementation is clean, and the logic for detecting SDFs, creating the handler, and processing checkpoint timers is sound. The re-enabling of the TestDebeziumIO_BasicRead test provides confidence in the fix. I have one suggestion to improve the readability of a new helper method.
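The SDF-detection step this review mentions can be pictured with a small, dependency-free sketch. It does not use Beam's real pipeline protos. The `Transform` class and the `hasSdf` method are hypothetical stand-ins for the PR's `hasSDF()`. The URN string is our understanding of the value behind `SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN`, so check it against `PTransformTranslation` before relying on it.

```java
import java.util.List;

/** Hypothetical sketch: detect whether an executable stage contains an SDF by URN. */
public final class SdfStageDetector {

  // Assumed to match PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
  static final String SDF_PROCESS_URN =
      "beam:transform:sdf_process_sized_element_and_restrictions:v1";

  /** Hypothetical stand-in for a transform in the stage: a name plus its URN. */
  static final class Transform {
    final String name;
    final String urn;

    Transform(String name, String urn) {
      this.name = name;
      this.urn = urn;
    }
  }

  /** Returns true if any transform in the stage is the SDF "process sized elements" step. */
  static boolean hasSdf(List<Transform> stageTransforms) {
    return stageTransforms.stream().anyMatch(t -> SDF_PROCESS_URN.equals(t.urn));
  }

  public static void main(String[] args) {
    List<Transform> plainStage = List.of(new Transform("MyParDo", "beam:transform:pardo:v1"));
    List<Transform> sdfStage =
        List.of(
            new Transform("MyParDo", "beam:transform:pardo:v1"),
            new Transform("MySdf/Process", SDF_PROCESS_URN));
    System.out.println(hasSdf(plainStage)); // false
    System.out.println(hasSdf(sdfStage)); // true
  }
}
```

According to the PR description, the runner creates a `StateAndTimerBundleCheckpointHandler` only when this check is positive.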
runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
I think @Abacn would be the best person to review this (or someone who looks at the Java-labeled PRs, since the Golang change only re-enables the Golang test).
Thanks for the change. Unfortunately, the Samza runner is scheduled to retire along with the Java 8 sunset (context: #32208, #31678). Since this work is already done, we can get it in. Would you mind making any change to .github/trigger_files/beam_PostCommit_XVR_Samza.json and pushing a commit to trigger the PostCommit test for this PR?
@Abacn I was building the CI test setup locally to make sure I could follow every step. Possible fixes for the situation explained below:

Issue: Running TestDebeziumIO_BasicRead (or any pipeline that calls BagState.clear()) on the Samza portable runner crashes with: …

Root cause: This seems to be a binary compatibility bug between two Samza 1.6.0 JARs bundled in the Beam Samza job-server shadow JAR. samza-api:1.6.0 defines KeyValueStore as a Java interface with deleteAll as a default method:

    public interface KeyValueStore<K, V> {
      default void deleteAll(List<K> keys) { ... }
    }

Trigger condition: Any portable Samza pipeline that reaches …

Environment: Beam: 2.73.0-SNAPSHOT
@Abacn I think it was a nice exercise to dive into the code. However, given that the last Samza release is from 2023, I think the answer is clear. I will close this PR.
@Abacn Upgrading to Samza 1.8.0 / Scala 2.12 (Option 1) seems to have done the trick. I have implemented the minimal changes necessary to make the test pass. I am not sure it would still be worth merging this before Samza is sunset.
/gemini review
Code Review
This pull request effectively addresses the SDF checkpoint handling issue in the Samza portable runner, resolving issue #26126. The changes involve updating Samza and Scala dependencies to newer versions, and crucially, implementing a BundleCheckpointHandler to correctly manage SDF residuals and timers. The SdkHarnessDoFnRunner is updated to detect SDF transforms, create an appropriate checkpoint handler, and intercept SDF checkpoint timers to re-process residuals from state. The removal of the TestDebeziumIO_BasicRead filter from the Go integration tests confirms the successful resolution of the underlying problem.
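The timer interception this review describes can be illustrated with a dependency-free routing sketch. Timers whose id starts with `sdf_checkpoint:` are handled locally by re-feeding the stored residual. All other timers would go to the SDK harness. The class, the `Map` standing in for Samza state internals, and the log are hypothetical. This is not the PR's `SdkHarnessDoFnRunner` code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of routing timers by id prefix, modeled on the described onTimer() change. */
public final class TimerRouter {

  static final String SDF_CHECKPOINT_PREFIX = "sdf_checkpoint:";

  // Stand-in for the runner's state internals: residuals stored under their checkpoint timer id.
  final Map<String, String> residualsByTimerId = new HashMap<>();
  // Records each routing decision so the behavior is observable.
  final List<String> log = new ArrayList<>();

  void onTimer(String timerId) {
    if (timerId.startsWith(SDF_CHECKPOINT_PREFIX)) {
      // SDF checkpoint timer: read and clear the stored residual, then re-process it locally.
      String residual = residualsByTimerId.remove(timerId);
      if (residual != null) {
        processElement(residual);
      }
    } else {
      // Any other timer: would be forwarded to the SDK harness (only logged here).
      log.add("forwarded timer " + timerId);
    }
  }

  void processElement(String element) {
    log.add("processed " + element);
  }

  public static void main(String[] args) {
    TimerRouter router = new TimerRouter();
    router.residualsByTimerId.put("sdf_checkpoint:1", "residual-A");
    router.onTimer("sdf_checkpoint:1");
    router.onTimer("user-timer");
    System.out.println(router.log); // [processed residual-A, forwarded timer user-timer]
  }
}
```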
@Abacn I changed .github/trigger_files/beam_PostCommit_XVR_Samza.json and pushed a commit to trigger the PostCommit test for this PR; the results look OK to me. ✅
It appears some change broke ParDo. I ran the Samza ValidatesRunner tests on this PR:
(Java) https://github.com/apache/beam/actions/runs/23321184640
(Python) https://github.com/apache/beam/actions/runs/23321199760
(Go) https://github.com/apache/beam/actions/runs/23321177775
The non-portable Samza runner is fine: https://github.com/apache/beam/actions/runs/23321191973
    WindowedValue<InT> residual =
        stateInternals
            .state(
                StateNamespaces.window(windowCoder, window),
                StateTags.value(timerId, windowedValueCoder))
            .read();
    if (residual != null) {
      processElement(residual);
    }
Suggested change:

    -    WindowedValue<InT> residual =
    -        stateInternals
    -            .state(
    -                StateNamespaces.window(windowCoder, window),
    -                StateTags.value(timerId, windowedValueCoder))
    -            .read();
    -    if (residual != null) {
    -      processElement(residual);
    -    }
    +    org.apache.beam.sdk.state.ValueState<WindowedValue<InT>> residualState =
    +        stateInternals.state(
    +            StateNamespaces.window(windowCoder, window),
    +            StateTags.value(timerId, windowedValueCoder));
    +    WindowedValue<InT> residual = residualState.read();
    +    if (residual != null) {
    +      residualState.clear();
    +      processElement(residual);
    +    }
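The suggested change reads the residual, clears its state cell, and only then re-processes it. The plain-Java sketch below compares the two orders when the same checkpoint timer is delivered twice. A `HashMap` stands in for the Samza-backed `ValueState`, and all names are hypothetical. Without the clear, the residual is replayed a second time. This shows only the local effect of the clear; it does not explain the cross-test interference reported below.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical comparison of read-only vs. read-and-clear handling of a stored SDF residual. */
public final class ResidualReplay {

  /** Fires the same checkpoint timer {@code firings} times; returns the elements re-processed. */
  static List<String> fire(boolean clearAfterRead, int firings) {
    Map<String, String> state = new HashMap<>(); // stand-in for ValueState keyed by timer id
    state.put("sdf_checkpoint:0", "residual");
    List<String> processed = new ArrayList<>();
    for (int i = 0; i < firings; i++) {
      String residual = state.get("sdf_checkpoint:0"); // mirrors residualState.read()
      if (residual != null) {
        if (clearAfterRead) {
          state.remove("sdf_checkpoint:0"); // mirrors residualState.clear()
        }
        processed.add(residual); // mirrors processElement(residual)
      }
    }
    return processed;
  }

  public static void main(String[] args) {
    System.out.println(fire(false, 2)); // [residual, residual]
    System.out.println(fire(true, 2)); // [residual]
  }
}
```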
Gemini was able to find this, and the tests look good.
Edit: actually, it doesn't help. Running a single test passes, but the tests interfere with each other.
I think it's an issue in Samza 1.8.0: with SamzaDoFnRunners.java reverted, the test fails as well. Since Samza is already an inactive project (no new release in 3 years), it is not likely to get fixed. Let's close this for now.
Samza's SdkHarnessDoFnRunner called getBundle() without a BundleCheckpointHandler. This caused an UnsupportedOperationException (issue #26126) when KafkaSourceConsumerFn (an SDF) returned ProcessContinuation.resume().

Three changes:

- hasSDF() + createBundleCheckpointHandler(): detects whether the executable stage contains an SDF (URN = SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN). If so, it creates a StateAndTimerBundleCheckpointHandler that stores SDF residuals in Samza's persistent state/timer internals, using a null-key non-keyed factory to match how the stage is already run non-keyed.
- startBundle(): passes the bundleCheckpointHandler to stageBundleFactory.getBundle(...) so that checkpoint responses from the SDK harness are handled properly instead of throwing.
- onTimer(): SDF checkpoint timers (IDs prefixed with sdf_checkpoint:) are now intercepted. Routing them to a remote bundle timer receiver would fail. Instead, the runner reads the stored residual from state and re-feeds it via processElement() for reprocessing.

These changes are combined with the upgrade to Samza 1.8.0 / Scala 2.12 and an adjusted signature in SamzaStoreStateInternalsTest. Together they make it possible to remove the TestDebeziumIO_BasicRead filter from samzaFilters and re-enable the test against the Samza runner.
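The checkpoint flow described above can be simulated end to end without Beam or Samza. The steps are:

1. A bundle processes part of a restriction and returns a residual (the analogue of `ProcessContinuation.resume()`).
2. The checkpoint handler stores the residual in state and sets a timer.
3. When the timer fires, the residual is read back, cleared, and re-fed to `processElement()`.

Everything here is hypothetical: `Range`, the `MAX_PER_BUNDLE` checkpoint size, and the `Map`/`Deque` standing in for Samza's state and timer internals. It is a sketch under those assumptions, not the runner's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of SDF checkpointing through state plus timers. */
public final class CheckpointFlow {

  /** Hypothetical element: an offset range [from, to) still to be processed. */
  static final class Range {
    final int from;
    final int to;

    Range(int from, int to) {
      this.from = from;
      this.to = to;
    }
  }

  static final String PREFIX = "sdf_checkpoint:";
  static final int MAX_PER_BUNDLE = 3; // assumed: the SDF checkpoints after 3 offsets

  final Map<String, Range> state = new HashMap<>(); // stand-in for state internals
  final Deque<String> timers = new ArrayDeque<>(); // stand-in for timer internals
  final List<Integer> output = new ArrayList<>();
  int nextTimerId = 0;

  /** One bundle: emit up to MAX_PER_BUNDLE offsets, then checkpoint if work remains. */
  void processElement(Range r) {
    int stop = Math.min(r.to, r.from + MAX_PER_BUNDLE);
    for (int i = r.from; i < stop; i++) {
      output.add(i);
    }
    if (stop < r.to) {
      onCheckpoint(new Range(stop, r.to)); // analogue of resume() producing a residual
    }
  }

  /** Checkpoint handler: persist the residual and schedule a timer to resume it. */
  void onCheckpoint(Range residual) {
    String timerId = PREFIX + nextTimerId++;
    state.put(timerId, residual);
    timers.add(timerId);
  }

  /** Timer callback: intercept SDF checkpoint timers and re-feed the stored residual. */
  void onTimer(String timerId) {
    if (timerId.startsWith(PREFIX)) {
      Range residual = state.remove(timerId); // read and clear
      if (residual != null) {
        processElement(residual);
      }
    }
  }

  /** Processes an element, then fires timers until no residual remains. */
  static List<Integer> run(Range element) {
    CheckpointFlow flow = new CheckpointFlow();
    flow.processElement(element);
    while (!flow.timers.isEmpty()) {
      flow.onTimer(flow.timers.poll());
    }
    return flow.output;
  }

  public static void main(String[] args) {
    System.out.println(run(new Range(0, 7))); // [0, 1, 2, 3, 4, 5, 6]
  }
}
```

With a range of 7 offsets and a checkpoint every 3, the element is finished across three "bundles" (0-2, 3-5, 6). Each resumption is driven by a `sdf_checkpoint:` timer rather than by an exception.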