
[Runner] Fix SDF checkpoint handling in and upgrade of Samza portable runner (resolves #26126)#37886

Closed
tkaymak wants to merge 6 commits into apache:master from tkaymak:fix_samza_debezium_test


@tkaymak
Contributor

@tkaymak tkaymak commented Mar 18, 2026

Samza's SdkHarnessDoFnRunner called getBundle() without a BundleCheckpointHandler, causing an UnsupportedOperationException (issue #26126) when KafkaSourceConsumerFn (an SDF) returned ProcessContinuation.resume().

Three changes:

  1. hasSDF() + createBundleCheckpointHandler(): detects whether the executable stage contains an SDF (URN = SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN) and, if so, creates a StateAndTimerBundleCheckpointHandler that stores SDF residuals in Samza's persistent state/timer internals (using a null-key non-keyed factory, matching how the stage is already run non-keyed).

  2. startBundle(): passes the bundleCheckpointHandler to stageBundleFactory.getBundle(...) so that checkpoint responses from the SDK harness are handled instead of throwing.

  3. onTimer(): SDF checkpoint timers (IDs prefixed with sdf_checkpoint:) are now intercepted. Instead of routing them to a remote bundle timer receiver (which would fail), the runner reads the stored residual from state and re-feeds it via processElement() for reprocessing.
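The three changes can be modelled in a few lines. This is a minimal, self-contained sketch under simplifying assumptions, not the PR's code: a HashMap stands in for Samza's state internals, a list of pending timer IDs stands in for the timer service, and String residuals replace WindowedValue<InT>. SdfCheckpointSketch, onCheckpoint, fireTimers and SDF_URN are hypothetical names; SDF_URN is only a placeholder for SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN. The sketch also clears the residual when its timer fires, which is an assumption on its part.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class SdfCheckpointSketch {
  // Hypothetical placeholder for SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.
  static final String SDF_URN = "hypothetical:sdf_process_sized_element_and_restrictions";
  // Prefix that marks SDF checkpoint timers, as described in the PR.
  static final String SDF_TIMER_PREFIX = "sdf_checkpoint:";

  private final Map<String, String> residualState = new HashMap<>(); // timer id -> residual
  private final List<String> pendingTimers = new ArrayList<>();       // set, not yet fired
  final List<String> forwardedTimers = new ArrayList<>();             // non-SDF timers
  private final Consumer<String> processElement;
  private int nextTimerId = 0;

  SdfCheckpointSketch(Consumer<String> processElement) {
    this.processElement = processElement;
  }

  // Change 1: a checkpoint handler is only needed if the stage contains an SDF transform.
  static boolean hasSdf(List<String> transformUrns) {
    return transformUrns.contains(SDF_URN);
  }

  // Change 2: instead of throwing, persist the residual and set a timer to resume it.
  void onCheckpoint(String residual) {
    String timerId = SDF_TIMER_PREFIX + nextTimerId++;
    residualState.put(timerId, residual);
    pendingTimers.add(timerId);
  }

  // Change 3: SDF checkpoint timers are handled locally; other timers keep the normal path.
  void onTimer(String timerId) {
    if (!timerId.startsWith(SDF_TIMER_PREFIX)) {
      forwardedTimers.add(timerId); // stand-in for the remote bundle timer receiver
      return;
    }
    String residual = residualState.remove(timerId); // read and clear the stored residual
    if (residual != null) {
      processElement.accept(residual); // re-feed it as if it were a new element
    }
  }

  // Stand-in for Samza's timer service: fire every pending timer once.
  void fireTimers() {
    List<String> due = new ArrayList<>(pendingTimers);
    pendingTimers.clear();
    due.forEach(this::onTimer);
  }

  public static void main(String[] args) {
    List<String> processed = new ArrayList<>();
    SdfCheckpointSketch runner = new SdfCheckpointSketch(processed::add);
    System.out.println(hasSdf(List.of("beam:transform:pardo:v1", SDF_URN))); // true
    runner.onCheckpoint("offset=42");
    runner.fireTimers();
    runner.onTimer("user_timer");
    System.out.println(processed + " " + runner.forwardedTimers); // [offset=42] [user_timer]
  }
}
```

The key design point the sketch tries to capture is that a checkpoint becomes "state plus a timer" rather than an exception, and that the timer callback distinguishes SDF checkpoint timers by prefix before deciding where to route them.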

Combined with upgrading to Samza 1.8.0 / Scala 2.12 and adjusting the signature of SamzaStoreStateInternalsTest, this makes it possible to remove the TestDebeziumIO_BasicRead filter from samzaFilters and re-enable the test against the Samza runner.



@gemini-code-assist
Contributor

Summary of Changes


This pull request addresses a critical issue in the Samza portable runner where Splittable DoFns (SDFs) failed to handle checkpoints correctly, leading to UnsupportedOperationException. The changes introduce robust checkpointing support for SDFs by integrating a dedicated bundle checkpoint handler, enabling local processing of SDF timers, and managing state persistence. This fix ensures proper execution of SDFs within the Samza runner and re-enables previously disabled integration tests.

Highlights

  • SDF Checkpoint Handling: Implemented a mechanism to detect if an executable stage contains a Splittable DoFn (SDF) and, if so, to create a StateAndTimerBundleCheckpointHandler. This handler stores SDF residuals in Samza's persistent state/timer internals, using a null-key non-keyed factory.
  • Bundle Checkpoint Integration: Modified the startBundle() method to pass the newly created bundleCheckpointHandler to stageBundleFactory.getBundle(...). This ensures that checkpoint responses from the SDK harness are correctly processed, preventing UnsupportedOperationException.
  • SDF Timer Interception: Introduced logic in the onTimer() method to intercept SDF checkpoint timers (identified by a specific prefix). Instead of attempting to route these timers remotely, the runner now reads the stored residual from Samza state and re-feeds it for reprocessing, effectively handling SDF checkpoints locally.
  • Test Re-enablement: Removed the TestDebeziumIO_BasicRead filter from the Samza integration tests, allowing it to run against the Samza runner, as the underlying issue with SDF checkpointing has been resolved.



@tkaymak
Contributor Author

tkaymak commented Mar 18, 2026

/gemini review

@github-actions
Contributor

Assigning reviewers:

R: @shunping for label go.


Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request correctly addresses a bug in the Samza portable runner related to checkpoint handling for Splittable DoFns (SDFs). The changes introduce a BundleCheckpointHandler that leverages Samza's state and timer mechanisms to manage SDF residuals, fixing an UnsupportedOperationException. The implementation is clean, and the logic for detecting SDFs, creating the handler, and processing checkpoint timers is sound. The re-enabling of the TestDebeziumIO_BasicRead test provides confidence in the fix. I have one suggestion to improve the readability of a new helper method.

@tkaymak
Contributor Author

tkaymak commented Mar 18, 2026

I think @Abacn would be best placed to review this (or someone who reviews Java-labeled PRs, since the Go change only re-enables the Go test).

@Abacn
Contributor

Abacn commented Mar 18, 2026

Thanks for the change. Unfortunately, the Samza runner is scheduled to be retired along with the Java 8 sunset (context: #32208, #31678).

Since this is already done, we can get it in. Would you mind making any change to .github/trigger_files/beam_PostCommit_XVR_Samza.json and pushing a commit to trigger the PostCommit test for this PR?

@tkaymak
Contributor Author

tkaymak commented Mar 18, 2026

@Abacn I was reproducing the CI test setup locally to make sure I could follow every step.
While doing so, I discovered that (with the fix applied) the whole runner might need an upgrade. Given that it will be sunset, I think it might not be worth the time investment, and I would rather focus on other open DebeziumIO issues.
Any guidance is appreciated.

Possible Fixes for the situation explained below:

  1. Option: Upgrade Samza. Try Samza 1.7.0 or later, which likely rebuilt the Scala artifacts against a consistent API.
  2. Option: Switch to Scala 2.12 artifacts. samza-kv_2.12:1.6.0 exists on Maven and may have been compiled against a consistent API. Replace samza-kv_2.11 with samza-kv_2.12 in runners/samza/build.gradle.

Issue: Running TestDebeziumIO_BasicRead (or any pipeline that calls BagState.clear()) on the Samza portable runner crashes with:

java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

Root cause

This seems to be a binary compatibility bug between two Samza 1.6.0 JARs bundled in the Beam Samza job-server shadow JAR.

samza-api:1.6.0 defines KeyValueStore as a Java interface with deleteAll as a default method:

public interface KeyValueStore<K, V> {
    default void deleteAll(List<K> keys) { ... }
}

samza-kv_2.11:1.6.0 contains CachedStore.scala (Scala 2.11 compiled). Its bytecode uses invokespecial to call KeyValueStore.deleteAll:

invokespecial #417  // Method org/apache/samza/storage/kv/KeyValueStore.deleteAll:(Ljava/util/List;)V

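The javap comment says "Method", not "InterfaceMethod": the constant-pool entry behind this invokespecial is a class Methodref, even though KeyValueStore is an interface. invokespecial itself can call an interface default method (javac compiles Iface.super.m() that way), but only through an InterfaceMethodref constant. The Scala compiler may have emitted the class-style reference because, at compile time, KeyValueStore (or that method) had a different type/form. At runtime on JDK 21, method resolution rejects a Methodref whose class turns out to be an interface and throws IncompatibleClassChangeError, which is exactly what "must be InterfaceMethodref constant" reports.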
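For contrast, here is a minimal sketch of the same pattern compiled by javac. Store and CachingStore are hypothetical stand-ins for KeyValueStore and CachedStore, not Samza classes. A class that overrides a default method and delegates to it with Store.super.deleteAll(keys) gets an invokespecial whose operand is an InterfaceMethodref (javap -c should label the call site "InterfaceMethod"), so it links and runs on JDK 21.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class DefaultMethodCallSketch {
  // Hypothetical stand-in for samza-api's KeyValueStore: an interface with a default method.
  interface Store<K> {
    void delete(K key);

    default void deleteAll(List<K> keys) {
      for (K key : keys) {
        delete(key);
      }
    }
  }

  // Hypothetical stand-in for a wrapper like CachedStore that overrides deleteAll.
  static class CachingStore implements Store<String> {
    final List<String> deleted = new ArrayList<>();

    @Override
    public void delete(String key) {
      deleted.add(key);
    }

    @Override
    public void deleteAll(List<String> keys) {
      // javac emits invokespecial on an InterfaceMethodref for this super call.
      Store.super.deleteAll(keys);
    }
  }

  public static void main(String[] args) throws Exception {
    CachingStore store = new CachingStore();
    store.deleteAll(List.of("a", "b"));
    System.out.println(store.deleted); // [a, b]

    // deleteAll is declared on an interface and is a default method.
    Method m = Store.class.getMethod("deleteAll", List.class);
    System.out.println(m.getDeclaringClass().isInterface() + " " + m.isDefault()); // true true
  }
}
```

Comparing `javap -c` output for this class with the CachedStore bytecode quoted above is one way to see the difference between the two constant-pool entry kinds.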

Full stack trace

Caused by: java.lang.IncompatibleClassChangeError:
    Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)'
    must be InterfaceMethodref constant
    at org.apache.samza.storage.kv.CachedStore.deleteAll(CachedStore.scala:253)
    at org.apache.samza.storage.kv.NullSafeKeyValueStore.deleteAll(NullSafeKeyValueStore.scala:72)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.deleteAll(KeyValueStorageEngine.scala:89)
    at org.apache.beam.runners.samza.runtime.SamzaStoreStateInternals$SamzaBagState.clear(SamzaStoreStateInternals.java:510)
    at org.apache.beam.runners.samza.runtime.SamzaDoFnRunners$SdkHarnessDoFnRunner.finishBundle(SamzaDoFnRunners.java:552)
    at org.apache.beam.runners.samza.runtime.PortableDoFnOp$1.onBundleFinished(PortableDoFnOp.java:458)
    at org.apache.beam.runners.samza.runtime.ClassicBundleManager.tryFinishBundle(ClassicBundleManager.java:241)
    at org.apache.beam.runners.samza.runtime.PortableDoFnOp.processElement(PortableDoFnOp.java:304)

Trigger condition

Any portable Samza pipeline that reaches BagState.clear() during finishBundle — including the DebeziumIO cross-language test (TestDebeziumIO_BasicRead). This also affects any pipeline that exercises BagState, SetState, or MapState clear operations via the portable runner path.

Environment

Beam: 2.73.0-SNAPSHOT
Samza: 1.6.0 (samza-api + samza-kv_2.11)
JDK: 21

@tkaymak
Contributor Author

tkaymak commented Mar 18, 2026

@Abacn I think it was a nice exercise to dive into the code. However, given that the last Samza release is from 2023, I think the answer is clear. I will close this PR.

@tkaymak tkaymak closed this Mar 18, 2026
@tkaymak tkaymak reopened this Mar 19, 2026
@tkaymak
Contributor Author

tkaymak commented Mar 19, 2026

@Abacn - upgrading to Samza 1.8.0 / Scala 2.12 (Option 1) seems to have done the trick. I have implemented the minimal changes needed to make the test pass. I am not sure it is still worth merging this before Samza is sunset.

/gemini review

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request effectively addresses the SDF checkpoint handling issue in the Samza portable runner, resolving issue #26126. The changes involve updating Samza and Scala dependencies to newer versions, and crucially, implementing a BundleCheckpointHandler to correctly manage SDF residuals and timers. The SdkHarnessDoFnRunner is updated to detect SDF transforms, create an appropriate checkpoint handler, and intercept SDF checkpoint timers to re-process residuals from state. The removal of the TestDebeziumIO_BasicRead filter from the Go integration tests confirms the successful resolution of the underlying problem.

@tkaymak tkaymak changed the title Fix SDF checkpoint handling in Samza portable runner (resolves #26126) Fix SDF checkpoint handling in and upgrade of Samza portable runner (resolves #26126) Mar 19, 2026
@tkaymak
Contributor Author

tkaymak commented Mar 19, 2026

@Abacn I changed .github/trigger_files/beam_PostCommit_XVR_Samza.json and pushed a commit to trigger the PostCommit test for this PR, and it looks OK to me. ✅
/gemini review

@tkaymak tkaymak changed the title Fix SDF checkpoint handling in and upgrade of Samza portable runner (resolves #26126) [Runner] Fix SDF checkpoint handling in and upgrade of Samza portable runner (resolves #26126) Mar 19, 2026
@Abacn
Contributor

Abacn commented Mar 19, 2026

It appears some change broke ParDo.

I ran the Samza ValidatesRunner tests on this PR:

(Java) https://github.com/apache/beam/actions/runs/23321184640

(Python) https://github.com/apache/beam/actions/runs/23321199760

(Go) https://github.com/apache/beam/actions/runs/23321177775

test_batch_pardo (apache_beam.runners.portability.samza_runner_test.SamzaRunnerTest) failed

Failed assert: [6, 12, 18] == ['a', 'b']

The ['a', 'b'] output comes from other tests.

The non-portable Samza runner is fine: https://github.com/apache/beam/actions/runs/23321191973

Comment on lines +504 to +512
WindowedValue<InT> residual =
    stateInternals
        .state(
            StateNamespaces.window(windowCoder, window),
            StateTags.value(timerId, windowedValueCoder))
        .read();
if (residual != null) {
  processElement(residual);
}
Contributor

@Abacn Abacn Mar 20, 2026


Suggested change
org.apache.beam.sdk.state.ValueState<WindowedValue<InT>> residualState =
    stateInternals.state(
        StateNamespaces.window(windowCoder, window),
        StateTags.value(timerId, windowedValueCoder));
WindowedValue<InT> residual = residualState.read();
if (residual != null) {
  residualState.clear();
  processElement(residual);
}
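The suggested change reads the residual and clears it before re-feeding it. A minimal, hypothetical model shows what the clear buys on its own: a HashMap stands in for the ValueState, an int counter stands in for processElement(), and the same checkpoint timer is delivered twice. ResidualClearSketch and run() are illustrative names only, and the double delivery is an assumed scenario; as noted in the reply, this change did not fix the cross-test interference.

```java
import java.util.HashMap;
import java.util.Map;

public class ResidualClearSketch {
  // Hypothetical map-backed stand-in for the ValueState holding one residual per timer id.
  private final Map<String, String> state = new HashMap<>();
  private final boolean clearAfterRead;
  int reprocessed = 0; // stand-in for calls to processElement(residual)

  ResidualClearSketch(boolean clearAfterRead) {
    this.clearAfterRead = clearAfterRead;
  }

  void write(String timerId, String residual) {
    state.put(timerId, residual);
  }

  void onTimer(String timerId) {
    String residual = state.get(timerId); // residualState.read()
    if (residual != null) {
      if (clearAfterRead) {
        state.remove(timerId); // residualState.clear(), done before re-feeding
      }
      reprocessed++; // processElement(residual)
    }
  }

  // Store one residual, then deliver the same timer twice (assumed re-fire or replay).
  static int run(boolean clearAfterRead) {
    ResidualClearSketch s = new ResidualClearSketch(clearAfterRead);
    s.write("sdf_checkpoint:0", "offset=42");
    s.onTimer("sdf_checkpoint:0");
    s.onTimer("sdf_checkpoint:0");
    return s.reprocessed;
  }

  public static void main(String[] args) {
    System.out.println("read only: " + run(false)); // read only: 2
    System.out.println("read and clear: " + run(true)); // read and clear: 1
  }
}
```

Clearing before the re-feed, as the suggestion does, also avoids erasing a residual that reprocessing might write back under the same state tag.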

Gemini was able to find this, and the tests look good.

Edit: actually, it doesn't help. Running a single test passes, but the tests interfere with each other.

Contributor


I think it's an issue in Samza 1.8.0: with SamzaDoFnRunners.java reverted, the test fails as well. Since Samza is already an inactive project (no new release in 3 years), it's unlikely to get fixed. Let's close this for now.

@tkaymak tkaymak closed this Mar 20, 2026