Skip to content

Commit 2f69f04

Browse files
committed
spotless
1 parent 2f96e7d commit 2f69f04

File tree

2 files changed

+17
-29
lines changed

2 files changed

+17
-29
lines changed

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,6 @@ public String getErrorContext() {
478478
watermarkEstimatorStateT = watermarkEstimatorState.read();
479479
}
480480

481-
// elementAndRestriction should be patched with drain info
482-
483481
final WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
484482
invoker.invokeNewWatermarkEstimator(
485483
new BaseArgumentProvider<InputT, OutputT>() {
@@ -613,14 +611,14 @@ public String getErrorContext() {
613611
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
614612
holdState.add(futureOutputWatermark);
615613
// Set a timer to continue processing this element, but only when no drain
616-
if(timer.causedByDrain() == CausedByDrain.NORMAL) {
614+
if (timer == null || timer.causedByDrain() == CausedByDrain.NORMAL) {
617615
timerInternals.setTimer(
618-
TimerInternals.TimerData.of(
619-
stateNamespace,
620-
wakeupTime,
621-
wakeupTime,
622-
TimeDomain.PROCESSING_TIME,
623-
timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
616+
TimerInternals.TimerData.of(
617+
stateNamespace,
618+
wakeupTime,
619+
wakeupTime,
620+
TimeDomain.PROCESSING_TIME,
621+
CausedByDrain.NORMAL));
624622
}
625623
}
626624

sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1804,11 +1804,6 @@ public <T> void outputWindowedValue(
18041804
outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo));
18051805
}
18061806

1807-
@Override
1808-
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1809-
return currentElement.causedByDrain();
1810-
}
1811-
18121807
@Override
18131808
public State state(String stateId, boolean alwaysFetched) {
18141809
StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId);
@@ -1860,11 +1855,6 @@ public TimerMap timerFamily(String timerFamilyId) {
18601855
currentElement.getTimestamp(),
18611856
currentElement.getPaneInfo());
18621857
}
1863-
1864-
@Override
1865-
public CausedByDrain causedByDrain() {
1866-
return currentElement.causedByDrain();
1867-
}
18681858
}
18691859

18701860
/** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
@@ -1946,16 +1936,6 @@ public <T> void outputWindowedValue(
19461936
}
19471937
outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo));
19481938
}
1949-
1950-
@Override
1951-
public CausedByDrain causedByDrain() {
1952-
return currentElement.causedByDrain();
1953-
}
1954-
1955-
@Override
1956-
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
1957-
return currentElement.causedByDrain();
1958-
}
19591939
}
19601940

19611941
/** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */
@@ -2254,6 +2234,16 @@ public Object watermarkEstimatorState() {
22542234
public WatermarkEstimator<?> watermarkEstimator() {
22552235
return currentWatermarkEstimator;
22562236
}
2237+
2238+
@Override
2239+
public CausedByDrain causedByDrain() {
2240+
return currentElement.causedByDrain();
2241+
}
2242+
2243+
@Override
2244+
public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
2245+
return currentElement.causedByDrain();
2246+
}
22572247
}
22582248

22592249
/**

0 commit comments

Comments
 (0)