3 changes: 3 additions & 0 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
@@ -111,6 +111,9 @@ pub trait GroupValues: Send {
/// Emits the group values
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Signals that input is complete and drain mode should be activated
fn input_done(&mut self) {}

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear_shrink(&mut self, batch: &RecordBatch);
}
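The new `input_done()` hook splits the trait's lifecycle into a build phase and a drain phase. Below is a minimal, self-contained sketch of that contract using a plain `Vec<u64>` model (names and types are illustrative only; the real trait works on Arrow `ArrayRef` batches and is driven by the aggregate stream in `row_hash.rs`): intern keys while input arrives, call `input_done()` exactly once, then drain the accumulated groups in fixed-size chunks instead of one blocking emit-all.

```rust
// Hypothetical model of the GroupValues drain lifecycle (not the real
// DataFusion types).
struct Groups {
    keys: Vec<u64>,
    offset: usize,
    input_done: bool,
}

impl Groups {
    fn new() -> Self {
        Groups { keys: Vec::new(), offset: 0, input_done: false }
    }

    /// Build phase: deduplicate and append group keys.
    fn intern(&mut self, key: u64) {
        assert!(!self.input_done, "intern() must not be called after input_done()");
        if !self.keys.contains(&key) {
            self.keys.push(key);
        }
    }

    /// Signals end of input. In GroupValuesRows this is where the hash map
    /// is dropped, since no further lookups can happen.
    fn input_done(&mut self) {
        self.input_done = true;
    }

    /// Drain phase: emit the next n keys by advancing an offset.
    fn emit_first(&mut self, n: usize) -> Vec<u64> {
        let end = (self.offset + n).min(self.keys.len());
        let out = self.keys[self.offset..end].to_vec();
        self.offset = end;
        out
    }

    /// Remaining (not yet emitted) groups.
    fn len(&self) -> usize {
        self.keys.len() - self.offset
    }
}

fn main() {
    let mut g = Groups::new();
    for key in [3, 1, 3, 2, 1] {
        g.intern(key);
    }
    g.input_done();

    // Drain in chunks of 2 instead of one blocking emit-all.
    let mut batches = Vec::new();
    while g.len() > 0 {
        batches.push(g.emit_first(2));
    }
    assert_eq!(batches, vec![vec![3, 1], vec![2]]);
}
```

The chunked loop is the point of the PR: callers can interleave emission with downstream work rather than stalling on a single large emit.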
89 changes: 66 additions & 23 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
@@ -76,6 +76,13 @@ pub struct GroupValuesRows {

/// Random state for creating hashes
random_state: RandomState,

/// State for iterative emission (activated after input is complete)
/// When true, emit() uses offset-based slicing instead of copying remaining rows
drain_mode: bool,

/// Current offset for drain mode emission (number of rows already emitted)
emission_offset: usize,
}

impl GroupValuesRows {
@@ -107,11 +114,19 @@ impl GroupValuesRows {
hashes_buffer: Default::default(),
rows_buffer,
random_state: crate::aggregates::AGGREGATION_HASH_SEED,
drain_mode: false,
emission_offset: 0,
})
}
}

impl GroupValues for GroupValuesRows {
fn input_done(&mut self) {
self.drain_mode = true;
self.map.clear();
self.map_size = 0;
}

fn intern(&mut self, cols: &[ArrayRef], groups: &mut Vec<usize>) -> Result<()> {
// Convert the group keys into the row format
let group_rows = &mut self.rows_buffer;
Expand Down Expand Up @@ -185,10 +200,22 @@ impl GroupValues for GroupValuesRows {
self.len() == 0
}

/// Returns the number of group values.
///
/// In drain mode (after `input_done()`), returns remaining groups not yet emitted,
/// which matches the accumulator state size for consistency.
fn len(&self) -> usize {
self.group_values
.as_ref()
.map(|group_values| group_values.num_rows())
.map(|group_values| {
let total_rows = group_values.num_rows();
if self.drain_mode {
// In drain mode, return remaining rows (not yet emitted)
total_rows.saturating_sub(self.emission_offset)
} else {
total_rows
}
})
.unwrap_or(0)
}
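The drain-mode arithmetic in `len()` can be isolated into a tiny sketch (plain `usize` model, not the real `GroupValuesRows`): remaining groups = total rows minus rows already emitted, with `saturating_sub` guarding the fully-drained edge case against underflow.

```rust
// Simplified model of the drain-mode length accounting.
fn remaining_groups(total_rows: usize, emission_offset: usize, drain_mode: bool) -> usize {
    if drain_mode {
        // Rows not yet emitted; saturates to 0 once everything is out.
        total_rows.saturating_sub(emission_offset)
    } else {
        // Before input_done(), the offset is always 0 and is ignored.
        total_rows
    }
}

fn main() {
    assert_eq!(remaining_groups(10, 0, false), 10);
    // After emitting 4 of 10 rows in drain mode, 6 remain.
    assert_eq!(remaining_groups(10, 4, true), 6);
    // Fully drained: saturating_sub keeps this at 0 rather than panicking.
    assert_eq!(remaining_groups(10, 10, true), 0);
}
```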

@@ -206,29 +233,43 @@
output
}
            EmitTo::First(n) => {
                if self.drain_mode {
                    // Drain mode: emit the next n rows by advancing an offset
                    // instead of copying the remaining rows into a new Rows.
                    let start = self.emission_offset;
                    let end = std::cmp::min(start + n, group_values.num_rows());
                    let iter = (start..end).map(|i| group_values.row(i));
                    let output = self.row_converter.convert_rows(iter)?;
                    self.emission_offset = end;
                    if self.emission_offset == group_values.num_rows() {
                        group_values.clear();
                        self.emission_offset = 0;
                    }
                    output
                } else {
                    let groups_rows = group_values.iter().take(n);
                    let output = self.row_converter.convert_rows(groups_rows)?;

                    // Clear out first n group keys by copying them to a new Rows.
                    // TODO file some ticket in arrow-rs to make this more efficient?
                    let mut new_group_values = self.row_converter.empty_rows(0, 0);
                    for row in group_values.iter().skip(n) {
                        new_group_values.push(row);
                    }
                    std::mem::swap(&mut new_group_values, &mut group_values);

                    self.map.retain(|(_exists_hash, group_idx)| {
                        // Decrement group index by n
                        match group_idx.checked_sub(n) {
                            // Group index was >= n, shift value down
                            Some(sub) => {
                                *group_idx = sub;
                                true
                            }
                            // Group index was < n, so remove from table
                            None => false,
                        }
                    });
                    output
                }
            }
        };

Review discussion (anchored at the drain-mode branch):

Reviewer (Contributor): I'm not sure I fully understand this. Looking at the previous code, it looks like it was already emitting groups incrementally. The difference I see is that the new code tracks the offset with self.emission_offset, while the other side of the if statement mutates self.group_values in place to trim the results that have already been emitted.

Author (Contributor): The emission_offset optimization just avoids the expensive "copy remaining rows" operation that the old EmitTo::First path performed during input processing. The real fix is replacing emit(EmitTo::All), which blocks for seconds on large group counts, with an incremental drain: https://github.com/apache/datafusion/pull/18906/files#diff-69c8ecaca5e2c7005f2ed1facaa41f80b45bfd006f2357e53ff3072f535c287dR1196

Reviewer (Contributor): Doesn't that mean the else branch (when drain_mode == false) is now dead code? FWIW, I can put a panic!() here:

                    output
                } else {
+                   panic!("foo");
                    let groups_rows = group_values.iter().take(n);
                    let output = self.row_converter.convert_rows(groups_rows)?;

and all the tests in the crate pass.

Author (Contributor): I think it's needed for emit_early_if_necessary() in Partial mode under memory pressure, and for group_ordering.emit_to() when the input is sorted or partially sorted. Those two cases can trigger incremental / early emission. I added a test case for sorted input (c4903b6) that exercises that path.

gabotechs (Contributor, Nov 26, 2025): I'll reframe: if the benchmarks show the same total execution time for the old code and this new drain_mode == true path, how about just leaving group_values/row.rs as it was? If there's no noticeable performance improvement, we could save some lines of code and complexity by keeping the old path here, since the actual improvement happens in row_hash.rs.

Author (Contributor): I can give that a try, but it will be less efficient for sure. There is no value in retaining the hash map state while draining, and you can see that the drain_mode == false path performs much heavier operations than the drain_mode == true path.
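The cost difference the author describes can be sketched on a plain `Vec<u32>` standing in for the arrow-rs `Rows` buffer (a hypothetical model, not the real DataFusion code): the pre-drain path pays to physically compact the remainder on every emit, while the drain path only advances an offset and clears the buffer once at the end.

```rust
/// Pre-drain path: emit the first n keys and compact the remainder by
/// copying it into a fresh buffer (O(remaining) work per emit).
fn emit_first_by_copy(groups: &mut Vec<u32>, n: usize) -> Vec<u32> {
    let n = n.min(groups.len());
    let output: Vec<u32> = groups[..n].to_vec();
    let remaining: Vec<u32> = groups[n..].to_vec(); // the expensive copy
    *groups = remaining;
    output
}

/// Drain path: emit the next n keys by advancing an offset; the buffer is
/// cleared only once everything has been emitted (O(n) work per emit).
fn emit_first_by_offset(groups: &mut Vec<u32>, offset: &mut usize, n: usize) -> Vec<u32> {
    let start = *offset;
    let end = (start + n).min(groups.len());
    let output = groups[start..end].to_vec();
    *offset = end;
    if *offset == groups.len() {
        groups.clear();
        *offset = 0;
    }
    output
}

fn main() {
    let mut a = vec![1, 2, 3, 4, 5];
    assert_eq!(emit_first_by_copy(&mut a, 2), vec![1, 2]);
    assert_eq!(a, vec![3, 4, 5]); // remainder was physically moved

    let mut b = vec![1, 2, 3, 4, 5];
    let mut off = 0;
    assert_eq!(emit_first_by_offset(&mut b, &mut off, 2), vec![1, 2]);
    assert_eq!(off, 2); // remainder stays put; only the offset advances
    assert_eq!(emit_first_by_offset(&mut b, &mut off, 3), vec![3, 4, 5]);
    assert_eq!((b.len(), off), (0, 0)); // fully drained: cleared and reset
}
```

Note the model omits the `map.retain` index-shifting that the real copy path must also perform, which makes the real gap between the two paths larger than shown here.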

@@ -255,6 +296,8 @@
self.map_size = self.map.capacity() * size_of::<(u64, usize)>();
self.hashes_buffer.clear();
self.hashes_buffer.shrink_to(count);
self.drain_mode = false;
self.emission_offset = 0;
}
}
