Skip to content

Commit 2209bd6

Browse files
authored
Adds bundleFinalizer support to Dataflow non-portable worker. (#37723)
* Adds bundleFinalizer support to non-portable worker. * Removes check preventing stateful DoFns with bundle finalizers from running on Dataflow streaming non-portable worker when using Streaming Engine
1 parent 153875e commit 2209bd6

File tree

13 files changed

+544
-63
lines changed

13 files changed

+544
-63
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 3,
3+
"modification": 1,
44
}
5-
5+
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run!",
3-
"modification": 1,
3+
"modification": 6,
44
}

runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,9 +311,14 @@ public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT,
311311
public String getErrorContext() {
312312
return "SimpleDoFnRunner/StartBundle";
313313
}
314+
315+
@Override
316+
public BundleFinalizer bundleFinalizer() {
317+
return stepContext.bundleFinalizer();
318+
}
314319
}
315320

316-
/** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle @StartBundle}. */
321+
/** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.FinishBundle @FinishBundle}. */
317322
private class DoFnFinishBundleArgumentProvider
318323
extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
319324
/** A concrete implementation of {@link DoFn.FinishBundleContext}. */
@@ -356,6 +361,11 @@ public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
356361
public String getErrorContext() {
357362
return "SimpleDoFnRunner/FinishBundle";
358363
}
364+
365+
@Override
366+
public BundleFinalizer bundleFinalizer() {
367+
return stepContext.bundleFinalizer();
368+
}
359369
}
360370

361371
/**
@@ -1030,7 +1040,7 @@ public <T> void outputWindowedValue(
10301040
@Override
10311041
public BundleFinalizer bundleFinalizer() {
10321042
throw new UnsupportedOperationException(
1033-
"Bundle finalization is not supported in non-portable pipelines.");
1043+
"Bundle finalization is not supported in OnTimer calls.");
10341044
}
10351045
}
10361046

@@ -1289,7 +1299,7 @@ public <T> void outputWindowedValue(
12891299
@Override
12901300
public BundleFinalizer bundleFinalizer() {
12911301
throw new UnsupportedOperationException(
1292-
"Bundle finalization is not supported in non-portable pipelines.");
1302+
"Bundle finalization is not supported in OnWindowExpiration calls.");
12931303
}
12941304
}
12951305

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
205205
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
206206
'org.apache.beam.sdk.testing.UsesTestStream',
207207
'org.apache.beam.sdk.testing.UsesMetricsPusher',
208-
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
209208
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.
210209
]
211210

@@ -456,7 +455,9 @@ task validatesRunner {
456455
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
457456
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
458457
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
459-
]
458+
],
459+
// Batch legacy worker does not support bundle finalization.
460+
excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
460461
))
461462
}
462463

@@ -490,6 +491,8 @@ task validatesRunnerStreaming {
490491
description "Validates Dataflow runner forcing streaming mode"
491492
dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig + [
492493
name: 'validatesRunnerLegacyWorkerTestStreaming',
494+
// Streaming appliance currently fails bundle finalizer tests.
495+
excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
493496
]))
494497
}
495498

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2737,11 +2737,11 @@ static void verifyDoFnSupported(
27372737
DataflowRunner.class.getSimpleName()));
27382738
}
27392739
boolean isUnifiedWorker = useUnifiedWorker(options);
2740-
if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) {
2740+
if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker && !streaming) {
27412741
throw new UnsupportedOperationException(
27422742
String.format(
2743-
"%s does not currently support %s when not using unified worker because it uses "
2744-
+ "BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` "
2743+
"%s does not currently support %s in batch mode when not using unified worker because it "
2744+
+ "uses BundleFinalizers in its implementation. Set the `--experiments=use_runner_v2` "
27452745
+ "option to use this DoFn.",
27462746
DataflowRunner.class.getSimpleName(), fn.getClass().getSimpleName()));
27472747
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,7 @@ public DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> crea
156156
// in the event of a crash.
157157
10000,
158158
Duration.standardSeconds(10),
159-
() -> {
160-
throw new UnsupportedOperationException(
161-
"BundleFinalizer unsupported by non-portable Dataflow.");
162-
}));
159+
stepContext::bundleFinalizer));
163160
DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> simpleRunner =
164161
new SimpleDoFnRunner<>(
165162
options,

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.ThreadLocalRandom;
3636
import java.util.concurrent.atomic.AtomicLong;
3737
import javax.annotation.concurrent.NotThreadSafe;
38+
import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
3839
import org.apache.beam.runners.core.SideInputReader;
3940
import org.apache.beam.runners.core.StateInternals;
4041
import org.apache.beam.runners.core.StateNamespace;
@@ -73,6 +74,7 @@
7374
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
7475
import org.apache.beam.sdk.metrics.MetricsContainer;
7576
import org.apache.beam.sdk.state.TimeDomain;
77+
import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
7678
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
7779
import org.apache.beam.sdk.util.ByteStringOutputStream;
7880
import org.apache.beam.sdk.values.PCollectionView;
@@ -82,8 +84,10 @@
8284
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
8385
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
8486
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
87+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
8588
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
8689
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
90+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
8791
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
8892
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator;
8993
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -446,11 +450,27 @@ public void invalidateCache() {
446450
}
447451
}
448452

449-
public Map<Long, Runnable> flushState() {
450-
Map<Long, Runnable> callbacks = new HashMap<>();
453+
public Map<Long, Pair<Instant, Runnable>> flushState() {
454+
Map<Long, Pair<Instant, Runnable>> callbacks = new HashMap<>();
451455

452456
for (StepContext stepContext : getAllStepContexts()) {
453457
stepContext.flushState();
458+
for (Pair<Instant, BundleFinalizer.Callback> bundleFinalizer :
459+
stepContext.flushBundleFinalizerCallbacks()) {
460+
long id = ThreadLocalRandom.current().nextLong();
461+
callbacks.put(
462+
id,
463+
Pair.of(
464+
bundleFinalizer.getLeft(),
465+
() -> {
466+
try {
467+
bundleFinalizer.getRight().onBundleSuccess();
468+
} catch (Exception e) {
469+
throw new RuntimeException("Exception while running bundle finalizer", e);
470+
}
471+
}));
472+
outputBuilder.addFinalizeIds(id);
473+
}
454474
}
455475

456476
if (activeReader != null) {
@@ -462,13 +482,15 @@ public Map<Long, Runnable> flushState() {
462482
sourceStateBuilder.addFinalizeIds(id);
463483
callbacks.put(
464484
id,
465-
() -> {
466-
try {
467-
checkpointMark.finalizeCheckpoint();
468-
} catch (IOException e) {
469-
throw new RuntimeException("Exception while finalizing checkpoint", e);
470-
}
471-
});
485+
Pair.of(
486+
Instant.now().plus(Duration.standardMinutes(5)),
487+
() -> {
488+
try {
489+
checkpointMark.finalizeCheckpoint();
490+
} catch (IOException e) {
491+
throw new RuntimeException("Exception while finalizing checkpoint", e);
492+
}
493+
}));
472494

473495
@SuppressWarnings("unchecked")
474496
Coder<UnboundedSource.CheckpointMark> checkpointCoder =
@@ -699,6 +721,11 @@ public <W extends BoundedWindow> void setStateCleanupTimer(
699721
public DataflowStepContext namespacedToUser() {
700722
return this;
701723
}
724+
725+
@Override
726+
public BundleFinalizer bundleFinalizer() {
727+
return wrapped.bundleFinalizer();
728+
}
702729
}
703730

704731
/** A {@link SideInputReader} that fetches side inputs from the streaming worker's cache. */
@@ -771,6 +798,7 @@ class StepContext extends DataflowExecutionContext.DataflowStepContext
771798
// A list of timer keys that were modified by user processing earlier in this bundle. This
772799
// serves a tombstone, so that we know not to fire any bundle timers that were modified.
773800
private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;
801+
private final WindmillBundleFinalizer bundleFinalizer = new WindmillBundleFinalizer();
774802

775803
public StepContext(DataflowOperationContext operationContext) {
776804
super(operationContext.nameContext());
@@ -1043,9 +1071,37 @@ public TimerInternals timerInternals() {
10431071
return checkNotNull(systemTimerInternals);
10441072
}
10451073

1074+
@Override
1075+
public BundleFinalizer bundleFinalizer() {
1076+
return bundleFinalizer;
1077+
}
1078+
10461079
public TimerInternals userTimerInternals() {
10471080
ensureStateful("Tried to access user timers");
10481081
return checkNotNull(userTimerInternals);
10491082
}
1083+
1084+
public ImmutableList<Pair<Instant, BundleFinalizer.Callback>> flushBundleFinalizerCallbacks() {
1085+
return bundleFinalizer.flushCallbacks();
1086+
}
1087+
}
1088+
1089+
private static class WindmillBundleFinalizer implements BundleFinalizer {
1090+
private ImmutableList.Builder<Pair<Instant, Callback>> callbacks = ImmutableList.builder();
1091+
1092+
private WindmillBundleFinalizer() {}
1093+
1094+
private ImmutableList<Pair<Instant, Callback>> flushCallbacks() {
1095+
ImmutableList<Pair<Instant, Callback>> flushedCallbacks = callbacks.build();
1096+
if (!Iterables.isEmpty(flushedCallbacks)) {
1097+
callbacks = ImmutableList.builder();
1098+
}
1099+
return flushedCallbacks;
1100+
}
1101+
1102+
@Override
1103+
public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
1104+
callbacks.add(Pair.of(callbackExpiry, callback));
1105+
}
10501106
}
10511107
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,24 +54,27 @@ final class GetWorkResponseChunkAssembler {
5454
private final WorkItem.Builder workItemBuilder; // Reused to reduce GC overhead.
5555
private ByteString data;
5656
private long bufferedSize;
57+
private final List<Long> appliedFinalizeIds;
5758

5859
GetWorkResponseChunkAssembler() {
5960
workTimingInfosTracker = new GetWorkTimingInfosTracker(System::currentTimeMillis);
6061
data = ByteString.EMPTY;
6162
bufferedSize = 0;
6263
metadata = null;
6364
workItemBuilder = WorkItem.newBuilder();
65+
appliedFinalizeIds = new ArrayList<>();
6466
}
6567

6668
/**
67-
* Appends the response chunk bytes to the {@link #data }byte buffer. Return the assembled
69+
* Appends the response chunk bytes to the {@link #data} byte buffer. Return the assembled
6870
* WorkItem if all response chunks for a WorkItem have been received.
6971
*/
7072
List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk) {
7173
if (chunk.hasComputationMetadata()) {
7274
metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata());
7375
}
7476
workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList());
77+
appliedFinalizeIds.addAll(chunk.getAppliedFinalizeIdsList());
7578

7679
List<AssembledWorkItem> response = new ArrayList<>();
7780
for (int i = 0; i < chunk.getSerializedWorkItemList().size(); i++) {
@@ -90,13 +93,14 @@ List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk) {
9093
}
9194

9295
/**
93-
* Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ it's metadata. Resets the
96+
* Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ its metadata. Resets the
9497
* data byte string and tracking metadata afterwards, whether the {@link WorkItem} deserialization
9598
* was successful or not.
9699
*/
97100
private Optional<AssembledWorkItem> flushToWorkItem() {
98101
try {
99102
workItemBuilder.mergeFrom(data);
103+
workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds);
100104
return Optional.of(
101105
AssembledWorkItem.create(
102106
workItemBuilder.build(),
@@ -110,6 +114,7 @@ private Optional<AssembledWorkItem> flushToWorkItem() {
110114
workTimingInfosTracker.reset();
111115
data = ByteString.EMPTY;
112116
bufferedSize = 0;
117+
appliedFinalizeIds.clear();
113118
}
114119

115120
return Optional.empty();

0 commit comments

Comments
 (0)