Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 3a6dde7

Browse files
committed
Only increase counters in LateDroppingDoFnRunner once, even if iterated multiple times.
1 parent 4805db9 commit 3a6dde7

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -112,20 +112,30 @@ public WindowedValue<InputT> apply(BoundedWindow window) {
112112
});
113113
}});
114114

115+
Iterable<WindowedValue<InputT>> concatElements = Iterables.concat(windowsExpandedElements);
116+
117+
// Bump the counter separately since we don't want multiple iterations to
118+
// increase it multiple times.
119+
for (WindowedValue<InputT> input : concatElements) {
120+
BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
121+
if (canDropDueToExpiredWindow(window)) {
122+
// The element is too late for this window.
123+
droppedDueToLateness.addValue(1L);
124+
WindowTracing.debug(
125+
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
126+
+ "since too far behind inputWatermark:{}; outputWatermark:{}",
127+
input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
128+
timerInternals.currentOutputWatermarkTime());
129+
}
130+
}
131+
115132
Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter(
116-
Iterables.concat(windowsExpandedElements),
133+
concatElements,
117134
new Predicate<WindowedValue<InputT>>() {
118135
@Override
119136
public boolean apply(WindowedValue<InputT> input) {
120137
BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
121138
if (canDropDueToExpiredWindow(window)) {
122-
// The element is too late for this window.
123-
droppedDueToLateness.addValue(1L);
124-
WindowTracing.debug(
125-
"ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
126-
+ "since too far behind inputWatermark:{}; outputWatermark:{}",
127-
input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
128-
timerInternals.currentOutputWatermarkTime());
129139
return false;
130140
} else {
131141
return true;

0 commit comments

Comments
 (0)