Skip to content

Commit 8f19630

Browse files
committed
Defer broadcast join probe leaf splits
Avoids starting join probe leaf splits for tasks that are awaiting the build side completion. Otherwise, build side leaf split concurrency will be starved by probe side splits that start and then immediately block.
1 parent aa8f0ae commit 8f19630

File tree

8 files changed

+105
-6
lines changed

8 files changed

+105
-6
lines changed

core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public class SqlTaskExecution
105105
private final List<PlanNodeId> sourceStartOrder;
106106
@GuardedBy("this")
107107
private int schedulingPlanNodeOrdinal;
108+
@GuardedBy("this")
109+
private ListenableFuture<Void> pipelineDependenciesSatisfied = immediateVoidFuture();
108110

109111
@GuardedBy("this")
110112
private final Map<PlanNodeId, PendingSplitsForPlanNode> pendingSplitsByPlanNode;
@@ -309,7 +311,8 @@ private synchronized Set<PlanNodeId> updateSplitAssignments(List<SplitAssignment
309311
// update task with new sources
310312
for (SplitAssignment splitAssignment : unacknowledgedSplitAssignment) {
311313
if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(splitAssignment.getPlanNodeId())) {
312-
schedulePartitionedSource(splitAssignment);
314+
mergeIntoPendingSplits(splitAssignment.getPlanNodeId(), splitAssignment.getSplits(), splitAssignment.isNoMoreSplits());
315+
schedulePartitionedSourcePendingSplits();
313316
}
314317
else {
315318
// tell existing drivers about the new splits
@@ -337,15 +340,34 @@ private void mergeIntoPendingSplits(PlanNodeId planNodeId, Set<ScheduledSplit> s
337340
}
338341
}
339342

340-
private synchronized void schedulePartitionedSource(SplitAssignment splitAssignmentUpdate)
343+
private synchronized void scheduleSourcePartitionedSplitsAfterPipelineUnblocked()
341344
{
342-
mergeIntoPendingSplits(splitAssignmentUpdate.getPlanNodeId(), splitAssignmentUpdate.getSplits(), splitAssignmentUpdate.isNoMoreSplits());
345+
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
346+
// Enqueue pending splits as split runners after unblocking
347+
schedulePartitionedSourcePendingSplits();
348+
// Re-check for task completion since we may have just set no more splits
349+
checkTaskCompletion();
350+
}
351+
}
343352

353+
private synchronized void schedulePartitionedSourcePendingSplits()
354+
{
344355
while (schedulingPlanNodeOrdinal < sourceStartOrder.size()) {
345356
PlanNodeId schedulingPlanNode = sourceStartOrder.get(schedulingPlanNodeOrdinal);
346357

347358
DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode);
348359

360+
// Avoid creating split runners for pipelines that are awaiting another pipeline completing (e.g. probe side of a join waiting
361+
// on the broadcast completion). Otherwise, build side pipelines will have reduced concurrency available.
362+
ListenableFuture<Void> pipelineDependenciesSatisfied = partitionedDriverRunnerFactory.getPipelineDependenciesSatisfied();
363+
if (!pipelineDependenciesSatisfied.isDone()) {
364+
// Only register a single re-schedule listener if we're blocked on pipeline dependencies
365+
if (this.pipelineDependenciesSatisfied.isDone()) {
366+
this.pipelineDependenciesSatisfied = pipelineDependenciesSatisfied;
367+
pipelineDependenciesSatisfied.addListener(this::scheduleSourcePartitionedSplitsAfterPipelineUnblocked, notificationExecutor);
368+
}
369+
break;
370+
}
349371
PendingSplitsForPlanNode pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode);
350372

351373
// Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
@@ -600,6 +622,11 @@ private DriverSplitRunnerFactory(DriverFactory driverFactory, Tracer tracer, boo
600622
.startSpan();
601623
}
602624

625+
public ListenableFuture<Void> getPipelineDependenciesSatisfied()
626+
{
627+
return driverFactory.getPipelineDependenciesSatisfied();
628+
}
629+
603630
public DriverSplitRunner createPartitionedDriverRunner(ScheduledSplit partitionedSplit)
604631
{
605632
return createDriverRunner(partitionedSplit, partitionedSplit.getSplit().getSplitWeight().getRawValue());

core/trino-main/src/main/java/io/trino/operator/DriverFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
package io.trino.operator;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import com.google.common.util.concurrent.Futures;
18+
import com.google.common.util.concurrent.ListenableFuture;
1719
import com.google.errorprone.annotations.concurrent.GuardedBy;
1820
import io.trino.sql.planner.plan.PlanNodeId;
1921
import jakarta.annotation.Nullable;
@@ -26,6 +28,7 @@
2628
import static com.google.common.base.Preconditions.checkArgument;
2729
import static com.google.common.base.Preconditions.checkState;
2830
import static com.google.common.collect.ImmutableList.toImmutableList;
31+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2932
import static java.util.Objects.requireNonNull;
3033

3134
public class DriverFactory
@@ -35,6 +38,7 @@ public class DriverFactory
3538
private final boolean outputDriver;
3639
private final Optional<PlanNodeId> sourceId;
3740
private final OptionalInt driverInstances;
41+
private final ListenableFuture<Void> pipelineDependenciesSatisfied;
3842

3943
// must synchronize between createDriver() and noMoreDrivers(), but isNoMoreDrivers() is safe without synchronizing
4044
@GuardedBy("this")
@@ -57,6 +61,11 @@ public DriverFactory(int pipelineId, boolean inputDriver, boolean outputDriver,
5761
.collect(toImmutableList());
5862
checkArgument(sourceIds.size() <= 1, "Expected at most one source operator in driver factory, but found %s", sourceIds);
5963
this.sourceId = sourceIds.isEmpty() ? Optional.empty() : Optional.of(sourceIds.get(0));
64+
List<ListenableFuture<Void>> pipelineDependencies = operatorFactories.stream()
65+
.map(OperatorFactory::pipelineDependenciesSatisfied)
66+
.filter(future -> !future.isDone())
67+
.collect(toImmutableList());
68+
this.pipelineDependenciesSatisfied = pipelineDependencies.isEmpty() ? Futures.immediateVoidFuture() : Futures.whenAllComplete(pipelineDependencies).call(() -> null, directExecutor());
6069
}
6170

6271
public int getPipelineId()
@@ -74,6 +83,11 @@ public boolean isOutputDriver()
7483
return outputDriver;
7584
}
7685

86+
public ListenableFuture<Void> getPipelineDependenciesSatisfied()
87+
{
88+
return pipelineDependenciesSatisfied;
89+
}
90+
7791
/**
7892
* return the sourceId of this DriverFactory.
7993
* A DriverFactory doesn't always have source node.

core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@
1313
*/
1414
package io.trino.operator;
1515

16+
import com.google.common.util.concurrent.ListenableFuture;
17+
18+
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
19+
1620
public interface OperatorFactory
1721
{
1822
Operator createOperator(DriverContext driverContext);
@@ -27,4 +31,14 @@ public interface OperatorFactory
2731
void noMoreOperators();
2832

2933
OperatorFactory duplicate();
34+
35+
/**
36+
* Returns a future indicating that any dependencies operators have on other pipelines has been satisfied and that leaf splits
37+
* should be allowed to start for this operator. This is used to prevent join probe splits from starting before the build side
38+
* of a join is ready when the two are in the same stage (i.e.: broadcast join on top of a table scan).
39+
*/
40+
default ListenableFuture<Void> pipelineDependenciesSatisfied()
41+
{
42+
return immediateVoidFuture();
43+
}
3044
}

core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.operator;
1515

1616
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.common.util.concurrent.Futures;
1718
import com.google.common.util.concurrent.ListenableFuture;
1819
import io.trino.memory.context.MemoryTrackingContext;
1920
import io.trino.operator.join.JoinOperatorFactory;
@@ -86,6 +87,21 @@ public Optional<OperatorFactory> createOuterOperatorFactory()
8687
return lookupJoin.createOuterOperatorFactory();
8788
}
8889

90+
@Override
91+
public ListenableFuture<Void> buildPipelineReady()
92+
{
93+
if (!(operatorFactory instanceof JoinOperatorFactory lookupJoin)) {
94+
return Futures.immediateVoidFuture();
95+
}
96+
return lookupJoin.buildPipelineReady();
97+
}
98+
99+
@Override
100+
public ListenableFuture<Void> pipelineDependenciesSatisfied()
101+
{
102+
return buildPipelineReady();
103+
}
104+
89105
@VisibleForTesting
90106
public WorkProcessorOperatorFactory getWorkProcessorOperatorFactory()
91107
{

core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public static JoinBridgeManager<PartitionedLookupSourceFactory> lookupAllAtOnce(
4343
private final List<Type> buildOutputTypes;
4444
private final boolean buildOuter;
4545
private final T joinBridge;
46+
private final ListenableFuture<Void> whenBuildFinishes;
4647

4748
private final AtomicBoolean initialized = new AtomicBoolean();
4849
private JoinLifecycle joinLifecycle;
@@ -57,6 +58,7 @@ public JoinBridgeManager(
5758
this.buildOuter = buildOuter;
5859
this.joinBridge = requireNonNull(joinBridge, "joinBridge is null");
5960
this.buildOutputTypes = requireNonNull(buildOutputTypes, "buildOutputTypes is null");
61+
this.whenBuildFinishes = requireNonNull(joinBridge.whenBuildFinishes(), "whenBuildFinishes is null");
6062
}
6163

6264
private void initializeIfNecessary()
@@ -67,7 +69,7 @@ private void initializeIfNecessary()
6769
return;
6870
}
6971
int finalProbeFactoryCount = probeFactoryCount.get();
70-
joinLifecycle = new JoinLifecycle(joinBridge, finalProbeFactoryCount, buildOuter ? 1 : 0);
72+
joinLifecycle = new JoinLifecycle(whenBuildFinishes, joinBridge, finalProbeFactoryCount, buildOuter ? 1 : 0);
7173
initialized.set(true);
7274
}
7375
}
@@ -83,6 +85,11 @@ public void incrementProbeFactoryCount()
8385
probeFactoryCount.increment();
8486
}
8587

88+
public ListenableFuture<Void> getBuildFinishedFuture()
89+
{
90+
return whenBuildFinishes;
91+
}
92+
8693
public T getJoinBridge()
8794
{
8895
initializeIfNecessary();
@@ -139,7 +146,7 @@ private static class JoinLifecycle
139146
private final ListenableFuture<Void> whenBuildAndProbeFinishes;
140147
private final ListenableFuture<Void> whenAllFinishes;
141148

142-
public JoinLifecycle(JoinBridge joinBridge, int probeFactoryCount, int outerFactoryCount)
149+
private JoinLifecycle(ListenableFuture<Void> whenBuildFinishes, JoinBridge joinBridge, int probeFactoryCount, int outerFactoryCount)
143150
{
144151
// When all probe and lookup-outer operators finish, destroy the join bridge (freeing the memory)
145152
// * Each LookupOuterOperatorFactory count as 1
@@ -152,7 +159,7 @@ public JoinLifecycle(JoinBridge joinBridge, int probeFactoryCount, int outerFact
152159
// * Each probe operator count as 1
153160
probeReferenceCount = new ReferenceCount(probeFactoryCount);
154161

155-
whenBuildAndProbeFinishes = Futures.whenAllSucceed(joinBridge.whenBuildFinishes(), probeReferenceCount.getFreeFuture()).call(() -> null, directExecutor());
162+
whenBuildAndProbeFinishes = Futures.whenAllSucceed(whenBuildFinishes, probeReferenceCount.getFreeFuture()).call(() -> null, directExecutor());
156163
whenAllFinishes = Futures.whenAllSucceed(whenBuildAndProbeFinishes, outerReferenceCount.getFreeFuture()).call(() -> null, directExecutor());
157164
whenAllFinishes.addListener(joinBridge::destroy, directExecutor());
158165
}

core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,18 @@
1313
*/
1414
package io.trino.operator.join;
1515

16+
import com.google.common.util.concurrent.ListenableFuture;
1617
import io.trino.operator.OperatorFactory;
1718

1819
import java.util.Optional;
1920

2021
public interface JoinOperatorFactory
2122
{
2223
Optional<OperatorFactory> createOuterOperatorFactory();
24+
25+
/**
26+
* Future that indicates when the build side of the join has been completed and probe processing
27+
* can begin. Used by {@link OperatorFactory#pipelineDependenciesSatisfied()}.
28+
*/
29+
ListenableFuture<Void> buildPipelineReady();
2330
}

core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.primitives.Ints;
18+
import com.google.common.util.concurrent.ListenableFuture;
1819
import io.trino.operator.HashGenerator;
1920
import io.trino.operator.JoinOperatorType;
2021
import io.trino.operator.OperatorFactory;
@@ -164,6 +165,12 @@ public String getOperatorType()
164165
return LookupJoinOperator.class.getSimpleName();
165166
}
166167

168+
@Override
169+
public ListenableFuture<Void> buildPipelineReady()
170+
{
171+
return joinBridgeManager.getBuildFinishedFuture();
172+
}
173+
167174
@Override
168175
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
169176
{

core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.operator.join.unspilled;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import com.google.common.util.concurrent.ListenableFuture;
1718
import io.trino.operator.JoinOperatorType;
1819
import io.trino.operator.OperatorFactory;
1920
import io.trino.operator.ProcessorContext;
@@ -133,6 +134,12 @@ public String getOperatorType()
133134
return LookupJoinOperator.class.getSimpleName();
134135
}
135136

137+
@Override
138+
public ListenableFuture<Void> buildPipelineReady()
139+
{
140+
return joinBridgeManager.getBuildFinishedFuture();
141+
}
142+
136143
@Override
137144
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
138145
{

0 commit comments

Comments
 (0)