@@ -112,20 +112,30 @@ public WindowedValue<InputT> apply(BoundedWindow window) {
112112 });
113113 }});
114114
115+ Iterable <WindowedValue <InputT >> concatElements = Iterables .concat (windowsExpandedElements );
116+
117+ // Bump the counter separately since we don't want multiple iterations to
118+ // increase it multiple times.
119+ for (WindowedValue <InputT > input : concatElements ) {
120+ BoundedWindow window = Iterables .getOnlyElement (input .getWindows ());
121+ if (canDropDueToExpiredWindow (window )) {
122+ // The element is too late for this window.
123+ droppedDueToLateness .addValue (1L );
124+ WindowTracing .debug (
125+ "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
126+ + "since too far behind inputWatermark:{}; outputWatermark:{}" ,
127+ input .getTimestamp (), key , window , timerInternals .currentInputWatermarkTime (),
128+ timerInternals .currentOutputWatermarkTime ());
129+ }
130+ }
131+
115132 Iterable <WindowedValue <InputT >> nonLateElements = Iterables .filter (
116- Iterables . concat ( windowsExpandedElements ) ,
133+ concatElements ,
117134 new Predicate <WindowedValue <InputT >>() {
118135 @ Override
119136 public boolean apply (WindowedValue <InputT > input ) {
120137 BoundedWindow window = Iterables .getOnlyElement (input .getWindows ());
121138 if (canDropDueToExpiredWindow (window )) {
122- // The element is too late for this window.
123- droppedDueToLateness .addValue (1L );
124- WindowTracing .debug (
125- "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
126- + "since too far behind inputWatermark:{}; outputWatermark:{}" ,
127- input .getTimestamp (), key , window , timerInternals .currentInputWatermarkTime (),
128- timerInternals .currentOutputWatermarkTime ());
129139 return false ;
130140 } else {
131141 return true ;
0 commit comments