Compute Dynamic Filters only when a consumer supports them #18938
Conversation
ebc401a to 3b31893 (Compare)
```rust
pub fn is_used(self: &Arc<Self>) -> bool {
    // Strong count > 1 means at least one consumer is holding a reference beyond the producer.
    Arc::strong_count(self) > 1
}
```
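The strong-count idea can be illustrated in plain Rust. This is a minimal standalone sketch, not the actual DataFusion type: `DynFilter` is a hypothetical stand-in for `DynamicFilterPhysicalExpr`.

```rust
use std::sync::Arc;

// Hypothetical stand-in for DynamicFilterPhysicalExpr.
struct DynFilter;

// A filter counts as "used" when someone besides the producer holds a clone of the Arc.
fn is_used(f: &Arc<DynFilter>) -> bool {
    Arc::strong_count(f) > 1
}

fn main() {
    let filter = Arc::new(DynFilter);
    assert!(!is_used(&filter)); // only the producer holds it

    let consumer = Arc::clone(&filter); // a probe-side consumer takes a reference
    assert!(is_used(&filter));

    drop(consumer); // consumer goes away
    assert!(!is_used(&filter));
}
```

Note that this is exactly why cloning the expression anywhere else (for example during execute) would perturb the count, which is the edge case discussed below.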
I'm not really sure how to test a condition where `is_used()` returns false without adding too much machinery, or without making the `dynamic_filter` attribute of HashJoin public, which would make it easy to mess with the Arc reference count.
Can’t you just make a new `DynamicFilterPhysicalExpr` and check that `is_used` is false?
Looks like you have a test already?
adriangb left a comment
I know this is draft but the current code looks good to me, I’ll approve once it’s ready :)
Something funky is going on here with the Arc count: some queries are not pushing down the filter to the probe because the Arc count remains at 1. It doesn't happen in all the tests though, which is strange.

Edit: For the tests that fail, it seems like partition 0 never gets to see a … I think this might be because we clone the `DynamicFilterPhysicalExpr` directly in the execute phase of `DataSourceExec` (at least for this kind of node).

In any case, seems like the strong_count approach might hit some edge cases here...
62688c7 to 7664b0d (Compare)
7664b0d to 8c467c9 (Compare)
alamb left a comment
Makes sense to me -- thank you @LiaCastaneda
The only thing I think is needed is to add a note to the upgrade guide
@adriangb do you want to take a look at this PR prior to merge?
```diff
 /// Discriminant for the result of pushing down a filter into a child node.
-#[derive(Debug, Clone, Copy)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
 pub enum PushedDown {
```
Since this is a public enum (doc link) I think this will be an API change (I marked the PR as such)
Can you please add a note to the DataFusion 52 upgrade guide explaining how people need to update their existing code (e.g. if they used to return Yes or No what should they return after this change)?
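For the upgrade guide, the mapping might look roughly like this. The variant names come from this PR, but the helper and its parameters are illustrative, not part of DataFusion's actual API surface:

```rust
// Sketch of the expanded result enum proposed in this PR.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushedDown {
    Exact,       // the child applies the filter exactly (rows are fully filtered)
    Inexact,     // the child uses the filter only opportunistically, e.g. stats pruning
    Unsupported, // the child cannot use the filter at all
}

// Hypothetical helper showing how code that used to return Yes/No might map over:
// old `Yes` -> Exact; old `No` splits into Inexact (can still prune) or Unsupported.
pub fn migrate(old_was_yes: bool, can_prune_with_stats: bool) -> PushedDown {
    match (old_was_yes, can_prune_with_stats) {
        (true, _) => PushedDown::Exact,
        (false, true) => PushedDown::Inexact,
        (false, false) => PushedDown::Unsupported,
    }
}

fn main() {
    assert_eq!(migrate(true, false), PushedDown::Exact);
    assert_eq!(migrate(false, true), PushedDown::Inexact);
    assert_eq!(migrate(false, false), PushedDown::Unsupported);
}
```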
run benchmarks

🤖: Benchmark completed Details
adriangb left a comment
> I think this might be because we clone the `DynamicFilterPhysicalExpr` directly in the execute phase of `DataSourceExec` (at least for this kind of node)
>
> In any case, seems like the strong_count approach might hit some edge cases here...
This is interesting. Could it indicate a bug? I don't think we downcast into `DynamicFilterPhysicalExpr` anywhere in the Parquet-specific code, so if we keep a reference it must be via `Arc::clone`.
I ask because I think this approach is what we actually want. As per https://github.com/apache/datafusion/pull/18938/files#r2578193421 I'm not even sure this change ends up having a different behavior at runtime?
```rust
// Only create the dynamic filter if the probe side will actually use it (Exact or Inexact).
// If it's Unsupported, don't compute the filter since it won't be used.
let will_be_used = !matches!(filter.discriminant, PushedDown::Unsupported);
```
If this is the case, don't we end up in the same place as Yes/No? I.e. this change only seems helpful if we did something like "only create the filter if the child said Exact".
I think it lets scans say "I can't use this at all" (Unsupported), so we can skip computing filters entirely if stats pruning is not supported either. The Yes/No system had no way to express that: if we had stats pruning with the filters, it would fall under the No discriminant, but we would still need them. I'm also thinking: if we know a scan will only use the filter for stats pruning (Inexact), maybe it would make sense to compute just the min/max bounds instead of both the IN LIST and the bounds?
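The bounds-only idea could be sketched roughly like this. All types here (`JoinFilter`, `build_filter`) are hypothetical; DataFusion's actual filter construction is different:

```rust
// Sketch types; DataFusion's real filter construction differs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushedDown { Exact, Inexact, Unsupported }

#[derive(Debug, PartialEq)]
enum JoinFilter {
    // Min/max bounds of the build-side keys: enough for stats pruning.
    Bounds { min: i64, max: i64 },
    // Full IN list plus bounds: supports row-level filtering too.
    InListAndBounds { keys: Vec<i64>, min: i64, max: i64 },
}

// Build only as much filter as the consumer said it can use.
fn build_filter(discriminant: PushedDown, build_keys: &[i64]) -> Option<JoinFilter> {
    if discriminant == PushedDown::Unsupported {
        return None; // consumer can't use it: skip all computation
    }
    let min = *build_keys.iter().min()?;
    let max = *build_keys.iter().max()?;
    match discriminant {
        PushedDown::Inexact => Some(JoinFilter::Bounds { min, max }),
        _ => Some(JoinFilter::InListAndBounds { keys: build_keys.to_vec(), min, max }),
    }
}

fn main() {
    assert_eq!(build_filter(PushedDown::Unsupported, &[1, 5, 3]), None);
    assert_eq!(
        build_filter(PushedDown::Inexact, &[1, 5, 3]),
        Some(JoinFilter::Bounds { min: 1, max: 5 })
    );
    assert!(matches!(
        build_filter(PushedDown::Exact, &[1, 5, 3]),
        Some(JoinFilter::InListAndBounds { .. })
    ));
}
```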
I think these are both good reasons to make this API change. Maybe we can justify the change by doing what you are saying and skipping pushing down the entire hash table if it won't be used? But then again that is basically free... and where do bloom filters fall into this calculation? i.e. bloom filter pruning only works if HashJoinExec produces an InListExpr and the scan node is a Parquet node (or other format that supports bloom filters). That seems like an awful lot of complex coordination between the producer/consumer that is specific to each file (some may have bloom filters some don't) and the filter being pushed down (min/max vs. InList vs. Hash Table).
Yeah, it seems like it adds complexity to the “which filter to use” decision. Maybe the only clear use case is:
> it lets scans say "I can't use this at all" (Unsupported), so we can skip computing filters entirely if stats pruning is not supported either
I was also thinking: what if we just let consumers communicate what kinds of filters they support, and the producer only adjusts that decision based on memory or row-count limits? Or would that be an anti-pattern? In any case, that still wouldn’t let the producer understand the purpose of the dynamic filters (whether for stats pruning or row-level filtering).
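The "consumers advertise what they support" idea could look something like the following. This is purely a speculative sketch of the comment above, not anything that exists in DataFusion, and the mapping from capabilities to discriminant is one possible interpretation:

```rust
// Hypothetical capability set a scan could advertise (not a DataFusion API).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FilterCapabilities {
    stats_pruning: bool, // can use min/max bounds
    row_level: bool,     // can evaluate the filter per row
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PushedDown { Exact, Inexact, Unsupported }

// One possible mapping from advertised capabilities to the pushdown discriminant.
fn discriminant_for(caps: FilterCapabilities) -> PushedDown {
    match (caps.row_level, caps.stats_pruning) {
        (true, _) => PushedDown::Exact,
        (false, true) => PushedDown::Inexact,
        (false, false) => PushedDown::Unsupported,
    }
}

fn main() {
    // A scan that can do both row-level filtering and stats pruning.
    let full = FilterCapabilities { stats_pruning: true, row_level: true };
    assert_eq!(discriminant_for(full), PushedDown::Exact);

    // A scan that can only prune with statistics.
    let stats_only = FilterCapabilities { stats_pruning: true, row_level: false };
    assert_eq!(discriminant_for(stats_only), PushedDown::Inexact);
}
```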
Yeah, sorry for the confusion -- I switched the approach to exact/inexact/unsupported just as an alternative since it was getting really hard to get
I took a look the other day, and the thing is that for some reason the strong count is not accounting for the leaf node that supposedly also holds a reference to the `DynamicFilterPhysicalExpr`.

The 1 -> 2 strong count is actually expected here, because the first partition creates the SharedBuildAccumulator, which also holds a reference to the `DynamicFilterPhysicalExpr`; but if the hash join + the leaf keep the ref (which is what we should be seeing) I would have expected to see something like: …
run benchmarks

show benchmark queue

🤖 Hi @alamb, you asked to view the benchmark queue (#18938 (comment)).

🤖: Benchmark completed Details
I didn't have time to get back to this :( I’ll be on vacation until the week after next, so I won’t be able to look at this again until then. If anyone wants to take over, feel free; otherwise I’ll continue when I’m back!
Which issue does this PR close?
Closes #17527
Rationale for this change
Currently, DataFusion computes bounds for all queries that contain a `HashJoinExec` node whenever the option `enable_dynamic_filter_pushdown` is set to true (the default). It might make sense to compute these bounds only when we explicitly know there is a consumer that will use them.
What changes are included in this PR?
This PR expands the filter pushdown result enum from two variants (Yes/No) to three variants (Exact/Inexact/Unsupported), as suggested in #18856 and #17527.
In `handle_child_pushdown_result`, the HashJoinExec checks the discriminant returned by its probe-side child to determine whether the dynamic filter will be used. If the child returns Unsupported, the HashJoinExec skips creating the dynamic filter accumulator, avoiding unnecessary computation.

Are these changes tested?

Added a test `test_hash_join_dynamic_filter_with_unsupported_scan` that verifies that the DynamicFilter placeholder is not present in the probe node.

Are there any user-facing changes?
Yes, the PushedDown enum now has three variants instead of two.