Skip to content

Commit 477053d

Browse files
alambJefffrey
andauthored
Optimize planning / stop cloning Strings / Fields so much (2-3% faster planning time) (#18415)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #18413 ## Rationale for this change Avoid a bunch of clones / String copies during planning ## What changes are included in this PR? Change several methods on DFSchema to return `&FieldRef` rather than `&Field` which permits `Arc::clone` rather than a deep `Field` clone ## Are these changes tested? yes by CI I also ran benchmarks that show a small but consistent speedup in the planning benchmarks ## Are there any user-facing changes? Yes, there are several API changes in DFSchema that now return `FieldRef` rather than `Field` which allows using `Arc::clone` rather than `clone`. I have updated the upgrading guide too --------- Co-authored-by: Jeffrey Vo <jeffrey.vo.australia@gmail.com>
1 parent 5519b61 commit 477053d

File tree

11 files changed

+214
-85
lines changed

11 files changed

+214
-85
lines changed

datafusion/common/src/datatype.rs

Lines changed: 107 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! [`DataTypeExt`] and [`FieldExt`] extension trait for working with DataTypes to Fields
18+
//! [`DataTypeExt`] and [`FieldExt`] extension trait for working with Arrow [`DataType`] and [`Field`]s
1919
2020
use crate::arrow::datatypes::{DataType, Field, FieldRef};
21+
use crate::metadata::FieldMetadata;
2122
use std::sync::Arc;
2223

2324
/// DataFusion extension methods for Arrow [`DataType`]
@@ -61,7 +62,54 @@ impl DataTypeExt for DataType {
6162
}
6263

6364
/// DataFusion extension methods for Arrow [`Field`] and [`FieldRef`]
65+
///
66+
/// This trait is implemented for both [`Field`] and [`FieldRef`] and
67+
/// provides convenience methods for efficiently working with both types.
68+
///
69+
/// For [`FieldRef`], the methods will attempt to unwrap the `Arc`
70+
/// to avoid unnecessary cloning when possible.
6471
pub trait FieldExt {
72+
/// Ensure the field is named `new_name`, returning the given field if the
73+
/// name matches, and a new field if not.
74+
///
75+
/// This method avoids `clone`ing fields and names if the name is the same
76+
/// as the field's existing name.
77+
///
78+
/// Example:
79+
/// ```
80+
/// # use std::sync::Arc;
81+
/// # use arrow::datatypes::{DataType, Field};
82+
/// # use datafusion_common::datatype::FieldExt;
83+
/// let int_field = Field::new("my_int", DataType::Int32, true);
84+
/// // rename to "your_int"
85+
/// let renamed_field = int_field.renamed("your_int");
86+
/// assert_eq!(renamed_field.name(), "your_int");
87+
/// ```
88+
fn renamed(self, new_name: &str) -> Self;
89+
90+
/// Ensure the field has the given data type
91+
///
92+
/// Note this is different than simply calling [`Field::with_data_type`] as
93+
/// it avoids copying if the data type is already the same.
94+
///
95+
/// Example:
96+
/// ```
97+
/// # use std::sync::Arc;
98+
/// # use arrow::datatypes::{DataType, Field};
99+
/// # use datafusion_common::datatype::FieldExt;
100+
/// let int_field = Field::new("my_int", DataType::Int32, true);
101+
/// // change to Float64
102+
/// let retyped_field = int_field.retyped(DataType::Float64);
103+
/// assert_eq!(retyped_field.data_type(), &DataType::Float64);
104+
/// ```
105+
fn retyped(self, new_data_type: DataType) -> Self;
106+
107+
/// Add field metadata to the Field
108+
fn with_field_metadata(self, metadata: &FieldMetadata) -> Self;
109+
110+
/// Add optional field metadata,
111+
fn with_field_metadata_opt(self, metadata: Option<&FieldMetadata>) -> Self;
112+
65113
/// Returns a new Field representing a List of this Field's DataType.
66114
///
67115
/// For example if input represents an `Int32`, the return value will
@@ -130,6 +178,32 @@ pub trait FieldExt {
130178
}
131179

132180
impl FieldExt for Field {
181+
fn renamed(self, new_name: &str) -> Self {
182+
// check if this is a new name before allocating a new Field / copying
183+
// the existing one
184+
if self.name() != new_name {
185+
self.with_name(new_name)
186+
} else {
187+
self
188+
}
189+
}
190+
191+
fn retyped(self, new_data_type: DataType) -> Self {
192+
self.with_data_type(new_data_type)
193+
}
194+
195+
fn with_field_metadata(self, metadata: &FieldMetadata) -> Self {
196+
metadata.add_to_field(self)
197+
}
198+
199+
fn with_field_metadata_opt(self, metadata: Option<&FieldMetadata>) -> Self {
200+
if let Some(metadata) = metadata {
201+
self.with_field_metadata(metadata)
202+
} else {
203+
self
204+
}
205+
}
206+
133207
fn into_list(self) -> Self {
134208
DataType::List(Arc::new(self.into_list_item())).into_nullable_field()
135209
}
@@ -149,6 +223,34 @@ impl FieldExt for Field {
149223
}
150224

151225
impl FieldExt for Arc<Field> {
226+
fn renamed(mut self, new_name: &str) -> Self {
227+
if self.name() != new_name {
228+
// avoid cloning if possible
229+
Arc::make_mut(&mut self).set_name(new_name);
230+
}
231+
self
232+
}
233+
234+
fn retyped(mut self, new_data_type: DataType) -> Self {
235+
if self.data_type() != &new_data_type {
236+
// avoid cloning if possible
237+
Arc::make_mut(&mut self).set_data_type(new_data_type);
238+
}
239+
self
240+
}
241+
242+
fn with_field_metadata(self, metadata: &FieldMetadata) -> Self {
243+
metadata.add_to_field_ref(self)
244+
}
245+
246+
fn with_field_metadata_opt(self, metadata: Option<&FieldMetadata>) -> Self {
247+
if let Some(metadata) = metadata {
248+
self.with_field_metadata(metadata)
249+
} else {
250+
self
251+
}
252+
}
253+
152254
fn into_list(self) -> Self {
153255
DataType::List(self.into_list_item())
154256
.into_nullable_field()
@@ -161,13 +263,11 @@ impl FieldExt for Arc<Field> {
161263
.into()
162264
}
163265

164-
fn into_list_item(self) -> Self {
266+
fn into_list_item(mut self) -> Self {
165267
if self.name() != Field::LIST_FIELD_DEFAULT_NAME {
166-
Arc::unwrap_or_clone(self)
167-
.with_name(Field::LIST_FIELD_DEFAULT_NAME)
168-
.into()
169-
} else {
170-
self
268+
// avoid cloning if possible
269+
Arc::make_mut(&mut self).set_name(Field::LIST_FIELD_DEFAULT_NAME);
171270
}
271+
self
172272
}
173273
}

datafusion/common/src/dfschema.rs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use arrow::datatypes::{
3737
/// A reference-counted reference to a [DFSchema].
3838
pub type DFSchemaRef = Arc<DFSchema>;
3939

40-
/// DFSchema wraps an Arrow schema and adds relation names.
40+
/// DFSchema wraps an Arrow schema and add a relation (table) name.
4141
///
4242
/// The schema may hold the fields across multiple tables. Some fields may be
4343
/// qualified and some unqualified. A qualified field is a field that has a
@@ -47,8 +47,14 @@ pub type DFSchemaRef = Arc<DFSchema>;
4747
/// have a distinct name from any qualified field names. This allows finding a
4848
/// qualified field by name to be possible, so long as there aren't multiple
4949
/// qualified fields with the same name.
50+
///]
51+
/// # See Also
52+
/// * [DFSchemaRef], an alias to `Arc<DFSchema>`
53+
/// * [DataTypeExt], common methods for working with Arrow [DataType]s
54+
/// * [FieldExt], extension methods for working with Arrow [Field]s
5055
///
51-
/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
56+
/// [DataTypeExt]: crate::datatype::DataTypeExt
57+
/// [FieldExt]: crate::datatype::FieldExt
5258
///
5359
/// # Creating qualified schemas
5460
///
@@ -346,20 +352,22 @@ impl DFSchema {
346352
self.field_qualifiers.extend(qualifiers);
347353
}
348354

349-
/// Get a list of fields
355+
/// Get a list of fields for this schema
350356
pub fn fields(&self) -> &Fields {
351357
&self.inner.fields
352358
}
353359

354-
/// Returns an immutable reference of a specific `Field` instance selected using an
355-
/// offset within the internal `fields` vector
356-
pub fn field(&self, i: usize) -> &Field {
360+
/// Returns a reference to [`FieldRef`] for a column at specific index
361+
/// within the schema.
362+
///
363+
/// See also [Self::qualified_field] to get both qualifier and field
364+
pub fn field(&self, i: usize) -> &FieldRef {
357365
&self.inner.fields[i]
358366
}
359367

360-
/// Returns an immutable reference of a specific `Field` instance selected using an
361-
/// offset within the internal `fields` vector and its qualifier
362-
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
368+
/// Returns the qualifier (if any) and [`FieldRef`] for a column at specific
369+
/// index within the schema.
370+
pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &FieldRef) {
363371
(self.field_qualifiers[i].as_ref(), self.field(i))
364372
}
365373

@@ -410,12 +418,12 @@ impl DFSchema {
410418
.is_some()
411419
}
412420

413-
/// Find the field with the given name
421+
/// Find the [`FieldRef`] with the given name and optional qualifier
414422
pub fn field_with_name(
415423
&self,
416424
qualifier: Option<&TableReference>,
417425
name: &str,
418-
) -> Result<&Field> {
426+
) -> Result<&FieldRef> {
419427
if let Some(qualifier) = qualifier {
420428
self.field_with_qualified_name(qualifier, name)
421429
} else {
@@ -428,7 +436,7 @@ impl DFSchema {
428436
&self,
429437
qualifier: Option<&TableReference>,
430438
name: &str,
431-
) -> Result<(Option<&TableReference>, &Field)> {
439+
) -> Result<(Option<&TableReference>, &FieldRef)> {
432440
if let Some(qualifier) = qualifier {
433441
let idx = self
434442
.index_of_column_by_name(Some(qualifier), name)
@@ -440,10 +448,10 @@ impl DFSchema {
440448
}
441449

442450
/// Find all fields having the given qualifier
443-
pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
451+
pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&FieldRef> {
444452
self.iter()
445453
.filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
446-
.map(|(_, f)| f.as_ref())
454+
.map(|(_, f)| f)
447455
.collect()
448456
}
449457

@@ -459,22 +467,20 @@ impl DFSchema {
459467
}
460468

461469
/// Find all fields that match the given name
462-
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
470+
pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&FieldRef> {
463471
self.fields()
464472
.iter()
465473
.filter(|field| field.name() == name)
466-
.map(|f| f.as_ref())
467474
.collect()
468475
}
469476

470477
/// Find all fields that match the given name and return them with their qualifier
471478
pub fn qualified_fields_with_unqualified_name(
472479
&self,
473480
name: &str,
474-
) -> Vec<(Option<&TableReference>, &Field)> {
481+
) -> Vec<(Option<&TableReference>, &FieldRef)> {
475482
self.iter()
476483
.filter(|(_, field)| field.name() == name)
477-
.map(|(qualifier, field)| (qualifier, field.as_ref()))
478484
.collect()
479485
}
480486

@@ -499,7 +505,7 @@ impl DFSchema {
499505
pub fn qualified_field_with_unqualified_name(
500506
&self,
501507
name: &str,
502-
) -> Result<(Option<&TableReference>, &Field)> {
508+
) -> Result<(Option<&TableReference>, &FieldRef)> {
503509
let matches = self.qualified_fields_with_unqualified_name(name);
504510
match matches.len() {
505511
0 => Err(unqualified_field_not_found(name, self)),
@@ -528,7 +534,7 @@ impl DFSchema {
528534
}
529535

530536
/// Find the field with the given name
531-
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
537+
pub fn field_with_unqualified_name(&self, name: &str) -> Result<&FieldRef> {
532538
self.qualified_field_with_unqualified_name(name)
533539
.map(|(_, field)| field)
534540
}
@@ -538,7 +544,7 @@ impl DFSchema {
538544
&self,
539545
qualifier: &TableReference,
540546
name: &str,
541-
) -> Result<&Field> {
547+
) -> Result<&FieldRef> {
542548
let idx = self
543549
.index_of_column_by_name(Some(qualifier), name)
544550
.ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
@@ -550,7 +556,7 @@ impl DFSchema {
550556
pub fn qualified_field_from_column(
551557
&self,
552558
column: &Column,
553-
) -> Result<(Option<&TableReference>, &Field)> {
559+
) -> Result<(Option<&TableReference>, &FieldRef)> {
554560
self.qualified_field_with_name(column.relation.as_ref(), &column.name)
555561
}
556562

@@ -1220,7 +1226,7 @@ pub trait ExprSchema: std::fmt::Debug {
12201226
}
12211227

12221228
// Return the column's field
1223-
fn field_from_column(&self, col: &Column) -> Result<&Field>;
1229+
fn field_from_column(&self, col: &Column) -> Result<&FieldRef>;
12241230
}
12251231

12261232
// Implement `ExprSchema` for `Arc<DFSchema>`
@@ -1241,13 +1247,13 @@ impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
12411247
self.as_ref().data_type_and_nullable(col)
12421248
}
12431249

1244-
fn field_from_column(&self, col: &Column) -> Result<&Field> {
1250+
fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
12451251
self.as_ref().field_from_column(col)
12461252
}
12471253
}
12481254

12491255
impl ExprSchema for DFSchema {
1250-
fn field_from_column(&self, col: &Column) -> Result<&Field> {
1256+
fn field_from_column(&self, col: &Column) -> Result<&FieldRef> {
12511257
match &col.relation {
12521258
Some(r) => self.field_with_qualified_name(r, &col.name),
12531259
None => self.field_with_unqualified_name(&col.name),

datafusion/common/src/metadata.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use std::{collections::BTreeMap, sync::Arc};
1919

20-
use arrow::datatypes::{DataType, Field};
20+
use arrow::datatypes::{DataType, Field, FieldRef};
2121
use hashbrown::HashMap;
2222

2323
use crate::{DataFusionError, ScalarValue, error::_plan_err};
@@ -320,6 +320,16 @@ impl FieldMetadata {
320320

321321
field.with_metadata(self.to_hashmap())
322322
}
323+
324+
/// Updates the metadata on the FieldRef with this metadata, if it is not empty.
325+
pub fn add_to_field_ref(&self, mut field_ref: FieldRef) -> FieldRef {
326+
if self.inner.is_empty() {
327+
return field_ref;
328+
}
329+
330+
Arc::make_mut(&mut field_ref).set_metadata(self.to_hashmap());
331+
field_ref
332+
}
323333
}
324334

325335
impl From<&Field> for FieldMetadata {

datafusion/core/src/dataframe/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use std::sync::Arc;
4949
use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
5050
use arrow::compute::{cast, concat};
5151
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
52+
use arrow_schema::FieldRef;
5253
use datafusion_common::config::{CsvOptions, JsonOptions};
5354
use datafusion_common::{
5455
exec_err, internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
@@ -2241,7 +2242,7 @@ impl DataFrame {
22412242
.schema()
22422243
.iter()
22432244
.map(|(qualifier, field)| {
2244-
if qualifier.eq(&qualifier_rename) && field.as_ref() == field_rename {
2245+
if qualifier.eq(&qualifier_rename) && field == field_rename {
22452246
(
22462247
col(Column::from((qualifier, field)))
22472248
.alias_qualified(qualifier.cloned(), new_name),
@@ -2413,7 +2414,7 @@ impl DataFrame {
24132414
.schema()
24142415
.fields()
24152416
.iter()
2416-
.map(|f| f.as_ref().clone())
2417+
.map(Arc::clone)
24172418
.collect()
24182419
} else {
24192420
self.find_columns(&columns)?
@@ -2450,7 +2451,7 @@ impl DataFrame {
24502451
}
24512452

24522453
// Helper to find columns from names
2453-
fn find_columns(&self, names: &[String]) -> Result<Vec<Field>> {
2454+
fn find_columns(&self, names: &[String]) -> Result<Vec<FieldRef>> {
24542455
let schema = self.logical_plan().schema();
24552456
names
24562457
.iter()

0 commit comments

Comments
 (0)