Commit 8c467c9

Change Approach to Exact/Inexact/Unsupported
1 parent 3b31893 commit 8c467c9
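
This commit replaces the two-valued pushdown result (PushedDown::Yes / PushedDown::No) with a three-valued one: Exact (the child applies the filter fully, so the parent may drop it), Inexact (the child only uses the filter approximately, e.g. for statistics pruning, so the parent must still apply it), and Unsupported (the child cannot use the filter at all). A minimal sketch of what the enum could look like; the exact definition, derives, and doc wording are assumptions for illustration, not part of this commit:

/// How a child node handled a filter pushed down to it (illustrative sketch).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushedDown {
    /// The child applies the filter exactly; the parent can remove its own copy.
    Exact,
    /// The child uses the filter only approximately (e.g. statistics pruning);
    /// the parent must still apply the filter to guarantee correct results.
    Inexact,
    /// The child cannot make use of the filter at all.
    Unsupported,
}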


9 files changed: +147 −139 lines changed


datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 6 additions & 5 deletions
@@ -220,12 +220,12 @@ impl FileSource for TestSource {
                 ..self.clone()
             });
             Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-                vec![PushedDown::Yes; filters.len()],
+                vec![PushedDown::Exact; filters.len()],
             )
             .with_updated_node(new_node))
         } else {
             Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-                vec![PushedDown::No; filters.len()],
+                vec![PushedDown::Unsupported; filters.len()],
             ))
         }
     }
@@ -542,8 +542,8 @@ impl ExecutionPlan for TestNode {
         let first_pushdown_result = self_pushdown_result[0].clone();

         match &first_pushdown_result.discriminant {
-            PushedDown::No => {
-                // We have a filter to push down
+            PushedDown::Unsupported | PushedDown::Inexact => {
+                // We have a filter that wasn't exactly pushed down, so we need to apply it
                 let new_child = FilterExec::try_new(
                     Arc::clone(&first_pushdown_result.predicate),
                     Arc::clone(&self.input),
@@ -555,7 +555,8 @@ impl ExecutionPlan for TestNode {
                 res.updated_node = Some(Arc::new(new_self) as Arc<dyn ExecutionPlan>);
                 Ok(res)
             }
-            PushedDown::Yes => {
+            PushedDown::Exact => {
+                // Filter was exactly pushed down, no need to apply it again
                 let res = FilterPushdownPropagation::if_all(child_pushdown_result);
                 Ok(res)
             }

datafusion/datasource-parquet/src/source.rs

Lines changed: 20 additions & 6 deletions
@@ -707,6 +707,9 @@ impl FileSource for ParquetSource {
             .into_iter()
             .map(|filter| {
                 if can_expr_be_pushed_down_with_schemas(&filter, table_schema) {
+                    // When pushdown_filters is true, filters will be used for row-level filtering
+                    // When pushdown_filters is false, filters will be used for stats pruning only
+                    // We'll mark them as supported here and adjust later based on pushdown_filters
                     PushedDownPredicate::supported(filter)
                 } else {
                     PushedDownPredicate::unsupported(filter)
@@ -715,19 +718,19 @@
             .collect();
         if filters
             .iter()
-            .all(|f| matches!(f.discriminant, PushedDown::No))
+            .all(|f| matches!(f.discriminant, PushedDown::Unsupported))
         {
             // No filters can be pushed down, so we can just return the remaining filters
             // and avoid replacing the source in the physical plan.
             return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-                vec![PushedDown::No; filters.len()],
+                vec![PushedDown::Unsupported; filters.len()],
             ));
         }
         let allowed_filters = filters
             .iter()
             .filter_map(|f| match f.discriminant {
-                PushedDown::Yes => Some(Arc::clone(&f.predicate)),
-                PushedDown::No => None,
+                PushedDown::Exact => Some(Arc::clone(&f.predicate)),
+                PushedDown::Inexact | PushedDown::Unsupported => None,
             })
             .collect_vec();
         let predicate = match source.predicate {
@@ -740,13 +743,24 @@
         source = source.with_pushdown_filters(pushdown_filters);
         let source = Arc::new(source);
         // If pushdown_filters is false we tell our parents that they still have to handle the filters,
-        // even if we updated the predicate to include the filters (they will only be used for stats pruning).
+        // even though we updated the predicate to include the filters (they will be used for stats pruning only).
+        // In this case, we return Inexact for filters that can be pushed down (used for stats pruning)
+        // and Unsupported for filters that cannot be pushed down at all.
         if !pushdown_filters {
+            let result_discriminants = filters
+                .iter()
+                .map(|f| match f.discriminant {
+                    PushedDown::Exact => PushedDown::Inexact, // Downgrade to Inexact (stats pruning only)
+                    PushedDown::Unsupported => PushedDown::Unsupported, // Keep as Unsupported
+                    _ => unreachable!("Only Exact or Unsupported expected at this point"),
+                })
+                .collect();
             return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-                vec![PushedDown::No; filters.len()],
+                result_discriminants,
             )
             .with_updated_node(source));
         }
+        // If pushdown_filters is true, we return the original discriminants (Exact for supported filters)
         Ok(FilterPushdownPropagation::with_parent_pushdown_result(
             filters.iter().map(|f| f.discriminant).collect(),
         )
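
The ParquetSource change above is the heart of the new approach: when pushdown_filters is false, predicates are still attached to the source but only drive statistics pruning, so supported filters are reported as Inexact rather than Exact. A hedged sketch of how another source that can only prune (never filter row-by-row) might classify its filters under this scheme; can_prune_with is a hypothetical helper and the usual datafusion imports are assumed to be in scope:

// Classify filters for a source that can only use them for statistics pruning.
// `can_prune_with` is illustrative; ParquetSource uses
// can_expr_be_pushed_down_with_schemas as shown in the diff above.
fn classify_filters(filters: &[Arc<dyn PhysicalExpr>]) -> Vec<PushedDown> {
    filters
        .iter()
        .map(|f| {
            if can_prune_with(f) {
                // Used for pruning only; the parent must still apply the filter.
                PushedDown::Inexact
            } else {
                // Cannot be used at all.
                PushedDown::Unsupported
            }
        })
        .collect()
}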

datafusion/datasource/src/file.rs

Lines changed: 2 additions & 1 deletion
@@ -121,8 +121,9 @@ pub trait FileSource: Send + Sync {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        // Default implementation: don't support filter pushdown
         Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-            vec![PushedDown::No; filters.len()],
+            vec![PushedDown::Unsupported; filters.len()],
         ))
     }

datafusion/datasource/src/source.rs

Lines changed: 2 additions & 1 deletion
@@ -185,8 +185,9 @@ pub trait DataSource: Send + Sync + Debug {
         filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
+        // Default implementation: don't support filter pushdown
         Ok(FilterPushdownPropagation::with_parent_pushdown_result(
-            vec![PushedDown::No; filters.len()],
+            vec![PushedDown::Unsupported; filters.len()],
         ))
     }
 }

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 0 additions & 53 deletions
@@ -294,16 +294,6 @@ impl DynamicFilterPhysicalExpr {
             .await;
     }

-    /// Check if this dynamic filter is being actively used by any consumers.
-    ///
-    /// Returns `true` if there are references beyond the producer (e.g., the HashJoinExec
-    /// that created the filter). This is useful to avoid computing expensive filter
-    /// expressions when no consumer will actually use them.
-    pub fn is_used(self: &Arc<Self>) -> bool {
-        // Strong count > 1 means at least one consumer is holding a reference beyond the producer.
-        Arc::strong_count(self) > 1
-    }
-
     fn render(
         &self,
         f: &mut std::fmt::Formatter<'_>,
@@ -617,47 +607,4 @@ mod test {
         // wait_complete should return immediately
         dynamic_filter.wait_complete().await;
     }
-
-    #[test]
-    fn test_is_used() {
-        // Create a dynamic filter
-        let filter = Arc::new(DynamicFilterPhysicalExpr::new(
-            vec![],
-            lit(true) as Arc<dyn PhysicalExpr>,
-        ));
-
-        // Initially, only one reference exists (the filter itself)
-        assert!(
-            !filter.is_used(),
-            "Filter should not be used with only one reference"
-        );
-
-        // Simulate a consumer holding a reference (e.g., ParquetExec)
-        let consumer1 = Arc::clone(&filter);
-        assert!(
-            filter.is_used(),
-            "Filter should be used with a consumer reference"
-        );
-
-        // Multiple consumers
-        let consumer2 = Arc::clone(&filter);
-        assert!(
-            filter.is_used(),
-            "Filter should still be used with multiple consumers"
-        );
-
-        // Drop one consumer
-        drop(consumer1);
-        assert!(
-            filter.is_used(),
-            "Filter should still be used with remaining consumer"
-        );
-
-        // Drop all consumers
-        drop(consumer2);
-        assert!(
-            !filter.is_used(),
-            "Filter should not be used after all consumers dropped"
-        );
-    }
 }

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 7 additions & 6 deletions
@@ -495,9 +495,10 @@ fn push_down_filters(
     let mut all_predicates = self_filtered.items().to_vec();

     // Apply second filter pass: collect indices of parent filters that can be pushed down
-    let parent_filters_for_child = parent_filtered
-        .chain_filter_slice(&parent_filters, |filter| {
-            matches!(filter.discriminant, PushedDown::Yes)
+    // Push down filters that are either Exact or Inexact (both mean the child will use them)
+    let parent_filters_for_child =
+        parent_filtered.chain_filter_slice(&parent_filters, |filter| {
+            matches!(filter.discriminant, PushedDown::Exact | PushedDown::Inexact)
         });

     // Add the filtered parent predicates to all_predicates
@@ -535,7 +536,7 @@
         .collect_vec();
     // Map the results from filtered self filters back to their original positions using FilteredVec
     let mapped_self_results =
-        self_filtered.map_results_to_original(all_filters, PushedDown::No);
+        self_filtered.map_results_to_original(all_filters, PushedDown::Unsupported);

     // Wrap each result with its corresponding expression
     let self_filter_results: Vec<_> = mapped_self_results
@@ -548,7 +549,7 @@

     // Start by marking all parent filters as unsupported for this child
     for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
-        parent_filter_pushdown_support.push(PushedDown::No);
+        parent_filter_pushdown_support.push(PushedDown::Unsupported);
         assert_eq!(
             parent_filter_pushdown_support.len(),
             child_idx + 1,
@@ -557,7 +558,7 @@
     }
     // Map results from pushed-down filters back to original parent filter indices
     let mapped_parent_results = parent_filters_for_child
-        .map_results_to_original(parent_filters, PushedDown::No);
+        .map_results_to_original(parent_filters, PushedDown::Unsupported);

     // Update parent_filter_pushdown_supports with the mapped results
     // mapped_parent_results already has the results at their original indices

datafusion/physical-plan/src/filter.rs

Lines changed: 9 additions & 5 deletions
@@ -521,19 +521,20 @@ impl ExecutionPlan for FilterExec {
         if !matches!(phase, FilterPushdownPhase::Pre) {
             return Ok(FilterPushdownPropagation::if_all(child_pushdown_result));
         }
-        // We absorb any parent filters that were not handled by our children
+        // We absorb any parent filters that were not handled by our children with exact filtering
         let unsupported_parent_filters =
             child_pushdown_result.parent_filters.iter().filter_map(|f| {
-                matches!(f.all(), PushedDown::No).then_some(Arc::clone(&f.filter))
+                // Keep filters that weren't handled with exact filtering (Inexact or Unsupported)
+                (!matches!(f.all(), PushedDown::Exact)).then_some(Arc::clone(&f.filter))
             });
         let unsupported_self_filters = child_pushdown_result
             .self_filters
             .first()
             .expect("we have exactly one child")
             .iter()
             .filter_map(|f| match f.discriminant {
-                PushedDown::Yes => None,
-                PushedDown::No => Some(&f.predicate),
+                PushedDown::Exact => None,
+                PushedDown::Inexact | PushedDown::Unsupported => Some(&f.predicate),
             })
             .cloned();

@@ -592,8 +593,11 @@
             Some(Arc::new(new) as _)
         };

+        // FilterExec always applies filters exactly.
+        // Even if the child returned Inexact or Unsupported, we absorb those filters
+        // and apply them, so from the parent's perspective, the filter is handled exactly.
         Ok(FilterPushdownPropagation {
-            filters: vec![PushedDown::Yes; child_pushdown_result.parent_filters.len()],
+            filters: vec![PushedDown::Exact; child_pushdown_result.parent_filters.len()],
             updated_node,
         })
     }
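
From a parent operator's perspective, only Exact means a filter may be dropped; both Inexact and Unsupported mean it must still be applied, which is exactly what the TestNode and FilterExec changes above do. A rough sketch of that decision in isolation, assuming the PushedDownPredicate fields and FilterExec::try_new signature shown in this diff (the standalone helper and its name are illustrative, not part of the commit):

// Re-apply a pushed-down filter unless the child handled it exactly.
fn reapply_if_needed(
    result: &PushedDownPredicate,
    child: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    match result.discriminant {
        // The child applies the filter exactly; keep the plan as-is.
        PushedDown::Exact => Ok(child),
        // The child only approximated (or ignored) the filter; add a FilterExec on top.
        PushedDown::Inexact | PushedDown::Unsupported => {
            let filter = FilterExec::try_new(Arc::clone(&result.predicate), child)?;
            Ok(Arc::new(filter) as Arc<dyn ExecutionPlan>)
        }
    }
}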
