Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_XVR_Samza.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"modification": 2}
{"modification": 3}
16 changes: 8 additions & 8 deletions runners/samza/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ configurations {
validatesRunner
}

def samza_version = "1.6.0"
def samza_version = "1.8.0"

dependencies {
implementation library.java.vendored_guava_32_1_2_jre
Expand All @@ -55,14 +55,14 @@ dependencies {
implementation library.java.commons_io
implementation library.java.commons_collections
runtimeOnly "org.rocksdb:rocksdbjni:6.15.2"
runtimeOnly "org.scala-lang:scala-library:2.11.8"
runtimeOnly "org.scala-lang:scala-library:2.12.20"
implementation "org.apache.samza:samza-api:$samza_version"
implementation "org.apache.samza:samza-core_2.11:$samza_version"
runtimeOnly "org.apache.samza:samza-kafka_2.11:$samza_version"
runtimeOnly "org.apache.samza:samza-kv_2.11:$samza_version"
implementation "org.apache.samza:samza-kv-rocksdb_2.11:$samza_version"
implementation "org.apache.samza:samza-kv-inmemory_2.11:$samza_version"
implementation "org.apache.samza:samza-yarn_2.11:$samza_version"
implementation "org.apache.samza:samza-core_2.12:$samza_version"
runtimeOnly "org.apache.samza:samza-kafka_2.12:$samza_version"
runtimeOnly "org.apache.samza:samza-kv_2.12:$samza_version"
implementation "org.apache.samza:samza-kv-rocksdb_2.12:$samza_version"
implementation "org.apache.samza:samza-kv-inmemory_2.12:$samza_version"
implementation "org.apache.samza:samza-yarn_2.12:$samza_version"
compileOnly library.java.error_prone_annotations
runtimeOnly "org.apache.kafka:kafka-clients:2.0.1"
implementation library.java.vendored_grpc_1_69_0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.StateNamespaces;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.StatefulDoFnRunner;
import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.TimerInternalsFactory;
import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandler;
import org.apache.beam.runners.fnexecution.control.BundleCheckpointHandlers;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
Expand All @@ -58,6 +62,7 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
import org.apache.beam.sdk.util.construction.PTransformTranslation;
import org.apache.beam.sdk.util.construction.Timer;
import org.apache.beam.sdk.util.construction.graph.ExecutableStage;
import org.apache.beam.sdk.util.construction.graph.PipelineNode;
Expand Down Expand Up @@ -236,6 +241,19 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
sideInputMapping,
sideInputHandler);

final Coder<BoundedWindow> windowCoder =
WindowUtils.getWindowStrategy(
executableStage.getInputPCollection().getId(), executableStage.getComponents())
.getWindowFn()
.windowCoder();
final BundleCheckpointHandler bundleCheckpointHandler =
createBundleCheckpointHandler(
executableStage,
nonKeyedStateInternalsFactory,
timerInternalsFactory,
windowedValueCoder,
windowCoder);

final SamzaExecutionContext executionContext =
(SamzaExecutionContext) context.getApplicationContainerContext();
final DoFnRunner<InT, FnOutT> underlyingRunner =
Expand All @@ -251,13 +269,47 @@ public static <InT, FnOutT> DoFnRunner<InT, FnOutT> createPortable(
bundledEventsBag,
stateRequestHandler,
samzaExecutionContext,
executableStage.getTransforms());
executableStage.getTransforms(),
bundleCheckpointHandler,
nonKeyedStateInternalsFactory,
windowedValueCoder,
windowCoder);
return pipelineOptions.getEnableMetrics()
? DoFnRunnerWithMetrics.wrap(
underlyingRunner, executionContext.getMetricsContainer(), transformFullName)
: underlyingRunner;
}

/**
 * Checks whether the given stage contains a splittable DoFn.
 *
 * @param executableStage the stage whose transforms are inspected
 * @return {@code true} if the spec URN of at least one transform in the stage equals {@link
 *     PTransformTranslation#SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN}
 */
private static boolean hasSDF(ExecutableStage executableStage) {
  for (PipelineNode.PTransformNode node : executableStage.getTransforms()) {
    final String urn = node.getTransform().getSpec().getUrn();
    final boolean isSdf =
        PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN.equals(urn);
    if (isSdf) {
      // One match is enough; stop scanning, mirroring the short-circuit of anyMatch.
      return true;
    }
  }
  return false;
}

/**
 * Builds the {@link BundleCheckpointHandler} used when running the given stage.
 *
 * <p>If the stage contains a splittable DoFn, the result is a {@link
 * BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler} backed by the supplied state and
 * timer factories. Otherwise the result is a handler that throws {@link
 * UnsupportedOperationException} whenever it is invoked.
 *
 * @param executableStage the stage inspected for splittable DoFn transforms
 * @param nonKeyedStateInternalsFactory source of the state internals, always queried with a
 *     {@code null} key
 * @param timerInternalsFactory source of the timer internals, always queried with a {@code null}
 *     key
 * @param windowedValueCoder coder handed to the checkpoint handler for windowed input values
 * @param windowCoder coder handed to the checkpoint handler for windows
 * @return the checkpoint handler for the stage
 */
private static <InT> BundleCheckpointHandler createBundleCheckpointHandler(
    ExecutableStage executableStage,
    SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
    SamzaTimerInternalsFactory<?> timerInternalsFactory,
    Coder<WindowedValue<InT>> windowedValueCoder,
    Coder<BoundedWindow> windowCoder) {
  if (hasSDF(executableStage)) {
    // SDF runs in a non-keyed context here, so both factories ignore the key they are given and
    // always look up with null. Declaring them with <InT> keeps the handler's type parameter in
    // line with the coder, so no unchecked cast is required.
    TimerInternalsFactory<InT> nullKeyTimers =
        ignoredKey -> timerInternalsFactory.timerInternalsForKey(null);
    StateInternalsFactory<InT> nullKeyState =
        ignoredKey -> nonKeyedStateInternalsFactory.stateInternalsForKey(null);
    return new BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler<>(
        nullKeyTimers, nullKeyState, windowedValueCoder, windowCoder);
  }

  // Without an SDF in the stage, a checkpoint request is unexpected: fail loudly when it arrives.
  return unusedResponse -> {
    throw new UnsupportedOperationException(
        "Self-checkpoint is only supported on splittable DoFn.");
  };
}

static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT> {

private static final int DEFAULT_METRIC_SAMPLE_RATE = 100;
Expand All @@ -277,6 +329,10 @@ static class SdkHarnessDoFnRunner<InT, FnOutT> implements DoFnRunner<InT, FnOutT
private long startBundleTime;
private final String stepName;
private final Collection<PipelineNode.PTransformNode> pTransformNodes;
private final BundleCheckpointHandler bundleCheckpointHandler;
private final SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory;
private final Coder<WindowedValue<InT>> windowedValueCoder;
private final Coder<BoundedWindow> windowCoder;

private SdkHarnessDoFnRunner(
SamzaPipelineOptions pipelineOptions,
Expand All @@ -289,7 +345,11 @@ private SdkHarnessDoFnRunner(
BagState<WindowedValue<InT>> bundledEventsBag,
StateRequestHandler stateRequestHandler,
SamzaExecutionContext samzaExecutionContext,
Collection<PipelineNode.PTransformNode> pTransformNodes) {
Collection<PipelineNode.PTransformNode> pTransformNodes,
BundleCheckpointHandler bundleCheckpointHandler,
SamzaStoreStateInternals.Factory<?> nonKeyedStateInternalsFactory,
Coder<WindowedValue<InT>> windowedValueCoder,
Coder<BoundedWindow> windowCoder) {
this.pipelineOptions = pipelineOptions;
this.timerInternalsFactory = timerInternalsFactory;
this.windowingStrategy = windowingStrategy;
Expand All @@ -301,6 +361,10 @@ private SdkHarnessDoFnRunner(
this.samzaExecutionContext = samzaExecutionContext;
this.stepName = stepName;
this.pTransformNodes = pTransformNodes;
this.bundleCheckpointHandler = bundleCheckpointHandler;
this.nonKeyedStateInternalsFactory = nonKeyedStateInternalsFactory;
this.windowedValueCoder = windowedValueCoder;
this.windowCoder = windowCoder;
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -328,7 +392,6 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
}
};

final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
final TimerReceiverFactory timerReceiverFactory =
new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);

Expand All @@ -350,7 +413,9 @@ public FnDataReceiver<FnOutT> create(String pCollectionId) {
receiverFactory,
timerReceiverFactory,
stateRequestHandler,
samzaMetricsBundleProgressHandler);
samzaMetricsBundleProgressHandler,
null,
bundleCheckpointHandler);

startBundleTime = getStartBundleTime();

Expand Down Expand Up @@ -433,6 +498,20 @@ public <KeyT> void onTimer(
Instant outputTimestamp,
TimeDomain timeDomain,
CausedByDrain causedByDrain) {
// SDF checkpoint timers are handled by loading the stored residual and re-processing it.
if (BundleCheckpointHandlers.StateAndTimerBundleCheckpointHandler.isSdfTimer(timerId)) {
StateInternals stateInternals = nonKeyedStateInternalsFactory.stateInternalsForKey(null);
WindowedValue<InT> residual =
stateInternals
.state(
StateNamespaces.window(windowCoder, window),
StateTags.value(timerId, windowedValueCoder))
.read();
if (residual != null) {
processElement(residual);
}
Comment on lines +504 to +512
Copy link
Contributor

@Abacn Abacn Mar 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WindowedValue<InT> residual =
stateInternals
.state(
StateNamespaces.window(windowCoder, window),
StateTags.value(timerId, windowedValueCoder))
.read();
if (residual != null) {
processElement(residual);
}
org.apache.beam.sdk.state.ValueState<WindowedValue<InT>> residualState =
stateInternals.state(
StateNamespaces.window(windowCoder, window),
StateTags.value(timerId, windowedValueCoder));
WindowedValue<InT> residual = residualState.read();
if (residual != null) {
residualState.clear();
processElement(residual);
}

Gemini was able to find this, and the tests look good.

Edit: actually, it doesn't help. Running a single test passes, but the tests interfere with each other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's an issue in Samza 1.8.0. With SamzaDoFnRunners.java reverted, the test fails as well. Since Samza is already an inactive project (no new release for 3 years), it's not likely to get fixed. Let's close this for now.

return;
}
final KV<String, String> timerReceiverKey =
TimerReceiverFactory.decodeTimerDataTimerId(timerFamilyId);
final FnDataReceiver<Timer> timerReceiver =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@
import org.apache.samza.storage.kv.KeyValueStoreMetrics;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStorageEngineFactory;
import org.apache.samza.storage.kv.inmemory.InMemoryKeyValueStore;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -309,11 +308,10 @@ public void processElement(
public static class TestStorageEngine extends InMemoryKeyValueStorageEngineFactory {

@Override
public KeyValueStore<byte[], byte[]> getKVStore(
protected KeyValueStore<byte[], byte[]> getKVStore(
String storeName,
File storeDir,
MetricsRegistry registry,
SystemStreamPartition changeLogSystemStreamPartition,
JobContext jobContext,
ContainerContext containerContext,
StorageEngineFactory.StoreMode readWrite) {
Expand Down
3 changes: 0 additions & 3 deletions sdks/go/test/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,9 +237,6 @@ var samzaFilters = []string{
"TestMapStateClear",
"TestSetState",
"TestSetStateClear",
// TODO(https://github.com/apache/beam/issues/26126): Java runner issue (ActiveBundle has no registered handler)
"TestDebeziumIO_BasicRead",

// Samza does not support state.
"TestTimers.*",
"TestBagStateBlindWrite",
Expand Down
Loading