diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 8a3ab3a955..f1c4f86f23 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -881,6 +881,9 @@ mod tests { project_field_ids: vec![2, 3], predicate: None, deletes: vec![pos_del, eq_del], + partition: None, + partition_spec: None, + name_mapping: None, }; // Load the deletes - should handle both types without error diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index c0b1392dc6..e12daf5324 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use futures::{StreamExt, TryStreamExt}; use crate::arrow::ArrowReader; -use crate::arrow::record_batch_transformer::RecordBatchTransformer; +use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{Schema, SchemaRef}; @@ -82,7 +82,7 @@ impl BasicDeleteFileLoader { equality_ids: &[i32], ) -> Result { let mut record_batch_transformer = - RecordBatchTransformer::build(target_schema.clone(), equality_ids); + RecordBatchTransformerBuilder::new(target_schema.clone(), equality_ids).build(); let record_batch_stream = record_batch_stream.map(move |record_batch| { record_batch.and_then(|record_batch| { diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index 4250974bcd..14b5124ee6 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -341,6 +341,9 @@ pub(crate) mod tests { project_field_ids: vec![], predicate: None, deletes: vec![pos_del_1, pos_del_2.clone()], + partition: None, + partition_spec: None, + name_mapping: None, }, FileScanTask { start: 0, @@ -352,6 +355,9 @@ pub(crate) mod tests { project_field_ids: vec![], predicate: None, deletes: vec![pos_del_3], + partition: None, + partition_spec: None, + name_mapping: None, }, ]; diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index fed8f19c05..ab5a96f751 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -45,7 +45,7 @@ use parquet::file::metadata::{ use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; -use crate::arrow::record_batch_transformer::RecordBatchTransformer; +use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::arrow::{arrow_schema_to_schema, get_arrow_datum}; use crate::delete_vector::DeleteVector; use crate::error::Result; @@ -55,7 +55,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -181,7 +181,8 @@ impl ArrowReader { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); - let delete_filter_rx = 
delete_file_loader.load_deletes(&task.deletes, task.schema.clone()); + let delete_filter_rx = + delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema)); // Migrated tables lack field IDs, requiring us to inspect the schema to choose // between field-ID-based or position-based projection @@ -193,7 +194,9 @@ impl ArrowReader { ) .await?; - // Parquet files from Hive/Spark migrations lack field IDs in their metadata + // Check if Parquet file has embedded field IDs + // Corresponds to Java's ParquetSchemaUtil.hasIds() + // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118 let missing_field_ids = initial_stream_builder .schema() .fields() @@ -201,11 +204,38 @@ impl ArrowReader { .next() .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); - // Adding position-based fallback IDs at schema level (not per-batch) enables projection - // on files that lack embedded field IDs. We recreate the builder to apply the modified schema. + // Three-branch schema resolution strategy matching Java's ReadConf constructor + // + // Per Iceberg spec Column Projection rules: + // "Columns in Iceberg data files are selected by field id. The table schema's column + // names and order may change after a data file is written, and projection must be done + // using field ids." + // https://iceberg.apache.org/spec/#column-projection + // + // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files), + // we must assign field IDs BEFORE reading data to enable correct projection. + // + // Java's ReadConf determines field ID strategy: + // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns() + // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns() + // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback() let mut record_batch_stream_builder = if missing_field_ids { - let arrow_schema = - add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema()); + // Parquet file lacks field IDs - must assign them before reading + let arrow_schema = if let Some(name_mapping) = &task.name_mapping { + // Branch 2: Apply name mapping to assign correct Iceberg field IDs + // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id + // to columns without field id" + // Corresponds to Java's ParquetSchemaUtil.applyNameMapping() + apply_name_mapping_to_arrow_schema( + Arc::clone(initial_stream_builder.schema()), + name_mapping, + )? + } else { + // Branch 3: No name mapping - use position-based fallback IDs + // Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema()) + }; + let options = ArrowReaderOptions::new().with_schema(arrow_schema); Self::create_parquet_record_batch_stream_builder( @@ -216,11 +246,14 @@ impl ArrowReader { ) .await? 
} else { + // Branch 1: File has embedded field IDs - trust them initial_stream_builder }; - // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist), - // so we must use position-based projection instead of field-ID matching + // Create projection mask based on field IDs + // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) + // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) + // - If fallback IDs: position-based projection (missing_field_ids=true) let projection_mask = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, @@ -234,9 +267,18 @@ impl ArrowReader { // RecordBatchTransformer performs any transformations required on the RecordBatches // that come back from the file, such as type promotion, default column insertion - // and column re-ordering - let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); + // and column re-ordering. + let mut record_batch_transformer_builder = + RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + + if let (Some(partition_spec), Some(partition_data)) = + (task.partition_spec.clone(), task.partition.clone()) + { + record_batch_transformer_builder = + record_batch_transformer_builder.with_partition(partition_spec, partition_data); + } + + let mut record_batch_transformer = record_batch_transformer_builder.build(); if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); @@ -919,6 +961,77 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap Result> { + debug_assert!( + arrow_schema + .fields() + .iter() + .next() + .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()), + "Schema already has field IDs - name mapping should not be applied" + ); + + use arrow_schema::Field; + + let fields_with_mapped_ids: Vec<_> = arrow_schema + .fields() + .iter() + .map(|field| { + // Look up this column name in name mapping to get the Iceberg field ID. + // Corresponds to Java's ApplyNameMapping visitor which calls + // nameMapping.find(currentPath()) and returns field.withId() if found. + // + // If the field isn't in the mapping, leave it WITHOUT assigning an ID + // (matching Java's behavior of returning the field unchanged). + // Later, during projection, fields without IDs are filtered out. + let mapped_field_opt = name_mapping + .fields() + .iter() + .find(|f| f.names().contains(&field.name().to_string())); + + let mut metadata = field.metadata().clone(); + + if let Some(mapped_field) = mapped_field_opt { + if let Some(field_id) = mapped_field.field_id() { + // Field found in mapping with a field_id → assign it + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); + } + // If field_id is None, leave the field without an ID (will be filtered by projection) + } + // If field not found in mapping, leave it without an ID (will be filtered by projection) + + Field::new(field.name(), field.data_type().clone(), field.is_nullable()) + .with_metadata(metadata) + }) + .collect(); + + Ok(Arc::new(ArrowSchema::new_with_metadata( + fields_with_mapped_ids, + arrow_schema.metadata().clone(), + ))) +} + /// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them. /// Enables projection on migrated files (e.g., from Hive/Spark). 
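As an aside on the name-mapping branch above: a minimal, self-contained sketch of the idea behind `apply_name_mapping_to_arrow_schema`, using the `arrow_schema` crate directly and a plain `HashMap<String, i32>` as a stand-in for the crate's `NameMapping` type. Columns found in the mapping get the Iceberg field ID attached as Arrow field metadata; unmapped columns keep no ID so projection can drop them later. This is an illustration of the approach, not the crate's implementation.

```rust
use std::collections::HashMap;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

// Same value as parquet::arrow::PARQUET_FIELD_ID_META_KEY.
const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id";

/// Assign field IDs to columns that lack them, using a name -> id lookup
/// standing in for the table's `schema.name-mapping.default`.
fn assign_ids_by_name(schema: &Schema, mapping: &HashMap<String, i32>) -> Arc<Schema> {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .map(|field| {
            let mut metadata = field.metadata().clone();
            if !metadata.contains_key(PARQUET_FIELD_ID_META_KEY) {
                if let Some(id) = mapping.get(field.name()) {
                    metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string());
                }
                // Unmapped fields keep no ID and are later filtered out by projection.
            }
            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();
    Arc::new(Schema::new(fields))
}

fn main() {
    // A Parquet-derived Arrow schema with no embedded field IDs (e.g., a Hive import).
    let file_schema = Schema::new(vec![
        Field::new("name", DataType::Utf8, true),
        Field::new("subdept", DataType::Utf8, true),
    ]);
    let mapping = HashMap::from([("name".to_string(), 2), ("subdept".to_string(), 4)]);

    let mapped = assign_ids_by_name(&file_schema, &mapping);
    assert_eq!(
        mapped.field(0).metadata().get(PARQUET_FIELD_ID_META_KEY),
        Some(&"2".to_string())
    );
}
```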
/// @@ -1948,6 +2061,9 @@ message schema { project_field_ids: vec![1], predicate: Some(predicate.bind(schema, true).unwrap()), deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -2266,6 +2382,9 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, }; // Task 2: read the second and third row groups @@ -2279,6 +2398,9 @@ message schema { project_field_ids: vec![1], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks1 = Box::pin(futures::stream::iter(vec![Ok(task1)])) as FileScanTaskStream; @@ -2403,6 +2525,9 @@ message schema { project_field_ids: vec![1, 2], // Request both columns 'a' and 'b' predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -2571,6 +2696,9 @@ message schema { partition_spec_id: 0, equality_ids: None, }], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2786,6 +2914,9 @@ message schema { partition_spec_id: 0, equality_ids: None, }], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -2994,6 +3125,9 @@ message schema { partition_spec_id: 0, equality_ids: None, }], + partition: None, + partition_spec: None, + name_mapping: None, }; let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; @@ -3094,6 +3228,9 @@ message schema { project_field_ids: vec![1, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3188,6 +3325,9 @@ message schema { project_field_ids: vec![1, 3], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3271,6 +3411,9 @@ message schema { project_field_ids: vec![1, 2, 3], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3368,6 +3511,9 @@ message schema { project_field_ids: vec![1, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3494,6 +3640,9 @@ message schema { project_field_ids: vec![1, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3587,6 +3736,9 @@ message schema { project_field_ids: vec![1, 5, 2], predicate: None, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3693,6 +3845,9 @@ message schema { project_field_ids: vec![1, 2, 3], predicate: Some(predicate.bind(schema, true).unwrap()), deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, })] .into_iter(), )) as FileScanTaskStream; @@ -3708,4 +3863,162 @@ message schema { // Should return empty results assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0)); } + + /// Test bucket partitioning reads source column from data file (not partition metadata). 
+ /// + /// This is an integration test verifying the complete ArrowReader pipeline with bucket partitioning. + /// It corresponds to TestRuntimeFiltering tests in Iceberg Java (e.g., testRenamedSourceColumnTable). + /// + /// # Iceberg Spec Requirements + /// + /// Per the Iceberg spec "Column Projection" section: + /// > "Return the value from partition metadata if an **Identity Transform** exists for the field" + /// + /// This means: + /// - Identity transforms (e.g., `identity(dept)`) use constants from partition metadata + /// - Non-identity transforms (e.g., `bucket(4, id)`) must read source columns from data files + /// - Partition metadata for bucket transforms stores bucket numbers (0-3), NOT source values + /// + /// Java's PartitionUtil.constantsMap() implements this via: + /// ```java + /// if (field.transform().isIdentity()) { + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # What This Test Verifies + /// + /// This test ensures the full ArrowReader → RecordBatchTransformer pipeline correctly handles + /// bucket partitioning when FileScanTask provides partition_spec and partition_data: + /// + /// - Parquet file has field_id=1 named "id" with actual data [1, 5, 9, 13] + /// - FileScanTask specifies partition_spec with bucket(4, id) and partition_data with bucket=1 + /// - RecordBatchTransformer.constants_map() excludes bucket-partitioned field from constants + /// - ArrowReader correctly reads [1, 5, 9, 13] from the data file + /// - Values are NOT replaced with constant 1 from partition metadata + /// + /// # Why This Matters + /// + /// Without correct handling: + /// - Runtime filtering would break (e.g., `WHERE id = 5` would fail) + /// - Query results would be incorrect (all rows would have id=1) + /// - Bucket partitioning would be unusable for query optimization + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms" + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + #[tokio::test] + async fn test_bucket_partitioning_reads_source_column_from_file() { + use arrow_array::Int32Array; + + use crate::spec::{Literal, PartitionSpec, Struct, Transform}; + + // Iceberg schema with id and name columns + let schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, id) + let partition_spec = Arc::new( + PartitionSpec::builder(schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 1 + let partition_data = Struct::from_iter(vec![Some(Literal::int(1))]); + + // Create Arrow schema with field IDs for Parquet file + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + // Write Parquet file with data + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let id_data = Arc::new(Int32Array::from(vec![1, 5, 9, 13])) as ArrayRef; + let name_data = + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie", "Dave"])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + let file = File::create(format!("{}/data.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Read the Parquet file with partition spec and data + let reader = ArrowReaderBuilder::new(file_io).build(); + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/data.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + partition: Some(partition_data), + partition_spec: Some(partition_spec), + name_mapping: None, + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Verify we got the correct data + assert_eq!(result.len(), 1); + let batch = &result[0]; + + assert_eq!(batch.num_columns(), 2); + assert_eq!(batch.num_rows(), 4); + + // The id column MUST contain actual values from the Parquet file [1, 5, 9, 13], + // NOT the constant partition value 1 + let id_col = batch + .column(0) + .as_primitive::(); + assert_eq!(id_col.value(0), 1); + assert_eq!(id_col.value(1), 5); + assert_eq!(id_col.value(2), 9); + assert_eq!(id_col.value(3), 13); + + let name_col = batch.column(1).as_string::(); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(name_col.value(1), "Bob"); + assert_eq!(name_col.value(2), "Charlie"); + assert_eq!(name_col.value(3), "Dave"); + } } diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 5fbbbb106a..e7d8b8f0fb 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -29,9 +29,44 @@ use arrow_schema::{ use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; -use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema}; +use crate::spec::{ + Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, +}; use crate::{Error, ErrorKind, Result}; +/// Build a map of field ID to constant value for identity-partitioned fields. +/// +/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants +/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.) +/// store derived values in partition metadata, so source columns must be read from data files. +/// +/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` (bucket number), +/// but the actual `id` values (100, 200, 300) are only in the data file. +/// +/// Matches Java's `PartitionUtil.constantsMap()` which filters `if (field.transform().isIdentity())`. 
+/// +/// # References +/// - Spec: https://iceberg.apache.org/spec/#column-projection +/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap() +fn constants_map( + partition_spec: &PartitionSpec, + partition_data: &Struct, +) -> HashMap { + let mut constants = HashMap::new(); + + for (pos, field) in partition_spec.fields().iter().enumerate() { + // Only identity transforms should use constant values from partition metadata + if matches!(field.transform, Transform::Identity) { + // Get the partition value for this field + if let Some(Literal::Primitive(value)) = &partition_data[pos] { + constants.insert(field.source_id, value.clone()); + } + } + } + + constants +} + /// Indicates how a particular column in a processed RecordBatch should /// be sourced. #[derive(Debug)] @@ -107,32 +142,107 @@ enum SchemaComparison { Different, } +/// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters. +/// +/// See [`RecordBatchTransformer`] for details on partition spec and partition data. #[derive(Debug)] -pub(crate) struct RecordBatchTransformer { +pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - - // BatchTransform gets lazily constructed based on the schema of - // the first RecordBatch we receive from the file - batch_transform: Option, + partition_spec: Option>, + partition_data: Option, } -impl RecordBatchTransformer { - /// Build a RecordBatchTransformer for a given - /// Iceberg snapshot schema and list of projected field ids. - pub(crate) fn build( +impl RecordBatchTransformerBuilder { + pub(crate) fn new( snapshot_schema: Arc, projected_iceberg_field_ids: &[i32], ) -> Self { - let projected_iceberg_field_ids = projected_iceberg_field_ids.to_vec(); - Self { snapshot_schema, - projected_iceberg_field_ids, + projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), + partition_spec: None, + partition_data: None, + } + } + + /// Set partition spec and data together for identifying identity-transformed partition columns. + /// + /// Both partition_spec and partition_data must be provided together since the spec defines + /// which fields are identity-partitioned, and the data provides their constant values. + /// One without the other cannot produce a valid constants map. + pub(crate) fn with_partition( + mut self, + partition_spec: Arc, + partition_data: Struct, + ) -> Self { + self.partition_spec = Some(partition_spec); + self.partition_data = Some(partition_data); + self + } + + pub(crate) fn build(self) -> RecordBatchTransformer { + RecordBatchTransformer { + snapshot_schema: self.snapshot_schema, + projected_iceberg_field_ids: self.projected_iceberg_field_ids, + partition_spec: self.partition_spec, + partition_data: self.partition_data, batch_transform: None, } } +} + +/// Transforms RecordBatches from Parquet files to match the Iceberg table schema. +/// +/// Handles schema evolution, column reordering, type promotion, and implements the Iceberg spec's +/// "Column Projection" rules for resolving field IDs "not present" in data files: +/// 1. Return the value from partition metadata if an Identity Transform exists +/// 2. Use schema.name-mapping.default metadata to map field id to columns without field id (applied in ArrowReader) +/// 3. Return the default value if it has a defined initial-default +/// 4. 
Return null in all other cases +/// +/// # Field ID Resolution +/// +/// Field ID resolution happens in ArrowReader before data is read (matching Java's ReadConf): +/// - If file has embedded field IDs: trust them (ParquetSchemaUtil.hasIds() = true) +/// - If file lacks IDs and name_mapping exists: apply name mapping (ParquetSchemaUtil.applyNameMapping()) +/// - If file lacks IDs and no name_mapping: use position-based fallback (ParquetSchemaUtil.addFallbackIds()) +/// +/// By the time RecordBatchTransformer processes data, all field IDs are trustworthy. +/// This transformer only handles remaining projection rules (#1, #3, #4) for fields still "not present". +/// +/// # Partition Spec and Data +/// +/// **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants) +/// from non-identity transforms like bucket (read from data file) to enable runtime filtering on +/// bucket-partitioned columns. For example, `bucket(4, id)` stores only the bucket number in +/// partition metadata, so actual `id` values must be read from the data file. +/// +/// # References +/// - Spec: https://iceberg.apache.org/spec/#column-projection +/// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java (field ID resolution) +/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java (partition constants) +#[derive(Debug)] +pub(crate) struct RecordBatchTransformer { + snapshot_schema: Arc, + projected_iceberg_field_ids: Vec, + + /// Partition spec for identifying identity-transformed partition columns (spec rule #1). + /// Only fields with identity transforms use partition data constants; non-identity transforms + /// (bucket, truncate, etc.) must read source columns from data files. + partition_spec: Option>, + /// Partition data providing constant values for identity-transformed partition columns (spec rule #1). + /// For example, in a file at path `dept=engineering/file.parquet`, this would contain + /// the value "engineering" for the dept field. + partition_data: Option, + + // BatchTransform gets lazily constructed based on the schema of + // the first RecordBatch we receive from the file + batch_transform: Option, +} + +impl RecordBatchTransformer { pub(crate) fn process_record_batch( &mut self, record_batch: RecordBatch, @@ -147,7 +257,7 @@ impl RecordBatchTransformer { .with_match_field_names(false) .with_row_count(Some(record_batch.num_rows())); RecordBatch::try_new_with_options( - target_schema.clone(), + Arc::clone(target_schema), self.transform_columns(record_batch.columns(), operations)?, &options, )? @@ -157,7 +267,7 @@ impl RecordBatchTransformer { .with_match_field_names(false) .with_row_count(Some(record_batch.num_rows())); RecordBatch::try_new_with_options( - target_schema.clone(), + Arc::clone(target_schema), record_batch.columns().to_vec(), &options, )? @@ -167,6 +277,8 @@ impl RecordBatchTransformer { record_batch.schema_ref(), self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, + self.partition_spec.as_ref().map(|s| s.as_ref()), + self.partition_data.as_ref(), )?); self.process_record_batch(record_batch)? 
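For reference, the identity-only filter in `constants_map` boils down to the following stand-alone sketch. The `Transform` and `PartitionField` types here are simplified stand-ins for the crate's `spec` types, and partition values are plain integers rather than `Literal`s; the shape of the loop mirrors the real function.

```rust
use std::collections::HashMap;

// Simplified stand-ins for the crate's Transform / PartitionField types.
enum Transform {
    Identity,
    Bucket(u32),
}

struct PartitionField {
    source_id: i32,
    transform: Transform,
}

/// Only identity-transformed fields contribute constants; bucket/truncate/etc.
/// store derived values in partition metadata, so their source columns must be
/// read from the data file.
fn constants_map(spec: &[PartitionField], partition_data: &[Option<i64>]) -> HashMap<i32, i64> {
    let mut constants = HashMap::new();
    for (pos, field) in spec.iter().enumerate() {
        if matches!(field.transform, Transform::Identity) {
            if let Some(value) = partition_data[pos] {
                constants.insert(field.source_id, value);
            }
        }
    }
    constants
}

fn main() {
    // Spec: identity(region_id at field 3), bucket(4, id at field 1).
    let spec = [
        PartitionField { source_id: 3, transform: Transform::Identity },
        PartitionField { source_id: 1, transform: Transform::Bucket(4) },
    ];
    // Partition tuple: region_id = 42, id_bucket = 2.
    let data = [Some(42), Some(2)];

    let constants = constants_map(&spec, &data);
    assert_eq!(constants.get(&3), Some(&42)); // identity -> constant from metadata
    assert_eq!(constants.get(&1), None);      // bucket   -> read from the data file
}
```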
@@ -185,6 +297,8 @@ impl RecordBatchTransformer { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], + partition_spec: Option<&PartitionSpec>, + partition_data: Option<&Struct>, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -205,6 +319,12 @@ impl RecordBatchTransformer { let target_schema = Arc::new(ArrowSchema::new(fields?)); + let constants_map = if let (Some(spec), Some(data)) = (partition_spec, partition_data) { + constants_map(spec, data) + } else { + HashMap::new() + }; + match Self::compare_schemas(source_schema, &target_schema) { SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough), SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }), @@ -214,6 +334,8 @@ impl RecordBatchTransformer { snapshot_schema, projected_iceberg_field_ids, field_id_to_mapped_schema_map, + constants_map, + partition_spec, )?, target_schema, }), @@ -270,57 +392,92 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, + constants_map: HashMap, + _partition_spec: Option<&PartitionSpec>, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; - projected_iceberg_field_ids.iter().map(|field_id|{ - let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or( - Error::new(ErrorKind::Unexpected, "could not find field in schema") - )?; - let target_type = target_field.data_type(); - - Ok(if let Some((source_field, source_index)) = field_id_to_source_schema_map.get(field_id) { - // column present in source + projected_iceberg_field_ids + .iter() + .map(|field_id| { + let (target_field, _) = + field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new( + ErrorKind::Unexpected, + "could not find field in schema", + ))?; + let target_type = target_field.data_type(); - if source_field.data_type().equals_datatype(target_type) { - // no promotion required - ColumnSource::PassThrough { - source_index: *source_index + let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or(Error::new( + ErrorKind::Unexpected, + "Field not found in snapshot schema", + ))?; + + // Iceberg spec's "Column Projection" rules (https://iceberg.apache.org/spec/#column-projection). + // For fields "not present" in data files: + // 1. Use partition metadata (identity transforms only) + // 2. Use name mapping + // 3. Use initial_default + // 4. Return null + // + // Why check partition constants before Parquet field IDs (Java: BaseParquetReaders.java:299): + // In add_files scenarios, partition columns may exist in BOTH Parquet AND partition metadata. + // Partition metadata is authoritative - it defines which partition this file belongs to. + + // Field ID resolution now happens in ArrowReader via: + // 1. Embedded field IDs (ParquetSchemaUtil.hasIds() = true) - trust them + // 2. Name mapping (ParquetSchemaUtil.applyNameMapping()) - applied upfront + // 3. Position-based fallback (ParquetSchemaUtil.addFallbackIds()) - applied upfront + // + // At this point, all field IDs in the source schema are trustworthy. + // No conflict detection needed - schema resolution happened in reader.rs. 
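The resolution order described in the comments above (partition constant, then file column, then `initial_default`, then null) can be modelled as a small stand-alone function. The `Source` enum below is a simplified stand-in for the crate's `ColumnSource`, and type promotion is omitted; it is an illustration of the ordering, not the actual implementation.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the crate's ColumnSource.
#[derive(Debug, PartialEq)]
enum Source {
    Constant(i64),   // rule #1: identity partition constant
    FromFile(usize), // column present in the file (matched by field id)
    Default(i64),    // rule #3: initial_default
    Null,            // rule #4: no other source
}

/// Resolve one projected field id in the order used by generate_operations:
/// partition constant first, then the file column, then default, then null.
fn resolve(
    field_id: i32,
    constants: &HashMap<i32, i64>,
    file_columns: &HashMap<i32, usize>,
    defaults: &HashMap<i32, i64>,
) -> Source {
    if let Some(v) = constants.get(&field_id) {
        Source::Constant(*v)
    } else if let Some(idx) = file_columns.get(&field_id) {
        Source::FromFile(*idx)
    } else if let Some(v) = defaults.get(&field_id) {
        Source::Default(*v)
    } else {
        Source::Null
    }
}

fn main() {
    let constants = HashMap::from([(2, 42)]);   // identity-partitioned field 2
    let file_columns = HashMap::from([(1, 0)]); // field 1 is column 0 in the file
    let defaults = HashMap::from([(4, 7)]);     // field 4 has initial_default = 7

    assert_eq!(resolve(2, &constants, &file_columns, &defaults), Source::Constant(42));
    assert_eq!(resolve(1, &constants, &file_columns, &defaults), Source::FromFile(0));
    assert_eq!(resolve(4, &constants, &file_columns, &defaults), Source::Default(7));
    assert_eq!(resolve(5, &constants, &file_columns, &defaults), Source::Null);
}
```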
+ let field_by_id = field_id_to_source_schema_map.get(field_id).map( + |(source_field, source_index)| { + if source_field.data_type().equals_datatype(target_type) { + ColumnSource::PassThrough { + source_index: *source_index, + } + } else { + ColumnSource::Promote { + target_type: target_type.clone(), + source_index: *source_index, + } + } + }, + ); + + // Apply spec's fallback steps for "not present" fields. + let column_source = if let Some(constant_value) = constants_map.get(field_id) { + // Rule #1: Identity partition constant + ColumnSource::Add { + value: Some(constant_value.clone()), + target_type: target_type.clone(), } + } else if let Some(source) = field_by_id { + source } else { - // promotion required - ColumnSource::Promote { + // Rules #2, #3 and #4: + // Rule #2 (name mapping) was already applied in reader.rs if needed. + // If field_id is still not found, the column doesn't exist in the Parquet file. + // Fall through to rule #3 (initial_default) or rule #4 (null). + let default_value = iceberg_field.initial_default.as_ref().and_then(|lit| { + if let Literal::Primitive(prim) = lit { + Some(prim.clone()) + } else { + None + } + }); + ColumnSource::Add { + value: default_value, target_type: target_type.clone(), - source_index: *source_index, } - } - } else { - // column must be added - let iceberg_field = snapshot_schema.field_by_id(*field_id).ok_or( - Error::new(ErrorKind::Unexpected, "Field not found in snapshot schema") - )?; - - let default_value = if let Some(iceberg_default_value) = - &iceberg_field.initial_default - { - let Literal::Primitive(primitive_literal) = iceberg_default_value else { - return Err(Error::new( - ErrorKind::Unexpected, - format!("Default value for column must be primitive type, but encountered {iceberg_default_value:?}") - )); - }; - Some(primitive_literal.clone()) - } else { - None }; - ColumnSource::Add { - value: default_value, - target_type: target_type.clone(), - } + Ok(column_source) }) - }).collect() + .collect() } fn build_field_id_to_arrow_schema_map( @@ -328,25 +485,19 @@ impl RecordBatchTransformer { ) -> Result> { let mut field_id_to_source_schema = HashMap::new(); for (source_field_idx, source_field) in source_schema.fields.iter().enumerate() { - let this_field_id = source_field - .metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .ok_or_else(|| { + // Check if field has a field ID in metadata + if let Some(field_id_str) = source_field.metadata().get(PARQUET_FIELD_ID_META_KEY) { + let this_field_id = field_id_str.parse().map_err(|e| { Error::new( ErrorKind::DataInvalid, - "field ID not present in parquet metadata", - ) - })? 
- .parse() - .map_err(|e| { - Error::new( - ErrorKind::DataInvalid, - format!("field id not parseable as an i32: {e}"), + format!("field id not parseable as an i32: {}", e), ) })?; - field_id_to_source_schema - .insert(this_field_id, (source_field.clone(), source_field_idx)); + field_id_to_source_schema + .insert(this_field_id, (source_field.clone(), source_field_idx)); + } + // If field doesn't have a field ID, skip it - name mapping will handle it } Ok(field_id_to_source_schema) @@ -447,7 +598,7 @@ impl RecordBatchTransformer { (dt, _) => { return Err(Error::new( ErrorKind::Unexpected, - format!("unexpected target column type {dt}"), + format!("unexpected target column type {}", dt), )); } }) @@ -466,8 +617,10 @@ mod test { use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; - use crate::arrow::record_batch_transformer::RecordBatchTransformer; - use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Type}; + use crate::arrow::record_batch_transformer::{ + RecordBatchTransformer, RecordBatchTransformerBuilder, + }; + use crate::spec::{Literal, NestedField, PrimitiveType, Schema, Struct, Type}; #[test] fn build_field_id_to_source_schema_map_works() { @@ -492,7 +645,9 @@ mod test { let snapshot_schema = Arc::new(iceberg_table_schema()); let projected_iceberg_field_ids = [13, 14]; - let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); + let mut inst = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); let result = inst .process_record_batch(source_record_batch_no_migration_required()) @@ -508,7 +663,9 @@ mod test { let snapshot_schema = Arc::new(iceberg_table_schema()); let projected_iceberg_field_ids = [10, 11, 12, 14, 15]; // a, b, c, e, f - let mut inst = RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); + let mut inst = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); let result = inst.process_record_batch(source_record_batch()).unwrap(); @@ -537,7 +694,8 @@ mod test { let projected_iceberg_field_ids = [1, 2, 3]; let mut transformer = - RecordBatchTransformer::build(snapshot_schema, &projected_iceberg_field_ids); + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_iceberg_field_ids) + .build(); let file_schema = Arc::new(ArrowSchema::new(vec![ simple_field("id", DataType::Int32, false, "1"), @@ -696,4 +854,592 @@ mod test { value.to_string(), )])) } + + /// Test for add_files with Parquet files that have NO field IDs (Hive tables). + /// + /// This reproduces the scenario from Iceberg spec where: + /// - Hive-style partitioned Parquet files are imported via add_files procedure + /// - Parquet files originally DO NOT have field IDs (typical for Hive tables) + /// - ArrowReader applies name mapping to assign correct Iceberg field IDs + /// - Iceberg schema assigns field IDs: id (1), name (2), dept (3), subdept (4) + /// - Partition columns (id, dept) have initial_default values + /// + /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), + /// this scenario requires `schema.name-mapping.default` from table metadata + /// to correctly map Parquet columns by name to Iceberg field IDs. + /// This mapping is now applied in ArrowReader before data is processed. + /// + /// Expected behavior: + /// 1. id=1 (from initial_default) - spec rule #3 + /// 2. 
name="John Doe" (from Parquet with field_id=2 assigned by reader) - found by field ID + /// 3. dept="hr" (from initial_default) - spec rule #3 + /// 4. subdept="communications" (from Parquet with field_id=4 assigned by reader) - found by field ID + #[test] + fn add_files_with_name_mapping_applied_in_reader() { + // Iceberg schema after add_files: id (partition), name, dept (partition), subdept + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)) + .with_initial_default(Literal::int(1)) + .into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "dept", Type::Primitive(PrimitiveType::String)) + .with_initial_default(Literal::string("hr")) + .into(), + NestedField::optional(4, "subdept", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + // Simulate ArrowReader having applied name mapping: + // Original Parquet: name, subdept (NO field IDs) + // After reader.rs applies name mapping: name (field_id=2), subdept (field_id=4) + // + // Note: Partition columns (id, dept) are NOT in the Parquet file - they're in directory paths + use std::collections::HashMap; + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, true).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "2".to_string(), + )])), + Field::new("subdept", DataType::Utf8, true).with_metadata(HashMap::from([( + "PARQUET:field_id".to_string(), + "4".to_string(), + )])), + ])); + + let projected_field_ids = [1, 2, 3, 4]; // id, name, dept, subdept + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids).build(); + + // Create a Parquet RecordBatch with data for: name="John Doe", subdept="communications" + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(StringArray::from(vec!["John Doe"])), + Arc::new(StringArray::from(vec!["communications"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the transformed RecordBatch has: + // - id=1 (from initial_default, not from Parquet) + // - name="John Doe" (from Parquet with correct field_id=2) + // - dept="hr" (from initial_default, not from Parquet) + // - subdept="communications" (from Parquet with correct field_id=4) + assert_eq!(result.num_columns(), 4); + assert_eq!(result.num_rows(), 1); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.value(0), 1); + + let name_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "John Doe"); + + let dept_column = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(dept_column.value(0), "hr"); + + let subdept_column = result + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(subdept_column.value(0), "communications"); + } + + /// Test for bucket partitioning where source columns must be read from data files. 
+ /// + /// This test verifies correct implementation of the Iceberg spec's "Column Projection" rules: + /// > "Return the value from partition metadata if an **Identity Transform** exists for the field" + /// + /// # Why this test is critical + /// + /// The key insight is that partition metadata stores TRANSFORMED values, not source values: + /// - For `bucket(4, id)`, partition metadata has `id_bucket = 2` (the bucket number) + /// - The actual `id` column values (100, 200, 300) are ONLY in the data file + /// + /// If iceberg-rust incorrectly treated bucket-partitioned fields as constants, it would: + /// 1. Replace all `id` values with the constant `2` from partition metadata + /// 2. Break runtime filtering (e.g., `WHERE id = 100` would match no rows) + /// 3. Return incorrect query results + /// + /// # What this test verifies + /// + /// - Bucket-partitioned fields (e.g., `bucket(4, id)`) are read from the data file + /// - The source column `id` contains actual values (100, 200, 300), not constants + /// - Java's `PartitionUtil.constantsMap()` behavior is correctly replicated: + /// ```java + /// if (field.transform().isIdentity()) { // FALSE for bucket transforms + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # Real-world impact + /// + /// This reproduces the failure scenario from Iceberg Java's TestRuntimeFiltering: + /// - Tables partitioned by `bucket(N, col)` are common for load balancing + /// - Queries filter on the source column: `SELECT * FROM tbl WHERE col = value` + /// - Runtime filtering pushes predicates down to Iceberg file scans + /// - Without this fix, the filter would match against constant partition values instead of data + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + "Partition Transforms" + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java + #[test] + fn bucket_partitioning_reads_source_column_from_file() { + use crate::spec::{Struct, Transform}; + + // Table schema: id (data column), name (data column), id_bucket (partition column) + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, id) - the id field is bucketed + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("id", "id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 2 + // In Iceberg, partition data is a Struct where each field corresponds to a partition field + let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]); + + // Parquet file contains both id and name columns + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "2"), + ])); + + let projected_field_ids = [1, 2]; // id, name + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition(partition_spec, partition_data) + .build(); + + // Create a Parquet RecordBatch with actual data + // The id column MUST be read from here, not treated as a constant + let parquet_batch = 
RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the transformed RecordBatch correctly reads id from the file + // (NOT as a constant from partition metadata) + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 3); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + // These values MUST come from the Parquet file, not be replaced by constants + assert_eq!(id_column.value(0), 100); + assert_eq!(id_column.value(1), 200); + assert_eq!(id_column.value(2), 300); + + let name_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "Alice"); + assert_eq!(name_column.value(1), "Bob"); + assert_eq!(name_column.value(2), "Charlie"); + } + + /// Test that identity-transformed partition fields ARE treated as constants. + /// + /// This is the complement to `bucket_partitioning_reads_source_column_from_file`, + /// verifying that constants_map() correctly identifies identity-transformed + /// partition fields per the Iceberg spec. + /// + /// # Spec requirement (format/spec.md "Column Projection") + /// + /// > "Return the value from partition metadata if an Identity Transform exists for the field + /// > and the partition value is present in the `partition` struct on `data_file` object + /// > in the manifest. This allows for metadata only migrations of Hive tables." + /// + /// # Why identity transforms use constants + /// + /// Unlike bucket/truncate/year/etc., identity transforms don't modify the value: + /// - `identity(dept)` stores the actual `dept` value in partition metadata + /// - Partition metadata has `dept = "engineering"` (the real value, not a hash/bucket) + /// - This value can be used directly without reading the data file + /// + /// # Performance benefit + /// + /// For Hive migrations where partition columns aren't in data files: + /// - Partition metadata provides the column values + /// - No need to read from data files (metadata-only query optimization) + /// - Common pattern: `dept=engineering/subdept=backend/file.parquet` + /// - `dept` and `subdept` are in directory structure, not in `file.parquet` + /// - Iceberg populates these from partition metadata as constants + /// + /// # What this test verifies + /// + /// - Identity-partitioned fields use constants from partition metadata + /// - The `dept` column is populated with `"engineering"` (not read from file) + /// - Java's `PartitionUtil.constantsMap()` behavior is matched: + /// ```java + /// if (field.transform().isIdentity()) { // TRUE for identity + /// idToConstant.put(field.sourceId(), converted); + /// } + /// ``` + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java + #[test] + fn identity_partition_uses_constant_from_metadata() { + use crate::spec::{Struct, Transform}; + + // Table schema: id (data column), dept (partition column), name (data column) + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + 
.build() + .unwrap(), + ); + + // Partition spec: identity(dept) - the dept field uses identity transform + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("dept", "dept", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: dept="engineering" + let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]); + + // Parquet file contains only id and name (dept is in partition path) + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "3"), + ])); + + let projected_field_ids = [1, 2, 3]; // id, dept, name + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition(partition_spec, partition_data) + .build(); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200])), + Arc::new(StringArray::from(vec!["Alice", "Bob"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the dept column is populated with the constant from partition metadata + assert_eq!(result.num_columns(), 3); + assert_eq!(result.num_rows(), 2); + + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.value(0), 100); + assert_eq!(id_column.value(1), 200); + + let dept_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + // This value MUST come from partition metadata (constant) + assert_eq!(dept_column.value(0), "engineering"); + assert_eq!(dept_column.value(1), "engineering"); + + let name_column = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "Alice"); + assert_eq!(name_column.value(1), "Bob"); + } + + /// Test bucket partitioning with renamed source column. + /// + /// This verifies correct behavior for TestRuntimeFiltering.testRenamedSourceColumnTable() in Iceberg Java. + /// When a source column is renamed after partitioning is established, field-ID-based mapping + /// must still correctly identify the column in Parquet files. + /// + /// # Scenario + /// + /// 1. Table created with `bucket(4, id)` partitioning + /// 2. Data written to Parquet files (field_id=1, name="id") + /// 3. Column renamed: `ALTER TABLE ... RENAME COLUMN id TO row_id` + /// 4. Iceberg schema now has: field_id=1, name="row_id" + /// 5. 
Parquet files still have: field_id=1, name="id" + /// + /// # Expected Behavior Per Iceberg Spec + /// + /// Per the Iceberg spec "Column Projection" section and Java's PartitionUtil.constantsMap(): + /// - Bucket transforms are NON-identity, so partition metadata stores bucket numbers (0-3), not source values + /// - Source columns for non-identity transforms MUST be read from data files + /// - Field-ID-based mapping should find the column by field_id=1 (ignoring name mismatch) + /// - Runtime filtering on `row_id` should work correctly + /// + /// # What This Tests + /// + /// This test ensures that when FileScanTask provides partition_spec and partition_data: + /// - constants_map() correctly identifies that bucket(4, row_id) is NOT an identity transform + /// - The source column (field_id=1) is NOT added to constants_map + /// - Field-ID-based mapping reads actual values from the Parquet file + /// - Values [100, 200, 300] are read, not replaced with bucket constant 2 + /// + /// # References + /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java::testRenamedSourceColumnTable + /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java::constantsMap() + /// - Iceberg spec: format/spec.md "Column Projection" section + #[test] + fn test_bucket_partitioning_with_renamed_source_column() { + use crate::spec::{Struct, Transform}; + + // Iceberg schema after rename: row_id (was id), name + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "row_id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: bucket(4, row_id) - but source_id still points to field_id=1 + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("row_id", "row_id_bucket", Transform::Bucket(4)) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: bucket value is 2 + let partition_data = Struct::from_iter(vec![Some(Literal::int(2))]); + + // Parquet file has OLD column name "id" but SAME field_id=1 + // Field-ID-based mapping should find this despite name mismatch + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("name", DataType::Utf8, true, "2"), + ])); + + let projected_field_ids = [1, 2]; // row_id (field_id=1), name (field_id=2) + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition(partition_spec, partition_data) + .build(); + + // Create a Parquet RecordBatch with actual data + // Despite column rename, data should be read via field_id=1 + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200, 300])), + Arc::new(StringArray::from(vec!["Alice", "Bob", "Charlie"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + // Verify the transformed RecordBatch correctly reads data despite name mismatch + assert_eq!(result.num_columns(), 2); + assert_eq!(result.num_rows(), 3); + + let row_id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + // These values MUST come from the Parquet file via field_id=1, + // not be replaced by the bucket constant (2) + assert_eq!(row_id_column.value(0), 100); + assert_eq!(row_id_column.value(1), 200); + 
assert_eq!(row_id_column.value(2), 300); + + let name_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_column.value(0), "Alice"); + assert_eq!(name_column.value(1), "Bob"); + assert_eq!(name_column.value(2), "Charlie"); + } + + /// Comprehensive integration test that verifies all 4 Iceberg spec rules work correctly. + /// + /// Per the Iceberg spec (https://iceberg.apache.org/spec/#column-projection), + /// "Values for field ids which are not present in a data file must be resolved + /// according the following rules:" + /// + /// This test creates a scenario where each rule is exercised: + /// - Rule #1: dept (identity-partitioned) -> constant from partition metadata + /// - Rule #2: data (via name mapping) -> read from Parquet file by name + /// - Rule #3: category (initial_default) -> use default value + /// - Rule #4: notes (no default) -> return null + /// + /// # References + /// - Iceberg spec: format/spec.md "Column Projection" section + #[test] + fn test_all_four_spec_rules() { + use crate::spec::Transform; + + // Iceberg schema with columns designed to exercise each spec rule + let snapshot_schema = Arc::new( + Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + // Field in Parquet by field ID (normal case) + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + // Rule #1: Identity-partitioned field - should use partition metadata + NestedField::required(2, "dept", Type::Primitive(PrimitiveType::String)).into(), + // Rule #2: Field resolved by name mapping (ArrowReader already applied) + NestedField::required(3, "data", Type::Primitive(PrimitiveType::String)).into(), + // Rule #3: Field with initial_default + NestedField::optional(4, "category", Type::Primitive(PrimitiveType::String)) + .with_initial_default(Literal::string("default_category")) + .into(), + // Rule #4: Field with no default - should be null + NestedField::optional(5, "notes", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(), + ); + + // Partition spec: identity transform on dept + let partition_spec = Arc::new( + crate::spec::PartitionSpec::builder(snapshot_schema.clone()) + .with_spec_id(0) + .add_partition_field("dept", "dept", Transform::Identity) + .unwrap() + .build() + .unwrap(), + ); + + // Partition data: dept="engineering" + let partition_data = Struct::from_iter(vec![Some(Literal::string("engineering"))]); + + // Parquet schema: simulates post-ArrowReader state where name mapping already applied + // Has id (field_id=1) and data (field_id=3, assigned by ArrowReader via name mapping) + // Missing: dept (in partition), category (has default), notes (no default) + let parquet_schema = Arc::new(ArrowSchema::new(vec![ + simple_field("id", DataType::Int32, false, "1"), + simple_field("data", DataType::Utf8, false, "3"), + ])); + + let projected_field_ids = [1, 2, 3, 4, 5]; // id, dept, data, category, notes + + let mut transformer = + RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) + .with_partition(partition_spec, partition_data) + .build(); + + let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ + Arc::new(Int32Array::from(vec![100, 200])), + Arc::new(StringArray::from(vec!["value1", "value2"])), + ]) + .unwrap(); + + let result = transformer.process_record_batch(parquet_batch).unwrap(); + + assert_eq!(result.num_columns(), 5); + assert_eq!(result.num_rows(), 2); + + // Verify each column demonstrates the correct spec rule: + + // Normal case: id from Parquet by 
field ID + let id_column = result + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_column.value(0), 100); + assert_eq!(id_column.value(1), 200); + + // Rule #1: dept from partition metadata (identity transform) + let dept_column = result + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(dept_column.value(0), "engineering"); + assert_eq!(dept_column.value(1), "engineering"); + + // Rule #2: data from Parquet via name mapping + let data_column = result + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(data_column.value(0), "value1"); + assert_eq!(data_column.value(1), "value2"); + + // Rule #3: category from initial_default + let category_column = result + .column(3) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(category_column.value(0), "default_category"); + assert_eq!(category_column.value(1), "default_category"); + + // Rule #4: notes is null (no default, not in Parquet, not in partition) + let notes_column = result + .column(4) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(notes_column.is_null(0)); + assert!(notes_column.is_null(1)); + } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index 3f7c29dbf4..fe3f5c8f7e 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -128,6 +128,13 @@ impl ManifestEntryContext { .map(|x| x.as_ref().snapshot_bound_predicate.clone()), deletes, + + // Include partition data and spec from manifest entry + partition: Some(self.manifest_entry.data_file.partition.clone()), + // TODO: Pass actual PartitionSpec through context chain for native flow + partition_spec: None, + // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" + name_mapping: None, }) } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 6884e00b9b..3e319ca062 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -1777,6 +1777,9 @@ pub mod tests { record_count: Some(100), data_file_format: DataFileFormat::Parquet, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, }; test_fn(task); @@ -1791,6 +1794,9 @@ pub mod tests { record_count: None, data_file_format: DataFileFormat::Avro, deletes: vec![], + partition: None, + partition_spec: None, + name_mapping: None, }; test_fn(task); } diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 32fe3ae309..e1ef241a57 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -15,16 +15,39 @@ // specific language governing permissions and limitations // under the License. +use std::sync::Arc; + use futures::stream::BoxStream; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Serialize, Serializer}; use crate::Result; use crate::expr::BoundPredicate; -use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef}; +use crate::spec::{ + DataContentType, DataFileFormat, ManifestEntryRef, NameMapping, PartitionSpec, Schema, + SchemaRef, Struct, +}; /// A stream of [`FileScanTask`]. pub type FileScanTaskStream = BoxStream<'static, Result>; +/// Serialization helper that always returns NotImplementedError. +/// Used for fields that should not be serialized but we want to be explicit about it. 
+fn serialize_not_implemented(_: &T, _: S) -> std::result::Result +where S: Serializer { + Err(serde::ser::Error::custom( + "Serialization not implemented for this field", + )) +} + +/// Deserialization helper that always returns NotImplementedError. +/// Used for fields that should not be deserialized but we want to be explicit about it. +fn deserialize_not_implemented<'de, D, T>(_: D) -> std::result::Result +where D: serde::Deserializer<'de> { + Err(serde::de::Error::custom( + "Deserialization not implemented for this field", + )) +} + /// A task to scan part of file. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct FileScanTask { @@ -54,6 +77,33 @@ pub struct FileScanTask { /// The list of delete files that may need to be applied to this data file pub deletes: Vec, + + /// Partition data from the manifest entry, used to identify which columns can use + /// constant values from partition metadata vs. reading from the data file. + /// Per the Iceberg spec, only identity-transformed partition fields should use constants. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + pub partition: Option, + + /// The partition spec for this file, used to distinguish identity transforms + /// (which use partition metadata constants) from non-identity transforms like + /// bucket/truncate (which must read source columns from the data file). + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + pub partition_spec: Option>, + + /// Name mapping from table metadata (property: schema.name-mapping.default), + /// used to resolve field IDs from column names when Parquet files lack field IDs + /// or have field ID conflicts. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(serialize_with = "serialize_not_implemented")] + #[serde(deserialize_with = "deserialize_not_implemented")] + pub name_mapping: Option>, } impl FileScanTask {
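A stand-alone sketch of the serde pattern applied to the new `FileScanTask` fields, assuming `serde` (with the `derive` feature) and `serde_json` are available as dependencies; the `Task` struct and its `partition` field are illustrative placeholders, not the real `FileScanTask`. `None` values round-trip because the field is skipped on serialize and defaulted on deserialize, while a populated field fails loudly instead of being silently dropped.

```rust
use serde::{Deserialize, Serialize, Serializer};

fn serialize_not_implemented<S, T>(_: &T, _: S) -> Result<S::Ok, S::Error>
where S: Serializer {
    Err(serde::ser::Error::custom("serialization not implemented for this field"))
}

fn deserialize_not_implemented<'de, D, T>(_: D) -> Result<T, D::Error>
where D: serde::Deserializer<'de> {
    Err(serde::de::Error::custom("deserialization not implemented for this field"))
}

#[derive(Serialize, Deserialize)]
struct Task {
    path: String,
    // Skipped entirely while None; serializing a Some value errors rather
    // than silently losing the partition information.
    #[serde(default)]
    #[serde(skip_serializing_if = "Option::is_none")]
    #[serde(serialize_with = "serialize_not_implemented")]
    #[serde(deserialize_with = "deserialize_not_implemented")]
    partition: Option<Vec<i64>>,
}

fn main() {
    // None round-trips fine because the field is skipped / defaulted.
    let task = Task { path: "data.parquet".to_string(), partition: None };
    let json = serde_json::to_string(&task).unwrap();
    assert_eq!(json, r#"{"path":"data.parquet"}"#);
    let back: Task = serde_json::from_str(&json).unwrap();
    assert!(back.partition.is_none());

    // A populated field refuses to serialize.
    let task = Task { path: "data.parquet".to_string(), partition: Some(vec![1]) };
    assert!(serde_json::to_string(&task).is_err());
}
```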