Add FilterExecBuilder (#19608)#19619
Conversation
This commit fixes issue #19612 where accumulators that don't implement retract_batch exhibit buggy behavior in window frame queries. ## Problem When aggregate functions are used with window frames like `ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, DataFusion uses PlainAggregateWindowExpr which calls evaluate() multiple times on the same accumulator instance. Accumulators that use std::mem::take() in their evaluate() method consume their internal state, causing incorrect results on subsequent calls. ## Solution 1. **percentile_cont**: Modified evaluate() to use mutable reference instead of consuming the Vec. Added retract_batch() support for both PercentileContAccumulator and DistinctPercentileContAccumulator. 2. **string_agg**: Changed SimpleStringAggAccumulator::evaluate() to clone the accumulated string instead of taking it. ## Changes - datafusion/functions-aggregate/src/percentile_cont.rs: - Changed calculate_percentile() to take &mut [T::Native] instead of Vec<T::Native> - Updated PercentileContAccumulator::evaluate() to pass reference - Updated DistinctPercentileContAccumulator::evaluate() to clone values - Added retract_batch() implementation using HashMap for efficient removal - Updated PercentileContGroupsAccumulator::evaluate() for consistency - datafusion/functions-aggregate/src/string_agg.rs: - Changed evaluate() to use clone() instead of std::mem::take() - datafusion/sqllogictest/test_files/aggregate.slt: - Added test cases for percentile_cont with window frames - Added test comparing median() vs percentile_cont(0.5) behavior - Added test for string_agg cumulative window frame - docs/source/library-user-guide/functions/adding-udfs.md: - Added documentation about window-compatible accumulators - Explained evaluate() state preservation requirements - Documented retract_batch() implementation guidance Closes #19612
nuno-faria
left a comment
There was a problem hiding this comment.
Thanks @GaneshPatil7517 for taking a look at this. I left some comments below. I think it might be better to add a builder as proposed by @adriangb instead of the new constructor.
| /// )?; | ||
| /// ``` | ||
| #[expect(clippy::needless_pass_by_value)] | ||
| pub fn try_new_with_projection( |
There was a problem hiding this comment.
@adriangb suggested to add a builder instead of a new constructor and I tend to agree. @GaneshPatil7517 could you create a FilterExecBuilder, with:
- a
newfunction which would take the predicate and input, since they are mandatory. - several
with_...methods for the projection, default_selectivity, batch_size, and fetch. - a
buildmethod which would return theFilterExec.
We then might even consider to have the original try_new use the builder instead, to reduce the amount of duplicate code.
| @@ -0,0 +1,147 @@ | |||
| <!--- | |||
There was a problem hiding this comment.
I don't think a new user guide is needed just for this change.
| size_of_val(self) + self.all_values.capacity() * size_of::<T::Native>() | ||
| } | ||
|
|
||
| fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> { |
There was a problem hiding this comment.
I'm not sure how this relates to this PR.
|
|
||
| statement ok | ||
| drop table distinct_avg; | ||
|
|
There was a problem hiding this comment.
Also not sure why new aggregate tests are required.
| [`create_udaf`]: https://docs.rs/datafusion/latest/datafusion/logical_expr/fn.create_udaf.html | ||
| [`advanced_udaf.rs`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/udf/advanced_udaf.rs | ||
|
|
||
| ### Window Frame Compatible Accumulators |
There was a problem hiding this comment.
This also doesn't fit in this PR.
Adds FilterExecBuilder pattern with fluent API Allows setting projection, selectivity, batch_size, fetch in one build Refactors try_new to use builder internally (reduces duplication) Ensures compute_properties executes only once Fixes #19608
74944f3 to
bf8ea99
Compare
|
@GaneshPatil7517 thanks for the changes. There are still some changes that I think are from a different PR, at |
|
ok @nuno-faria ill remove it... |
|
@nuno-faria Please Review this... |
|
@alamb could you please enable the workflows in this PR? Thanks. |
|
I’ve set them to run |
|
there are 2 failing and 30 successful checks, let me solve this.... |
|
Hi @nuno-faria & @adriangb , Thank you for your time and support! 🙌 |
adriangb
left a comment
There was a problem hiding this comment.
Should we deprecate with_projection and other methods on FilterExec and re-implement them in terms of the building until they are removed?
|
ok ill work on that... |
As suggested in PR review, deprecate with_projection(), with_default_selectivity(), and with_batch_size() methods on FilterExec. These methods now use FilterExecBuilder internally for backward compatibility while guiding users toward the builder pattern. - Marked methods as deprecated since 51.0.0 - Re-implemented using FilterExecBuilder to maintain functionality - All 114 filter tests passing - Provides gentle migration path for users
5f35662 to
26e5c46
Compare
|
hey @adriangb please can you review it i updated it.... |
|
Hey @adriangb sir please can you cheack now... |
- Replace .clone() with Arc::clone() to follow Rust best practices - Replace deprecated method calls in internal code with direct builder usage - Update with_new_children, try_swapping_with_projection, and EmbeddedProjection impl - All 114 filter tests passing - No clippy warnings or errors
9596ceb to
aa4a892
Compare
| #[deprecated(since = "52.0.0", note = "Use FilterExecBuilder::with_default_selectivity instead")] | ||
| pub fn with_default_selectivity( | ||
| mut self, | ||
| self, | ||
| default_selectivity: u8, | ||
| ) -> Result<Self, DataFusionError> { | ||
| if default_selectivity > 100 { | ||
| return plan_err!( | ||
| "Default filter selectivity value needs to be less than or equal to 100" | ||
| ); | ||
| } | ||
| self.default_selectivity = default_selectivity; | ||
| Ok(self) | ||
| FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) | ||
| .with_projection(self.projection.clone()) | ||
| .with_default_selectivity(default_selectivity) | ||
| .with_batch_size(self.batch_size) | ||
| .with_fetch(self.fetch) | ||
| .build() | ||
| } |
There was a problem hiding this comment.
I'm not sure if the with_default_selectivity method should be deprecated, since before it simply updated the field. Now we would need to create a new FilterExec from scratch.
There was a problem hiding this comment.
ok ill work on it...
The reviewer pointed out that with_default_selectivity() simply updates a field and doesn't need the overhead of creating a new FilterExec via the builder. Restored the original efficient implementation. Only with_projection() and with_batch_size() remain deprecated, as they benefit from the builder pattern's single property computation. - Restored original with_default_selectivity implementation - Updated upgrading.md to reflect only 2 deprecated methods - All 114 filter tests passing - No clippy warnings
f961dc9 to
a601075
Compare
|
@adriangb could you run the workflows again? |
ok ill updating it... |
Per reviewer feedback, updated all internal uses of deprecated with_projection() and with_batch_size() methods to use FilterExecBuilder instead: - datafusion/core/src/physical_planner.rs: Use FilterExecBuilder for filter creation - datafusion/proto/src/physical_plan/mod.rs: Use FilterExecBuilder in proto deserialization - datafusion/physical-plan/src/filter.rs: Updated test to use builder pattern Also restored with_default_selectivity() to non-deprecated status since it simply updates a field value without the overhead of rebuilding FilterExec. All tests passing, no clippy warnings.
3701930 to
dac2557
Compare
|
hey @adriangb could you run the workflows again.....? |
|
Hi @nuno-faria & @adriangb , |
|
run benchmark sql_planner |
|
🤖 |
FilterExecBuilder (#19608)
alamb
left a comment
There was a problem hiding this comment.
Thanks @GaneshPatil7517 and @nuno-faria and @adriangb -- this looks really nice to me
| /// # Deprecated | ||
| /// Use [`FilterExecBuilder::with_projection`] instead | ||
| #[deprecated( | ||
| since = "52.0.0", |
There was a problem hiding this comment.
this technically will now go out in 53 I think
|
|
||
| You can see the current [status of the `52.0.0`release here](https://github.com/apache/datafusion/issues/18566) | ||
|
|
||
| ### `FilterExec` builder methods deprecated |
There was a problem hiding this comment.
We need to move these to the 53 release section
|
Benchmark script failed with exit code 101. Last 10 lines of output: Click to expand |
nuno-faria
left a comment
There was a problem hiding this comment.
Thanks @GaneshPatil7517. I still have comments, this time about the with_projection method.
| // Check if the projection is valid | ||
| can_project(&self.schema(), projection.as_ref())?; | ||
|
|
||
| let projection = match projection { | ||
| Some(projection) => match &self.projection { | ||
| Some(p) => Some(projection.iter().map(|i| p[*i]).collect()), | ||
| None => Some(projection), | ||
| }, | ||
| None => None, | ||
| }; | ||
|
|
||
| FilterExecBuilder::new(Arc::clone(&self.predicate), Arc::clone(&self.input)) | ||
| .with_projection(projection) | ||
| .with_default_selectivity(self.default_selectivity) | ||
| .with_batch_size(self.batch_size) | ||
| .with_fetch(self.fetch) | ||
| .build() |
There was a problem hiding this comment.
I'm having some doubts about deprecating with_projection of FilterExec (I highlight here the with_projection of impl EmbeddedProjection, which has the same code). Mainly about this part here:
let projection = match projection {
Some(projection) => match &self.projection {
Some(p) => Some(projection.iter().map(|i| p[*i]).collect()),
None => Some(projection),
},
None => None,
};It appears that the projection passed to the method tries to project the existing projection. I'm not sure why this is the case, but I think if we deprecate with_projection we lose the ability to do this. But I also haven't seen a with_projection called on an existing FilterExec, it is always accompanied with a try_new.
There was a problem hiding this comment.
I guess FilterExecBuilder could do the same thing? I can see why that would be the correct behavior.
There was a problem hiding this comment.
But in this case FilterExecBuilder always builds a new filter, while the previous with_projection could operate over an existing one.
There was a problem hiding this comment.
I still don't get it. The old with_projection also built a new FilterExec:
let cache = Self::compute_properties(
&self.input,
&self.predicate,
self.default_selectivity,
projection.as_ref(),
)?;
Ok(Self {
predicate: Arc::clone(&self.predicate),
input: Arc::clone(&self.input),
metrics: self.metrics.clone(),
default_selectivity: self.default_selectivity,
cache,
projection,
batch_size: self.batch_size,
fetch: self.fetch,
})The only difference I see is that the old implementation composed projections, the new one replaces them. So maybe all we need to do is implement the projection composition in FilterExecBuilder::with_projection to match the existing behavior?
There was a problem hiding this comment.
Thanks for the detailed analysis @adriangb and @nuno-faria! You're right - the old implementation composed projections while the new one replaces them. I'll update FilterExecBuilder::with_projection to compose the projections instead of replacing, matching the original behavior. This way we maintain backward compatibility while still having the cleaner builder API.
|
Hi @GaneshPatil7517 I'm sorry this has taken many rounds of feedback including delays between the reviews. It can be hard to find time to be a good open source contributor / reviewer sometimes. I hope that wasn't too discouraging. I think this is a good idea and want to see it across the finish line. |
…19854) - Closes #19608, - replaces #19619 --------- Co-authored-by: Ganesh Patil <[email protected]>
This PR introduces a new FilterExec constructor that accepts a projection at creation time,
ensuring compute_properties() executes only once.
What was added:
try_new_with_projection()method for FilterExeccompute_properties()now runs a single timeFixes: #19608