Skip to content

Commit 3b31893

Browse files
committed
Compute Dynamic Filters only when a consumer supports them
1 parent 769f367 commit 3b31893

File tree

2 files changed

+68
-1
lines changed

2 files changed

+68
-1
lines changed

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,16 @@ impl DynamicFilterPhysicalExpr {
294294
.await;
295295
}
296296

297+
/// Check if this dynamic filter is being actively used by any consumers.
298+
///
299+
/// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
300+
/// that created the filter). This is useful to avoid computing expensive filter
301+
/// expressions when no consumer will actually use them.
302+
pub fn is_used(self: &Arc<Self>) -> bool {
303+
// Strong count > 1 means at least one consumer is holding a reference beyond the producer.
304+
Arc::strong_count(self) > 1
305+
}
306+
297307
fn render(
298308
&self,
299309
f: &mut std::fmt::Formatter<'_>,
@@ -607,4 +617,47 @@ mod test {
607617
// wait_complete should return immediately
608618
dynamic_filter.wait_complete().await;
609619
}
620+
621+
/// Walks `is_used` through the life cycle of a filter's references: producer only,
/// one consumer, two consumers, back down to one consumer, and finally producer only.
#[test]
fn test_is_used() {
    // The producer-side handle; at this point it is the sole strong reference.
    let producer = Arc::new(DynamicFilterPhysicalExpr::new(
        vec![],
        lit(true) as Arc<dyn PhysicalExpr>,
    ));

    // No consumer has cloned the handle yet, so the filter must report unused.
    assert!(
        !producer.is_used(),
        "Filter should not be used with only one reference"
    );

    // A first consumer (e.g., ParquetExec) takes its own reference.
    let first_consumer = Arc::clone(&producer);
    assert!(
        producer.is_used(),
        "Filter should be used with a consumer reference"
    );

    // A second consumer attaches; the filter stays in use.
    let second_consumer = Arc::clone(&producer);
    assert!(
        producer.is_used(),
        "Filter should still be used with multiple consumers"
    );

    // Releasing the first consumer still leaves the second one attached.
    drop(first_consumer);
    assert!(
        producer.is_used(),
        "Filter should still be used with remaining consumer"
    );

    // With the last consumer gone, only the producer's reference remains.
    drop(second_consumer);
    assert!(
        !producer.is_used(),
        "Filter should not be used after all consumers dropped"
    );
}
610663
}

datafusion/physical-plan/src/joins/hash_join/exec.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,21 @@ impl ExecutionPlan for HashJoinExec {
910910
consider using CoalescePartitionsExec or the EnforceDistribution rule"
911911
);
912912

913-
let enable_dynamic_filter_pushdown = self.dynamic_filter.is_some();
913+
// Only enable dynamic filter pushdown if:
914+
// - The session config enables dynamic filter pushdown
915+
// - A dynamic filter exists
916+
// - At least one consumer is holding a reference to it. This avoids expensive filter
917+
// computation when disabled or when no consumer will use it.
918+
let enable_dynamic_filter_pushdown = context
919+
.session_config()
920+
.options()
921+
.optimizer
922+
.enable_join_dynamic_filter_pushdown
923+
&& self
924+
.dynamic_filter
925+
.as_ref()
926+
.map(|df| df.filter.is_used())
927+
.unwrap_or(false);
914928

915929
let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
916930
let left_fut = match self.mode {

0 commit comments

Comments
 (0)