
Commit 360d626

Address PR feedback.
1 parent: e7314bb

File tree

2 files changed: +131 -104 lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 6 additions & 8 deletions
@@ -45,7 +45,7 @@ use parquet::file::metadata::{
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
-use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::{arrow_schema_to_schema, get_arrow_datum};
 use crate::delete_vector::DeleteVector;
 use crate::error::Result;
@@ -235,13 +235,11 @@ impl ArrowReader {
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering.
-        let mut record_batch_transformer = RecordBatchTransformer::build_with_partition_data(
-            task.schema_ref(),
-            task.project_field_ids(),
-            task.partition_spec.clone(),
-            task.partition.clone(),
-            task.name_mapping.clone(),
-        );
+        let mut record_batch_transformer =
+            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids())
+                .with_partition(task.partition_spec.clone(), task.partition.clone())
+                .with_name_mapping(task.name_mapping.clone())
+                .build();
 
         if let Some(batch_size) = batch_size {
             record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 125 additions & 96 deletions
@@ -59,7 +59,7 @@ fn constants_map(
         // Only identity transforms should use constant values from partition metadata
         if matches!(field.transform, Transform::Identity) {
             // Get the partition value for this field
-            if let Some(Some(Literal::Primitive(value))) = partition_data.iter().nth(pos) {
+            if let Some(Literal::Primitive(value)) = &partition_data[pos] {
                 constants.insert(field.source_id, value.clone());
             }
         }
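
For orientation, the change above replaces `partition_data.iter().nth(pos)` with direct indexing. A minimal sketch of the difference, using a plain `Vec<Option<Literal>>` as a stand-in for the crate's `Struct` type (an assumption about its iteration behavior, for illustration only):

// Sketch only: a Vec of nullable field values stands in for iceberg-rust's Struct.
#[derive(Debug)]
enum Literal {
    Primitive(String),
}

fn main() {
    let partition_data: Vec<Option<Literal>> =
        vec![Some(Literal::Primitive("engineering".to_string()))];
    let pos = 0;

    // Old shape: `nth` wraps the element in a second Option (index out of range
    // vs. null field), so the match needs a nested `Some(Some(..))` pattern.
    if let Some(Some(Literal::Primitive(value))) = partition_data.iter().nth(pos) {
        println!("via nth: {value}");
    }

    // New shape: indexing panics if `pos` is out of range, which is acceptable
    // when `pos` comes from enumerating the partition fields themselves.
    if let Some(Literal::Primitive(value)) = &partition_data[pos] {
        println!("via index: {value}");
    }
}
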
@@ -143,16 +143,108 @@ enum SchemaComparison {
     Different,
 }
 
+/// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters.
+///
+/// See [`RecordBatchTransformer`] for details on partition spec, partition data, and name mapping.
+#[derive(Debug)]
+pub(crate) struct RecordBatchTransformerBuilder {
+    snapshot_schema: Arc<IcebergSchema>,
+    projected_iceberg_field_ids: Vec<i32>,
+    partition_spec: Option<Arc<PartitionSpec>>,
+    partition_data: Option<Struct>,
+    name_mapping: Option<Arc<NameMapping>>,
+}
+
+impl RecordBatchTransformerBuilder {
+    pub(crate) fn new(
+        snapshot_schema: Arc<IcebergSchema>,
+        projected_iceberg_field_ids: &[i32],
+    ) -> Self {
+        Self {
+            snapshot_schema,
+            projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(),
+            partition_spec: None,
+            partition_data: None,
+            name_mapping: None,
+        }
+    }
+
+    /// Set partition spec and data together for identifying identity-transformed partition columns.
+    ///
+    /// Both partition_spec and partition_data must be provided together since the spec defines
+    /// which fields are identity-partitioned, and the data provides their constant values.
+    /// One without the other cannot produce a valid constants map.
+    pub(crate) fn with_partition(
+        mut self,
+        partition_spec: Option<Arc<PartitionSpec>>,
+        partition_data: Option<Struct>,
+    ) -> Self {
+        self.partition_spec = partition_spec;
+        self.partition_data = partition_data;
+        self
+    }
+
+    pub(crate) fn with_name_mapping(mut self, name_mapping: Option<Arc<NameMapping>>) -> Self {
+        self.name_mapping = name_mapping;
+        self
+    }
+
+    pub(crate) fn build(self) -> RecordBatchTransformer {
+        RecordBatchTransformer {
+            snapshot_schema: self.snapshot_schema,
+            projected_iceberg_field_ids: self.projected_iceberg_field_ids,
+            partition_spec: self.partition_spec,
+            partition_data: self.partition_data,
+            name_mapping: self.name_mapping,
+            batch_transform: None,
+        }
+    }
+}
+
+/// Transforms RecordBatches from Parquet files to match the Iceberg table schema.
+///
+/// Handles schema evolution, column reordering, type promotion, and implements the Iceberg spec's
+/// "Column Projection" rules for resolving field IDs "not present" in data files:
+/// 1. Return the value from partition metadata if an Identity Transform exists
+/// 2. Use schema.name-mapping.default metadata to map field id to columns without field id
+/// 3. Return the default value if it has a defined initial-default
+/// 4. Return null in all other cases
+///
+/// # Partition Spec and Data
+///
+/// **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants)
+/// from non-identity transforms like bucket (read from data file) to enable runtime filtering on
+/// bucket-partitioned columns. For example, `bucket(4, id)` stores only the bucket number in
+/// partition metadata, so actual `id` values must be read from the data file.
+///
+/// **Add_files field ID conflicts**: When importing Hive tables, partition columns can have field IDs
+/// conflicting with Parquet data columns (e.g., Parquet has field_id=1->"name", but Iceberg expects
+/// field_id=1->"id"). Per spec, such fields are "not present" and should use name mapping (rule #2).
+///
+/// This matches Java's ParquetSchemaUtil.applyNameMapping approach but detects conflicts during projection.
+///
+/// # References
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+/// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
 #[derive(Debug)]
 pub(crate) struct RecordBatchTransformer {
     snapshot_schema: Arc<IcebergSchema>,
     projected_iceberg_field_ids: Vec<i32>,
 
-    // Optional partition spec and data for proper constant identification
+    /// Partition spec for identifying identity-transformed partition columns (spec rule #1).
+    /// Only fields with identity transforms use partition data constants; non-identity transforms
+    /// (bucket, truncate, etc.) must read source columns from data files.
     partition_spec: Option<Arc<PartitionSpec>>,
+
+    /// Partition data providing constant values for identity-transformed partition columns (spec rule #1).
+    /// For example, in a file at path `dept=engineering/file.parquet`, this would contain
+    /// the value "engineering" for the dept field.
     partition_data: Option<Struct>,
 
-    // Optional name mapping for resolving field IDs from column names
+    /// Name mapping for resolving field IDs from column names when field IDs are missing or
+    /// conflicting (spec rule #2). Used primarily for Hive tables imported via add_files where
+    /// Parquet field IDs may conflict with Iceberg schema field IDs.
     name_mapping: Option<Arc<NameMapping>>,
 
     // BatchTransform gets lazily constructed based on the schema of
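
Note that the builder's setters take `Option` values rather than bare values, so a call site that already holds `Option`s (as the `task` fields in reader.rs do) can pass them straight through. A self-contained sketch of that design choice; the names here are illustrative, not part of iceberg-rust:

// Setters that accept Option<T> avoid `if let` plumbing at call sites
// that receive optional values from elsewhere.
#[derive(Debug, Default)]
struct QueryBuilder {
    limit: Option<u32>,
    filter: Option<String>,
}

impl QueryBuilder {
    fn new() -> Self {
        Self::default()
    }

    fn with_limit(mut self, limit: Option<u32>) -> Self {
        self.limit = limit;
        self
    }

    fn with_filter(mut self, filter: Option<String>) -> Self {
        self.filter = filter;
        self
    }

    fn build(self) -> String {
        format!("limit={:?}, filter={:?}", self.limit, self.filter)
    }
}

fn main() {
    // The optional inputs arrive as Options and are forwarded unconditionally.
    let maybe_limit: Option<u32> = Some(10);
    let maybe_filter: Option<String> = None;
    let query = QueryBuilder::new()
        .with_limit(maybe_limit)
        .with_filter(maybe_filter)
        .build();
    println!("{query}");
}
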
@@ -167,56 +259,7 @@ impl RecordBatchTransformer {
         snapshot_schema: Arc<IcebergSchema>,
         projected_iceberg_field_ids: &[i32],
     ) -> Self {
-        Self::build_with_partition_data(
-            snapshot_schema,
-            projected_iceberg_field_ids,
-            None,
-            None,
-            None,
-        )
-    }
-
-    /// Build a RecordBatchTransformer with partition spec and data for proper constant identification.
-    ///
-    /// Implements the Iceberg spec's "Column Projection" rules for resolving field IDs "not present" in data files:
-    /// 1. Return the value from partition metadata if an Identity Transform exists
-    /// 2. Use schema.name-mapping.default metadata to map field id to columns without field id
-    /// 3. Return the default value if it has a defined initial-default
-    /// 4. Return null in all other cases
-    ///
-    /// # Why this method exists
-    ///
-    /// 1. **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants)
-    ///    from non-identity transforms like bucket (read from data file) to enable runtime filtering on
-    ///    bucket-partitioned columns.
-    ///
-    /// 2. **Add_files field ID conflicts**: When importing Hive tables, partition columns can have field IDs
-    ///    conflicting with Parquet data columns (e.g., Parquet has field_id=1->"name", but Iceberg expects
-    ///    field_id=1->"id"). Per spec, such fields are "not present" and should use name mapping (rule #2).
-    ///
-    /// This matches Java's ParquetSchemaUtil.applyNameMapping approach but detects conflicts during projection.
-    ///
-    /// # References
-    /// - Spec: https://iceberg.apache.org/spec/#column-projection
-    /// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
-    /// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
-    pub(crate) fn build_with_partition_data(
-        snapshot_schema: Arc<IcebergSchema>,
-        projected_iceberg_field_ids: &[i32],
-        partition_spec: Option<Arc<PartitionSpec>>,
-        partition_data: Option<Struct>,
-        name_mapping: Option<Arc<NameMapping>>,
-    ) -> Self {
-        let projected_iceberg_field_ids = projected_iceberg_field_ids.to_vec();
-
-        Self {
-            snapshot_schema,
-            projected_iceberg_field_ids,
-            partition_spec,
-            partition_data,
-            name_mapping,
-            batch_transform: None,
-        }
+        RecordBatchTransformerBuilder::new(snapshot_schema, projected_iceberg_field_ids).build()
     }
 
     pub(crate) fn process_record_batch(
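
The doc comment carried over above lists the spec's four "Column Projection" fallback rules. Purely to illustrate that precedence order (hypothetical names and String-typed stand-ins, not the crate's actual resolution code):

// Illustration of the rule ordering: partition constant, then name mapping,
// then initial-default, then null.
#[derive(Debug)]
enum ResolvedColumn {
    PartitionConstant(String), // rule 1: identity-transform value from partition metadata
    MappedByName(String),      // rule 2: column found via schema.name-mapping.default
    InitialDefault(String),    // rule 3: the field's defined initial-default
    Null,                      // rule 4: everything else
}

fn resolve(
    identity_constant: Option<String>,
    name_mapped_column: Option<String>,
    initial_default: Option<String>,
) -> ResolvedColumn {
    if let Some(v) = identity_constant {
        ResolvedColumn::PartitionConstant(v)
    } else if let Some(c) = name_mapped_column {
        ResolvedColumn::MappedByName(c)
    } else if let Some(d) = initial_default {
        ResolvedColumn::InitialDefault(d)
    } else {
        ResolvedColumn::Null
    }
}

fn main() {
    // A field covered by an identity partition resolves to its constant (rule 1).
    println!("{:?}", resolve(Some("engineering".to_string()), None, None));
    // A field matching none of the first three rules resolves to null (rule 4).
    println!("{:?}", resolve(None, None, None));
}
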
@@ -657,7 +700,9 @@ mod test {
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
-    use crate::arrow::record_batch_transformer::RecordBatchTransformer;
+    use crate::arrow::record_batch_transformer::{
+        RecordBatchTransformer, RecordBatchTransformerBuilder,
+    };
     use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type};
 
     #[test]
@@ -948,13 +993,10 @@
 
         let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
 
-        let mut transformer = RecordBatchTransformer::build_with_partition_data(
-            snapshot_schema,
-            &projected_field_ids,
-            None,
-            None,
-            Some(name_mapping),
-        );
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_name_mapping(Some(name_mapping))
+                .build();
 
         // Create a Parquet RecordBatch with data for: name="John Doe", subdept="communications"
         let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
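
This test drives rule #2: the transformer falls back to the table's name mapping. For orientation, `schema.name-mapping.default` is a JSON table property pairing field IDs with column names, e.g. `[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["name"]}]`. A hashmap sketch of the lookup direction (illustrative, not the crate's NameMapping type):

use std::collections::HashMap;

fn main() {
    // Name -> field id, as derived from the name-mapping JSON above.
    let mapping: HashMap<&str, i32> = HashMap::from([("id", 1), ("name", 2)]);

    // A Parquet column with no usable field id is matched by name instead.
    let parquet_column = "name";
    match mapping.get(parquet_column) {
        Some(field_id) => println!("column '{parquet_column}' -> field id {field_id}"),
        None => println!("column '{parquet_column}' unmapped -> falls through to rules 3-4"),
    }
}
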
@@ -1079,13 +1121,10 @@
 
         let projected_field_ids = [1, 2]; // id, name
 
-        let mut transformer = RecordBatchTransformer::build_with_partition_data(
-            snapshot_schema,
-            &projected_field_ids,
-            Some(partition_spec),
-            Some(partition_data),
-            None,
-        );
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(Some(partition_spec), Some(partition_data))
+                .build();
 
         // Create a Parquet RecordBatch with actual data
         // The id column MUST be read from here, not treated as a constant
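
This is the bucket-partitioning case from the doc comment: `bucket(4, id)` stores only a bucket number in partition metadata, so the `id` values themselves must be read from the data file. A toy illustration of why a bucket value cannot be inverted back to ids (the hash below is a stand-in; Iceberg specifies a Murmur3-based bucket transform):

// Many distinct ids collapse into one bucket, so partition metadata alone
// cannot reproduce the id column.
fn bucket4(id: i64) -> i64 {
    // Placeholder hash, not Iceberg's murmur3_x86_32.
    id.wrapping_mul(2654435761).rem_euclid(4)
}

fn main() {
    // 100 and 104 collide in bucket 0: the bucket number alone cannot tell them apart.
    for id in [100i64, 104, 201, 302] {
        println!("id={id} -> bucket={}", bucket4(id));
    }
}
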
@@ -1202,13 +1241,10 @@
 
         let projected_field_ids = [1, 2, 3]; // id, dept, name
 
-        let mut transformer = RecordBatchTransformer::build_with_partition_data(
-            snapshot_schema,
-            &projected_field_ids,
-            Some(partition_spec),
-            Some(partition_data),
-            None,
-        );
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(Some(partition_spec), Some(partition_data))
+                .build();
 
         let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
             Arc::new(Int32Array::from(vec![100, 200])),
@@ -1320,13 +1356,10 @@
 
         let projected_field_ids = [1, 2]; // row_id (field_id=1), name (field_id=2)
 
-        let mut transformer = RecordBatchTransformer::build_with_partition_data(
-            snapshot_schema,
-            &projected_field_ids,
-            Some(partition_spec),
-            Some(partition_data),
-            None,
-        );
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(Some(partition_spec), Some(partition_data))
+                .build();
 
         // Create a Parquet RecordBatch with actual data
         // Despite column rename, data should be read via field_id=1
@@ -1433,13 +1466,11 @@
 
         let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data, category, notes
 
-        let mut transformer = RecordBatchTransformer::build_with_partition_data(
-            snapshot_schema,
-            &projected_field_ids,
-            Some(partition_spec),
-            Some(partition_data),
-            Some(name_mapping),
-        );
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(Some(partition_spec), Some(partition_data))
+                .with_name_mapping(Some(name_mapping))
+                .build();
 
         let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
             Arc::new(Int32Array::from(vec![100, 200])),
@@ -1553,13 +1584,11 @@
 
         let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept
 
-        let mut transformer = RecordBatchTransformer::build_with_partition_data(
-            snapshot_schema,
-            &projected_field_ids,
-            Some(partition_spec),
-            Some(partition_data),
-            Some(name_mapping),
-        );
+        let mut transformer =
+            RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids)
+                .with_partition(Some(partition_spec), Some(partition_data))
+                .with_name_mapping(Some(name_mapping))
+                .build();
 
         // Parquet data (one row)
         let parquet_batch = RecordBatch::try_new(parquet_schema, vec![
