Skip to content
This repository was archived by the owner on Nov 11, 2022. It is now read-only.

Commit 40ff148

Browse files
authored
Merge pull request #493 from scwhittle/reduce_fn_prefetching
Improve ReduceFnRunner prefetching
2 parents 4805db9 + 75eb794 commit 40ff148

File tree

9 files changed

+407
-227
lines changed

9 files changed

+407
-227
lines changed

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BatchTimerInternals.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import org.joda.time.Instant;
2525

26+
import java.util.ArrayList;
2627
import java.util.HashSet;
2728
import java.util.PriorityQueue;
2829
import java.util.Set;
@@ -124,16 +125,19 @@ private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDom
124125
PriorityQueue<TimerData> timers = queue(domain);
125126
boolean shouldFire = false;
126127

127-
do {
128-
TimerData timer = timers.peek();
129-
// Timers fire if the new time is ahead of the timer
130-
shouldFire = timer != null && newTime.isAfter(timer.getTimestamp());
131-
if (shouldFire) {
132-
// Remove before firing, so that if the trigger adds another identical
128+
while (true) {
129+
ArrayList<TimerData> firedTimers = new ArrayList();
130+
while (!timers.isEmpty() && newTime.isAfter(timers.peek().getTimestamp())) {
131+
// Remove before firing, so that if the callback adds another identical
133132
// timer we don't remove it.
134-
timers.remove();
135-
runner.onTimer(timer);
133+
TimerData timer = timers.remove();
134+
existingTimers.remove(timer);
135+
firedTimers.add(timer);
136136
}
137-
} while (shouldFire);
137+
if (firedTimers.isEmpty()) {
138+
break;
139+
}
140+
runner.onTimers(firedTimers);
141+
}
138142
}
139143
}

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/GroupAlsoByWindowViaWindowSetDoFn.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.google.cloud.dataflow.sdk.transforms.Sum;
2121
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
2222
import com.google.cloud.dataflow.sdk.util.DoFnRunner.ReduceFnExecutor;
23-
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
2423
import com.google.cloud.dataflow.sdk.util.state.StateInternals;
2524
import com.google.cloud.dataflow.sdk.values.KV;
2625

@@ -81,9 +80,7 @@ public void processElement(ProcessContext c) throws Exception {
8180
c.getPipelineOptions());
8281

8382
reduceFnRunner.processElements(element.elementsIterable());
84-
for (TimerData timer : element.timersIterable()) {
85-
reduceFnRunner.onTimer(timer);
86-
}
83+
reduceFnRunner.onTimers(element.timersIterable());
8784
reduceFnRunner.persist();
8885
}
8986

@@ -101,4 +98,3 @@ public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
10198
return droppedDueToLateness;
10299
}
103100
}
104-

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ public void clear(StateAccessor<?> state) {
5151
state.access(PANE_INFO_TAG).clear();
5252
}
5353

54+
public void prefetchPaneInfo(ReduceFn<?, ?, ?, ?>.Context context) {
55+
context.state().access(PaneInfoTracker.PANE_INFO_TAG).readLater();
56+
}
57+
5458
/**
5559
* Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
5660
* info includes the timing for the pane, who's calculation is quite subtle.

sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnContextFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
2525
import com.google.cloud.dataflow.sdk.util.TimerInternals.TimerData;
2626
import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
27-
import com.google.cloud.dataflow.sdk.util.state.ReadableState;
2827
import com.google.cloud.dataflow.sdk.util.state.State;
2928
import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
3029
import com.google.cloud.dataflow.sdk.util.state.StateContext;
@@ -99,7 +98,7 @@ public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
9998
}
10099

101100
public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
102-
ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
101+
PaneInfo pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
103102
return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
104103
}
105104

@@ -371,11 +370,11 @@ public Timers timers() {
371370

372371
private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
373372
private final StateAccessorImpl<K, W> state;
374-
private final ReadableState<PaneInfo> pane;
373+
private final PaneInfo pane;
375374
private final OnTriggerCallbacks<OutputT> callbacks;
376375
private final TimersImpl timers;
377376

378-
private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
377+
private OnTriggerContextImpl(StateAccessorImpl<K, W> state, PaneInfo pane,
379378
OnTriggerCallbacks<OutputT> callbacks) {
380379
reduceFn.super();
381380
this.state = state;
@@ -406,7 +405,7 @@ public StateAccessor<K> state() {
406405

407406
@Override
408407
public PaneInfo paneInfo() {
409-
return pane.read();
408+
return pane;
410409
}
411410

412411
@Override

0 commit comments

Comments
 (0)