From 411b88469ef1a64da2a13aefe19c855b5ef742cb Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 21 Oct 2025 20:44:33 -0400 Subject: [PATCH 01/10] Stash reading parquet files without field ids. --- crates/iceberg/src/arrow/reader.rs | 382 +++++++++++++++++++++++------ 1 file changed, 305 insertions(+), 77 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ff4cff0a64..6dd2d20fb8 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -189,8 +189,9 @@ impl ArrowReader { .await?; // Create a projection mask for the batch stream to select which columns in the - // Parquet file that we want in the response - let projection_mask = Self::get_arrow_projection_mask( + // Parquet file that we want in the response. If position-based fallback is used, + // we also get a mapping of Arrow positions to field IDs. + let (projection_mask, arrow_pos_to_field_id) = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, record_batch_stream_builder.parquet_schema(), @@ -316,13 +317,56 @@ impl ArrowReader { record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), + Ok(batch) => { + // If we used position-based fallback, add field ID metadata to the schema + let batch_with_metadata = if let Some(ref mapping) = arrow_pos_to_field_id { + Self::add_field_id_metadata_to_batch(batch, mapping)? + } else { + batch + }; + record_batch_transformer.process_record_batch(batch_with_metadata) + } Err(err) => Err(err.into()), }); Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } + /// Add field ID metadata to a RecordBatch's schema. + /// This is used when position-based fallback is employed for projection, + /// to pass field ID information to RecordBatchTransformer. + fn add_field_id_metadata_to_batch( + batch: RecordBatch, + arrow_pos_to_field_id: &HashMap, + ) -> Result { + use arrow_schema::Field; + + let old_schema = batch.schema(); + let new_fields: Vec<_> = old_schema + .fields() + .iter() + .enumerate() + .map(|(pos, field)| { + let mut metadata = field.metadata().clone(); + if let Some(field_id) = arrow_pos_to_field_id.get(&pos) { + metadata.insert( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + ); + } + Field::new(field.name(), field.data_type().clone(), field.is_nullable()) + .with_metadata(metadata) + }) + .collect(); + + let new_schema = Arc::new(ArrowSchema::new_with_metadata( + new_fields, + old_schema.metadata().clone(), + )); + + Ok(RecordBatch::try_new(new_schema, batch.columns().to_vec())?) + } + pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, @@ -468,7 +512,13 @@ impl ArrowReader { visit(&mut collector, predicate)?; let iceberg_field_ids = collector.field_ids(); - let field_id_map = build_field_id_map(parquet_schema)?; + + // Try to build field ID map from Parquet metadata + let field_id_map = match build_field_id_map(parquet_schema)? 
{ + Some(map) => map, + // If Parquet file doesn't have field IDs, use position-based fallback + None => build_fallback_field_id_map(parquet_schema), + }; Ok((iceberg_field_ids, field_id_map)) } @@ -500,7 +550,7 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, - ) -> Result { + ) -> Result<(ProjectionMask, Option>)> { fn type_promotion_is_valid( file_type: Option<&PrimitiveType>, projected_type: Option<&PrimitiveType>, @@ -534,75 +584,144 @@ impl ArrowReader { } if leaf_field_ids.is_empty() { - Ok(ProjectionMask::all()) + Ok((ProjectionMask::all(), None)) } else { - // Build the map between field id and column index in Parquet schema. - let mut column_map = HashMap::new(); - - let fields = arrow_schema.fields(); - // Pre-project only the fields that have been selected, possibly avoiding converting - // some Arrow types that are not yet supported. - let mut projected_fields: HashMap = HashMap::new(); - let projected_arrow_schema = ArrowSchema::new_with_metadata( - fields.filter_leaves(|_, f| { - f.metadata() - .get(PARQUET_FIELD_ID_META_KEY) - .and_then(|field_id| i32::from_str(field_id).ok()) - .is_some_and(|field_id| { - projected_fields.insert((*f).clone(), field_id); - leaf_field_ids.contains(&field_id) - }) - }), - arrow_schema.metadata().clone(), - ); - let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; - - fields.filter_leaves(|idx, field| { - let Some(field_id) = projected_fields.get(field).cloned() else { - return false; - }; - - let iceberg_field = iceberg_schema_of_task.field_by_id(field_id); - let parquet_iceberg_field = iceberg_schema.field_by_id(field_id); - - if iceberg_field.is_none() || parquet_iceberg_field.is_none() { - return false; - } + // Check if Arrow schema has field ID metadata by sampling the first field + let has_field_ids = arrow_schema.fields().iter().next().is_some_and(|f| { + f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_some() + }); - if !type_promotion_is_valid( - parquet_iceberg_field - .unwrap() - .field_type - .as_primitive_type(), - iceberg_field.unwrap().field_type.as_primitive_type(), - ) { - return false; - } + if has_field_ids { + // Standard path: use field IDs from Arrow schema metadata + // This matches iceberg-java's ReadConf when hasIds() returns true + Self::get_arrow_projection_mask_with_field_ids( + &leaf_field_ids, + iceberg_schema_of_task, + parquet_schema, + arrow_schema, + type_promotion_is_valid, + ).map(|mask| (mask, None)) + } else { + // Fallback path for Parquet files without field IDs (e.g., migrated tables) + // This matches iceberg-java's pruneColumnsFallback() in ParquetSchemaUtil + // Return both the projection mask and the mapping of Arrow positions to field IDs + Self::get_arrow_projection_mask_fallback( + &leaf_field_ids, + parquet_schema, + ) + } + } + } - column_map.insert(field_id, idx); - true - }); + /// Standard projection mask creation using field IDs from Arrow schema metadata. 
+ /// Matches iceberg-java's ParquetSchemaUtil.pruneColumns() + fn get_arrow_projection_mask_with_field_ids( + leaf_field_ids: &[i32], + iceberg_schema_of_task: &Schema, + parquet_schema: &SchemaDescriptor, + arrow_schema: &ArrowSchemaRef, + type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool, + ) -> Result { + let mut column_map = HashMap::new(); + let fields = arrow_schema.fields(); + + // Pre-project only the fields that have been selected, possibly avoiding converting + // some Arrow types that are not yet supported. + let mut projected_fields: HashMap = HashMap::new(); + let projected_arrow_schema = ArrowSchema::new_with_metadata( + fields.filter_leaves(|_, f| { + f.metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|field_id| i32::from_str(field_id).ok()) + .is_some_and(|field_id| { + projected_fields.insert((*f).clone(), field_id); + leaf_field_ids.contains(&field_id) + }) + }), + arrow_schema.metadata().clone(), + ); + let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; - // Only project columns that exist in the Parquet file. - // Missing columns will be added by RecordBatchTransformer with default/NULL values. - // This supports schema evolution where new columns are added to the table schema - // but old Parquet files don't have them yet. - let mut indices = vec![]; - for field_id in leaf_field_ids { - if let Some(col_idx) = column_map.get(&field_id) { - indices.push(*col_idx); - } - // Skip fields that don't exist in the Parquet file - they will be added later + fields.filter_leaves(|idx, field| { + let Some(field_id) = projected_fields.get(field).cloned() else { + return false; + }; + + let iceberg_field = iceberg_schema_of_task.field_by_id(field_id); + let parquet_iceberg_field = iceberg_schema.field_by_id(field_id); + + if iceberg_field.is_none() || parquet_iceberg_field.is_none() { + return false; + } + + if !type_promotion_is_valid( + parquet_iceberg_field + .unwrap() + .field_type + .as_primitive_type(), + iceberg_field.unwrap().field_type.as_primitive_type(), + ) { + return false; } - if indices.is_empty() { - // If no columns from the projection exist in the file, project all columns - // This can happen if all requested columns are new and need to be added by the transformer - Ok(ProjectionMask::all()) + column_map.insert(field_id, idx); + true + }); + + // Only project columns that exist in the Parquet file. + // Missing columns will be added by RecordBatchTransformer with default/NULL values. + // This supports schema evolution where new columns are added to the table schema + // but old Parquet files don't have them yet. + let mut indices = vec![]; + for field_id in leaf_field_ids { + if let Some(col_idx) = column_map.get(field_id) { + indices.push(*col_idx); + } + // Skip fields that don't exist in the Parquet file - they will be added later + } + + if indices.is_empty() { + // If no columns from the projection exist in the file, project all columns + // This can happen if all requested columns are new and need to be added by the transformer + Ok(ProjectionMask::all()) + } else { + Ok(ProjectionMask::leaves(parquet_schema, indices)) + } + } + + /// Fallback projection mask creation for Parquet files without field IDs. + /// Uses position-based matching where field_id N maps to position N-1. 
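+ /// For example, requesting leaf field IDs `[1, 3]` selects Parquet leaf columns `[0, 2]`.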
+ /// Matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() + /// Returns both the ProjectionMask and a mapping of Arrow positions to field IDs + /// so that field ID metadata can be added to the Arrow schema after projection. + fn get_arrow_projection_mask_fallback( + leaf_field_ids: &[i32], + parquet_schema: &SchemaDescriptor, + ) -> Result<(ProjectionMask, Option>)> { + // Position-based matching: field_id N → position N-1 (field IDs are 1-indexed) + // This matches the addFallbackIds() behavior where sequential IDs are assigned + let mut indices = vec![]; + let mut arrow_pos_to_field_id = HashMap::new(); + + for (arrow_pos, field_id) in leaf_field_ids.iter().enumerate() { + // Convert field_id to position (field_id 1 → position 0, field_id 2 → position 1, etc.) + let position = (*field_id - 1) as usize; + if position < parquet_schema.num_columns() { + indices.push(position); + arrow_pos_to_field_id.insert(arrow_pos, *field_id); } else { - Ok(ProjectionMask::leaves(parquet_schema, indices)) + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Field ID {} maps to position {} which exceeds Parquet schema columns ({})", + field_id, + position, + parquet_schema.num_columns() + ), + )); } } + Ok((ProjectionMask::leaves(parquet_schema, indices), Some(arrow_pos_to_field_id))) } fn get_row_filter( @@ -720,22 +839,18 @@ impl ArrowReader { } /// Build the map of parquet field id to Parquet column index in the schema. -fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result> { +/// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables). +fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result>> { let mut column_map = HashMap::new(); + let mut has_field_ids = true; + for (idx, field) in parquet_schema.columns().iter().enumerate() { let field_type = field.self_type(); match field_type { ParquetType::PrimitiveType { basic_info, .. } => { if !basic_info.has_id() { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Leave column idx: {}, name: {}, type {:?} in schema doesn't have field id", - idx, - basic_info.name(), - field_type - ), - )); + has_field_ids = false; + break; } column_map.insert(basic_info.id(), idx); } @@ -751,7 +866,27 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result HashMap { + let mut column_map = HashMap::new(); + + // Assign field IDs starting at 1 based on column position + // This follows iceberg-java's convention where field IDs are 1-indexed + for (idx, _field) in parquet_schema.columns().iter().enumerate() { + let field_id = (idx + 1) as i32; + column_map.insert(field_id, idx); + } + + column_map } /// A visitor to collect field ids from bound predicates. @@ -1595,7 +1730,7 @@ message schema { ); // Finally avoid selecting fields with unsupported data types - let mask = + let (mask, _) = ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema) .expect("Some ProjectionMask"); assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0])); @@ -2041,4 +2176,97 @@ message schema { assert!(col_b.is_null(1)); assert!(col_b.is_null(2)); } + + /// Test reading Parquet files without field ID metadata (e.g., migrated tables). + /// This exercises the position-based fallback path. 
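+ /// Columns are matched purely by position: field_id 1 → `name` (column 0), field_id 2 → `age` (column 1).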
+ #[tokio::test] + async fn test_read_parquet_file_without_field_ids() { + // Create an Iceberg schema with field IDs 1 and 2 + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + // Create an Arrow schema WITHOUT field ID metadata + // This simulates a Parquet file from a migrated table + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), // NO metadata! + Field::new("age", DataType::Int32, false), // NO metadata! + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Create test data + let name_data = vec!["Alice", "Bob", "Charlie"]; + let age_data = vec![30, 25, 35]; + + use arrow_array::Int32Array; + let name_col = Arc::new(StringArray::from(name_data.clone())) as ArrayRef; + let age_col = Arc::new(Int32Array::from(age_data.clone())) as ArrayRef; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap(); + + // Write Parquet file without field ID metadata + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = + ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Read the file using ArrowReader + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], // Request both fields + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Verify the data was read correctly using position-based fallback + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 3); + assert_eq!(batch.num_columns(), 2); + + // Check name column (field_id 1 → position 0) + let name_array = batch.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + assert_eq!(name_array.value(2), "Charlie"); + + // Check age column (field_id 2 → position 1) + let age_array = batch.column(1).as_primitive::(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + assert_eq!(age_array.value(2), 35); + } } From df57d37b87fea1ca760862f86d5a5e5de35a931c Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 21 Oct 2025 20:54:18 -0400 Subject: [PATCH 02/10] Format --- crates/iceberg/src/arrow/reader.rs | 39 +++++++++++++++--------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 6dd2d20fb8..86e41e0786 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -349,10 +349,7 @@ impl ArrowReader { .map(|(pos, field)| { let mut metadata = field.metadata().clone(); if let Some(field_id) = 
arrow_pos_to_field_id.get(&pos) { - metadata.insert( - PARQUET_FIELD_ID_META_KEY.to_string(), - field_id.to_string(), - ); + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); } Field::new(field.name(), field.data_type().clone(), field.is_nullable()) .with_metadata(metadata) @@ -587,9 +584,11 @@ impl ArrowReader { Ok((ProjectionMask::all(), None)) } else { // Check if Arrow schema has field ID metadata by sampling the first field - let has_field_ids = arrow_schema.fields().iter().next().is_some_and(|f| { - f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_some() - }); + let has_field_ids = arrow_schema + .fields() + .iter() + .next() + .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_some()); if has_field_ids { // Standard path: use field IDs from Arrow schema metadata @@ -600,15 +599,13 @@ impl ArrowReader { parquet_schema, arrow_schema, type_promotion_is_valid, - ).map(|mask| (mask, None)) + ) + .map(|mask| (mask, None)) } else { // Fallback path for Parquet files without field IDs (e.g., migrated tables) // This matches iceberg-java's pruneColumnsFallback() in ParquetSchemaUtil // Return both the projection mask and the mapping of Arrow positions to field IDs - Self::get_arrow_projection_mask_fallback( - &leaf_field_ids, - parquet_schema, - ) + Self::get_arrow_projection_mask_fallback(&leaf_field_ids, parquet_schema) } } } @@ -721,7 +718,10 @@ impl ArrowReader { )); } } - Ok((ProjectionMask::leaves(parquet_schema, indices), Some(arrow_pos_to_field_id))) + Ok(( + ProjectionMask::leaves(parquet_schema, indices), + Some(arrow_pos_to_field_id), + )) } fn get_row_filter( @@ -2196,8 +2196,8 @@ message schema { // Create an Arrow schema WITHOUT field ID metadata // This simulates a Parquet file from a migrated table let arrow_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("name", DataType::Utf8, false), // NO metadata! - Field::new("age", DataType::Int32, false), // NO metadata! + Field::new("name", DataType::Utf8, false), // NO metadata! + Field::new("age", DataType::Int32, false), // NO metadata! ])); let tmp_dir = TempDir::new().unwrap(); @@ -2220,8 +2220,7 @@ message schema { .build(); let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); - let mut writer = - ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); @@ -2237,7 +2236,7 @@ message schema { data_file_path: format!("{}/1.parquet", table_location), data_file_format: DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![1, 2], // Request both fields + project_field_ids: vec![1, 2], // Request both fields predicate: None, deletes: vec![], })] @@ -2264,7 +2263,9 @@ message schema { assert_eq!(name_array.value(2), "Charlie"); // Check age column (field_id 2 → position 1) - let age_array = batch.column(1).as_primitive::(); + let age_array = batch + .column(1) + .as_primitive::(); assert_eq!(age_array.value(0), 30); assert_eq!(age_array.value(1), 25); assert_eq!(age_array.value(2), 35); From 22070451dec334378ce34fdc9de598bc81793ef8 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 21 Oct 2025 21:12:58 -0400 Subject: [PATCH 03/10] more tests. 
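Adds coverage for partial projection, schema evolution with a missing column, and files with multiple row groups. The fallback path now skips requested field IDs that are not present in the file (they are filled in later by RecordBatchTransformer) instead of returning an error.

A minimal sketch of the position-based mapping, assuming valid 1-indexed field IDs; `fallback_positions` and `num_file_columns` are illustrative names, not part of the diff:

    // Map 1-indexed fallback field IDs to 0-indexed Parquet column positions,
    // skipping IDs that point past the end of the file's schema.
    fn fallback_positions(field_ids: &[i32], num_file_columns: usize) -> Vec<usize> {
        field_ids
            .iter()
            .map(|id| (*id - 1) as usize)           // field_id N → column N-1
            .filter(|pos| *pos < num_file_columns)  // missing columns are added later as NULLs
            .collect()
    }

    // e.g. fallback_positions(&[1, 3], 4) == vec![0, 2]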
--- crates/iceberg/src/arrow/reader.rs | 357 +++++++++++++++++++++++++++-- 1 file changed, 334 insertions(+), 23 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 86e41e0786..078d923828 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -687,7 +687,7 @@ impl ArrowReader { } /// Fallback projection mask creation for Parquet files without field IDs. - /// Uses position-based matching where field_id N maps to position N-1. + /// Uses position-based matching where the i-th requested leaf field maps to the i-th column. /// Matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() /// Returns both the ProjectionMask and a mapping of Arrow positions to field IDs /// so that field ID metadata can be added to the Arrow schema after projection. @@ -695,33 +695,37 @@ impl ArrowReader { leaf_field_ids: &[i32], parquet_schema: &SchemaDescriptor, ) -> Result<(ProjectionMask, Option>)> { - // Position-based matching: field_id N → position N-1 (field IDs are 1-indexed) - // This matches the addFallbackIds() behavior where sequential IDs are assigned + // Position-based matching for files without field IDs (e.g., migrated tables). + // Field ID N maps to column position N-1 (field IDs are 1-indexed, positions are 0-indexed). + // This matches iceberg-java's addFallbackIds() which assigns sequential IDs 1, 2, 3, ... + // + // For schema evolution, we only project columns that exist in the file. + // Missing columns will be added later by RecordBatchTransformer with NULL values. let mut indices = vec![]; let mut arrow_pos_to_field_id = HashMap::new(); - for (arrow_pos, field_id) in leaf_field_ids.iter().enumerate() { - // Convert field_id to position (field_id 1 → position 0, field_id 2 → position 1, etc.) - let position = (*field_id - 1) as usize; - if position < parquet_schema.num_columns() { - indices.push(position); - arrow_pos_to_field_id.insert(arrow_pos, *field_id); - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Field ID {} maps to position {} which exceeds Parquet schema columns ({})", - field_id, - position, - parquet_schema.num_columns() - ), - )); + for (result_pos, field_id) in leaf_field_ids.iter().enumerate() { + // Map field_id to Parquet column position (field_ids are 1-indexed) + let parquet_pos = (*field_id - 1) as usize; + + if parquet_pos < parquet_schema.num_columns() { + indices.push(parquet_pos); + // result_pos is the position in the final Arrow schema after projection + arrow_pos_to_field_id.insert(result_pos, *field_id); } + // Skip fields that don't exist in the Parquet file - they will be added later by RecordBatchTransformer + } + + if indices.is_empty() { + // If no columns from the projection exist in the file, project all columns + // This can happen if all requested columns are new and need to be added by the transformer + Ok((ProjectionMask::all(), None)) + } else { + Ok(( + ProjectionMask::leaves(parquet_schema, indices), + Some(arrow_pos_to_field_id), + )) } - Ok(( - ProjectionMask::leaves(parquet_schema, indices), - Some(arrow_pos_to_field_id), - )) } fn get_row_filter( @@ -2179,6 +2183,9 @@ message schema { /// Test reading Parquet files without field ID metadata (e.g., migrated tables). /// This exercises the position-based fallback path. 
+ /// + /// Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + pruneColumnsFallback() + /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java #[tokio::test] async fn test_read_parquet_file_without_field_ids() { // Create an Iceberg schema with field IDs 1 and 2 @@ -2270,4 +2277,308 @@ message schema { assert_eq!(age_array.value(1), 25); assert_eq!(age_array.value(2), 35); } + + /// Test reading Parquet files without field IDs with partial projection. + /// Only a subset of columns are requested, verifying position-based fallback + /// handles column selection correctly. + #[tokio::test] + async fn test_read_parquet_without_field_ids_partial_projection() { + use arrow_array::Int32Array; + + // Create an Iceberg schema with 4 fields + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "col1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "col2", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(3, "col3", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "col4", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + // Create an Arrow schema WITHOUT field ID metadata + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("col1", DataType::Utf8, false), + Field::new("col2", DataType::Int32, false), + Field::new("col3", DataType::Utf8, false), + Field::new("col4", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Create test data for all 4 columns + let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef; + let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; + let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef; + let col4_data = Arc::new(Int32Array::from(vec![30, 40])) as ArrayRef; + + let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![ + col1_data, col2_data, col3_data, col4_data, + ]) + .unwrap(); + + // Write Parquet file without field ID metadata + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Read the file using ArrowReader, but only request fields 1 and 3 + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 3], // Only request col1 and col3 + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Verify only the requested columns were returned + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); // Only 2 columns (col1 and col3) + + // Check col1 (field_id 1 → position 0) + let col1_array = batch.column(0).as_string::(); + assert_eq!(col1_array.value(0), "a"); + 
assert_eq!(col1_array.value(1), "b"); + + // Check col3 (field_id 3 → position 2 in file, but position 1 in result) + let col3_array = batch.column(1).as_string::(); + assert_eq!(col3_array.value(0), "c"); + assert_eq!(col3_array.value(1), "d"); + } + + /// Test reading Parquet files without field IDs with schema evolution. + /// The Iceberg schema has more fields than the Parquet file, testing that + /// missing columns are filled with NULLs. + #[tokio::test] + async fn test_read_parquet_without_field_ids_schema_evolution() { + use arrow_array::{Array, Int32Array}; + + // Create an Iceberg schema with 3 fields (but file will only have 2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(), // New field not in file + ]) + .build() + .unwrap(), + ); + + // Create an Arrow schema WITHOUT field ID metadata, with only 2 columns + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + // No "city" column in the file + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Create test data with only 2 columns + let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; + let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap(); + + // Write Parquet file without field ID metadata + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Read the file using ArrowReader, requesting all 3 fields including the missing one + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], // Request all 3 fields + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Verify the data was read correctly with the missing column as NULL + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); // All 3 columns including the missing one + + // Check name column (field_id 1 → position 0) + let name_array = batch.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + + // Check age column (field_id 2 → position 1) + let age_array = batch + .column(1) + .as_primitive::(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + + // Check city column (field_id 3 → missing, should be NULL) + let city_array = batch.column(2).as_string::(); + 
assert_eq!(city_array.null_count(), 2); + assert!(city_array.is_null(0)); + assert!(city_array.is_null(1)); + } + + /// Test reading Parquet files without field IDs that have multiple row groups. + /// This ensures the position-based fallback works correctly across row group boundaries. + #[tokio::test] + async fn test_read_parquet_without_field_ids_multiple_row_groups() { + use arrow_array::Int32Array; + + // Create an Iceberg schema + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "value", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + // Create an Arrow schema WITHOUT field ID metadata + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Int32, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Write Parquet file with multiple row groups (small row group size) + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .set_write_batch_size(2) // Force multiple row groups + .set_max_row_group_size(2) // 2 rows per row group + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap(); + + // Write 6 rows in 3 batches (will create 3 row groups) + for batch_num in 0..3 { + let name_data = Arc::new(StringArray::from(vec![ + format!("name_{}", batch_num * 2), + format!("name_{}", batch_num * 2 + 1), + ])) as ArrayRef; + let value_data = + Arc::new(Int32Array::from(vec![batch_num * 2, batch_num * 2 + 1])) as ArrayRef; + + let batch = + RecordBatch::try_new(arrow_schema.clone(), vec![name_data, value_data]).unwrap(); + writer.write(&batch).expect("Writing batch"); + } + writer.close().unwrap(); + + // Read the file using ArrowReader + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], // Request both fields + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + // Verify all data from all row groups was read correctly + assert!(!result.is_empty()); + + // Collect all rows from all batches + let mut all_names = Vec::new(); + let mut all_values = Vec::new(); + + for batch in &result { + let name_array = batch.column(0).as_string::(); + let value_array = batch + .column(1) + .as_primitive::(); + + for i in 0..batch.num_rows() { + all_names.push(name_array.value(i).to_string()); + all_values.push(value_array.value(i)); + } + } + + // Verify we got all 6 rows across all row groups + assert_eq!(all_names.len(), 6); + assert_eq!(all_values.len(), 6); + + // Check the data is correct + for i in 0..6 { + assert_eq!(all_names[i], format!("name_{}", i)); + assert_eq!(all_values[i], i as i32); + } + } } From b29a055bc9b41e11a2bfff55686bb753b092cdaa Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 21 Oct 2025 21:31:00 -0400 Subject: 
[PATCH 04/10] Update comments. --- crates/iceberg/src/arrow/reader.rs | 62 ++++++++---------------------- 1 file changed, 16 insertions(+), 46 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 078d923828..a6bb7e5e2d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -2188,7 +2188,6 @@ message schema { /// in /parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java #[tokio::test] async fn test_read_parquet_file_without_field_ids() { - // Create an Iceberg schema with field IDs 1 and 2 let schema = Arc::new( Schema::builder() .with_schema_id(1) @@ -2200,18 +2199,16 @@ message schema { .unwrap(), ); - // Create an Arrow schema WITHOUT field ID metadata - // This simulates a Parquet file from a migrated table + // Parquet file from a migrated table - no field ID metadata let arrow_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("name", DataType::Utf8, false), // NO metadata! - Field::new("age", DataType::Int32, false), // NO metadata! + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), ])); let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().to_str().unwrap().to_string(); let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); - // Create test data let name_data = vec!["Alice", "Bob", "Charlie"]; let age_data = vec![30, 25, 35]; @@ -2221,7 +2218,6 @@ message schema { let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_col, age_col]).unwrap(); - // Write Parquet file without field ID metadata let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .build(); @@ -2232,7 +2228,6 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - // Read the file using ArrowReader let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( @@ -2243,7 +2238,7 @@ message schema { data_file_path: format!("{}/1.parquet", table_location), data_file_format: DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![1, 2], // Request both fields + project_field_ids: vec![1, 2], predicate: None, deletes: vec![], })] @@ -2257,19 +2252,17 @@ message schema { .await .unwrap(); - // Verify the data was read correctly using position-based fallback assert_eq!(result.len(), 1); let batch = &result[0]; assert_eq!(batch.num_rows(), 3); assert_eq!(batch.num_columns(), 2); - // Check name column (field_id 1 → position 0) + // Verify position-based mapping: field_id 1 → position 0, field_id 2 → position 1 let name_array = batch.column(0).as_string::(); assert_eq!(name_array.value(0), "Alice"); assert_eq!(name_array.value(1), "Bob"); assert_eq!(name_array.value(2), "Charlie"); - // Check age column (field_id 2 → position 1) let age_array = batch .column(1) .as_primitive::(); @@ -2285,7 +2278,6 @@ message schema { async fn test_read_parquet_without_field_ids_partial_projection() { use arrow_array::Int32Array; - // Create an Iceberg schema with 4 fields let schema = Arc::new( Schema::builder() .with_schema_id(1) @@ -2299,7 +2291,6 @@ message schema { .unwrap(), ); - // Create an Arrow schema WITHOUT field ID metadata let arrow_schema = Arc::new(ArrowSchema::new(vec![ Field::new("col1", DataType::Utf8, false), Field::new("col2", DataType::Int32, false), @@ -2311,7 +2302,6 @@ message schema { let table_location = tmp_dir.path().to_str().unwrap().to_string(); let file_io = 
FileIO::from_path(&table_location).unwrap().build().unwrap(); - // Create test data for all 4 columns let col1_data = Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef; let col2_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; let col3_data = Arc::new(StringArray::from(vec!["c", "d"])) as ArrayRef; @@ -2322,7 +2312,6 @@ message schema { ]) .unwrap(); - // Write Parquet file without field ID metadata let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .build(); @@ -2333,7 +2322,6 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - // Read the file using ArrowReader, but only request fields 1 and 3 let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( @@ -2344,7 +2332,7 @@ message schema { data_file_path: format!("{}/1.parquet", table_location), data_file_format: DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![1, 3], // Only request col1 and col3 + project_field_ids: vec![1, 3], predicate: None, deletes: vec![], })] @@ -2358,18 +2346,15 @@ message schema { .await .unwrap(); - // Verify only the requested columns were returned assert_eq!(result.len(), 1); let batch = &result[0]; assert_eq!(batch.num_rows(), 2); - assert_eq!(batch.num_columns(), 2); // Only 2 columns (col1 and col3) + assert_eq!(batch.num_columns(), 2); - // Check col1 (field_id 1 → position 0) let col1_array = batch.column(0).as_string::(); assert_eq!(col1_array.value(0), "a"); assert_eq!(col1_array.value(1), "b"); - // Check col3 (field_id 3 → position 2 in file, but position 1 in result) let col3_array = batch.column(1).as_string::(); assert_eq!(col3_array.value(0), "c"); assert_eq!(col3_array.value(1), "d"); @@ -2382,38 +2367,34 @@ message schema { async fn test_read_parquet_without_field_ids_schema_evolution() { use arrow_array::{Array, Int32Array}; - // Create an Iceberg schema with 3 fields (but file will only have 2) + // Schema with field 3 added after the file was written let schema = Arc::new( Schema::builder() .with_schema_id(1) .with_fields(vec![ NestedField::required(1, "name", Type::Primitive(PrimitiveType::String)).into(), NestedField::required(2, "age", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(), // New field not in file + NestedField::optional(3, "city", Type::Primitive(PrimitiveType::String)).into(), ]) .build() .unwrap(), ); - // Create an Arrow schema WITHOUT field ID metadata, with only 2 columns let arrow_schema = Arc::new(ArrowSchema::new(vec![ Field::new("name", DataType::Utf8, false), Field::new("age", DataType::Int32, false), - // No "city" column in the file ])); let tmp_dir = TempDir::new().unwrap(); let table_location = tmp_dir.path().to_str().unwrap().to_string(); let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); - // Create test data with only 2 columns let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; let to_write = RecordBatch::try_new(arrow_schema.clone(), vec![name_data, age_data]).unwrap(); - // Write Parquet file without field ID metadata let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) .build(); @@ -2424,7 +2405,6 @@ message schema { writer.write(&to_write).expect("Writing batch"); writer.close().unwrap(); - // Read the file using ArrowReader, requesting all 3 fields including the missing one let 
reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( @@ -2435,7 +2415,7 @@ message schema { data_file_path: format!("{}/1.parquet", table_location), data_file_format: DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![1, 2, 3], // Request all 3 fields + project_field_ids: vec![1, 2, 3], predicate: None, deletes: vec![], })] @@ -2449,25 +2429,22 @@ message schema { .await .unwrap(); - // Verify the data was read correctly with the missing column as NULL assert_eq!(result.len(), 1); let batch = &result[0]; assert_eq!(batch.num_rows(), 2); - assert_eq!(batch.num_columns(), 3); // All 3 columns including the missing one + assert_eq!(batch.num_columns(), 3); - // Check name column (field_id 1 → position 0) let name_array = batch.column(0).as_string::(); assert_eq!(name_array.value(0), "Alice"); assert_eq!(name_array.value(1), "Bob"); - // Check age column (field_id 2 → position 1) let age_array = batch .column(1) .as_primitive::(); assert_eq!(age_array.value(0), 30); assert_eq!(age_array.value(1), 25); - // Check city column (field_id 3 → missing, should be NULL) + // Verify missing column filled with NULLs let city_array = batch.column(2).as_string::(); assert_eq!(city_array.null_count(), 2); assert!(city_array.is_null(0)); @@ -2480,7 +2457,6 @@ message schema { async fn test_read_parquet_without_field_ids_multiple_row_groups() { use arrow_array::Int32Array; - // Create an Iceberg schema let schema = Arc::new( Schema::builder() .with_schema_id(1) @@ -2492,7 +2468,6 @@ message schema { .unwrap(), ); - // Create an Arrow schema WITHOUT field ID metadata let arrow_schema = Arc::new(ArrowSchema::new(vec![ Field::new("name", DataType::Utf8, false), Field::new("value", DataType::Int32, false), @@ -2502,11 +2477,11 @@ message schema { let table_location = tmp_dir.path().to_str().unwrap().to_string(); let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); - // Write Parquet file with multiple row groups (small row group size) + // Small row group size to create multiple row groups let props = WriterProperties::builder() .set_compression(Compression::SNAPPY) - .set_write_batch_size(2) // Force multiple row groups - .set_max_row_group_size(2) // 2 rows per row group + .set_write_batch_size(2) + .set_max_row_group_size(2) .build(); let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); @@ -2527,7 +2502,6 @@ message schema { } writer.close().unwrap(); - // Read the file using ArrowReader let reader = ArrowReaderBuilder::new(file_io).build(); let tasks = Box::pin(futures::stream::iter( @@ -2538,7 +2512,7 @@ message schema { data_file_path: format!("{}/1.parquet", table_location), data_file_format: DataFileFormat::Parquet, schema: schema.clone(), - project_field_ids: vec![1, 2], // Request both fields + project_field_ids: vec![1, 2], predicate: None, deletes: vec![], })] @@ -2552,10 +2526,8 @@ message schema { .await .unwrap(); - // Verify all data from all row groups was read correctly assert!(!result.is_empty()); - // Collect all rows from all batches let mut all_names = Vec::new(); let mut all_values = Vec::new(); @@ -2571,11 +2543,9 @@ message schema { } } - // Verify we got all 6 rows across all row groups assert_eq!(all_names.len(), 6); assert_eq!(all_values.len(), 6); - // Check the data is correct for i in 0..6 { assert_eq!(all_names[i], format!("name_{}", i)); assert_eq!(all_values[i], i as i32); From ecc805d6fa65ea807f57e6d648e46786ecb951c9 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: 
Tue, 21 Oct 2025 21:49:43 -0400 Subject: [PATCH 05/10] Fix struct logic. --- crates/iceberg/src/arrow/reader.rs | 223 +++++++++++++++++++++++------ 1 file changed, 181 insertions(+), 42 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index a6bb7e5e2d..3d0eda6c98 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -572,41 +572,46 @@ impl ArrowReader { } } - let mut leaf_field_ids = vec![]; - for field_id in field_ids { - let field = iceberg_schema_of_task.field_by_id(*field_id); - if let Some(field) = field { - Self::include_leaf_field_id(field, &mut leaf_field_ids); - } + if field_ids.is_empty() { + return Ok((ProjectionMask::all(), None)); } - if leaf_field_ids.is_empty() { - Ok((ProjectionMask::all(), None)) - } else { - // Check if Arrow schema has field ID metadata by sampling the first field - let has_field_ids = arrow_schema - .fields() - .iter() - .next() - .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_some()); - - if has_field_ids { - // Standard path: use field IDs from Arrow schema metadata - // This matches iceberg-java's ReadConf when hasIds() returns true - Self::get_arrow_projection_mask_with_field_ids( - &leaf_field_ids, - iceberg_schema_of_task, - parquet_schema, - arrow_schema, - type_promotion_is_valid, - ) - .map(|mask| (mask, None)) - } else { - // Fallback path for Parquet files without field IDs (e.g., migrated tables) - // This matches iceberg-java's pruneColumnsFallback() in ParquetSchemaUtil - // Return both the projection mask and the mapping of Arrow positions to field IDs - Self::get_arrow_projection_mask_fallback(&leaf_field_ids, parquet_schema) + // Check if Arrow schema has field ID metadata by sampling the first field + let has_field_ids = arrow_schema + .fields() + .iter() + .next() + .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_some()); + + if has_field_ids { + // Standard path: use field IDs from Arrow schema metadata + // This matches iceberg-java's ReadConf when hasIds() returns true + + // Expand top-level field IDs to leaf field IDs for matching against Parquet schema + let mut leaf_field_ids = vec![]; + for field_id in field_ids { + let field = iceberg_schema_of_task.field_by_id(*field_id); + if let Some(field) = field { + Self::include_leaf_field_id(field, &mut leaf_field_ids); + } } + + Self::get_arrow_projection_mask_with_field_ids( + &leaf_field_ids, + iceberg_schema_of_task, + parquet_schema, + arrow_schema, + type_promotion_is_valid, + ) + .map(|mask| (mask, None)) + } else { + // Fallback path for Parquet files without field IDs (e.g., migrated tables) + // This matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() + + // Java uses expectedSchema.columns() which returns only top-level fields, + // then maps them by position to top-level Parquet columns. + // Java projects the ENTIRE top-level column including any nested content (structs, lists, maps). + Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema) } } @@ -687,42 +692,51 @@ impl ArrowReader { } /// Fallback projection mask creation for Parquet files without field IDs. - /// Uses position-based matching where the i-th requested leaf field maps to the i-th column. + /// Uses position-based matching where top-level field ID N maps to top-level column N-1. 
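+ /// For example, requesting top-level field IDs `[1, 3]` projects top-level Parquet columns `[0, 2]`, and each projected column carries all of its nested children.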
/// Matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() /// Returns both the ProjectionMask and a mapping of Arrow positions to field IDs /// so that field ID metadata can be added to the Arrow schema after projection. fn get_arrow_projection_mask_fallback( - leaf_field_ids: &[i32], + field_ids: &[i32], parquet_schema: &SchemaDescriptor, ) -> Result<(ProjectionMask, Option>)> { // Position-based matching for files without field IDs (e.g., migrated tables). - // Field ID N maps to column position N-1 (field IDs are 1-indexed, positions are 0-indexed). + // Top-level field ID N maps to top-level column position N-1 (field IDs are 1-indexed, positions are 0-indexed). // This matches iceberg-java's addFallbackIds() which assigns sequential IDs 1, 2, 3, ... + // to top-level Parquet columns only. + // + // Unlike the standard path which matches leaf field IDs, the fallback path matches + // TOP-LEVEL field IDs to TOP-LEVEL Parquet columns. When projecting a struct/list/map, + // we project the ENTIRE top-level column including all nested content, matching Java's behavior. // // For schema evolution, we only project columns that exist in the file. // Missing columns will be added later by RecordBatchTransformer with NULL values. - let mut indices = vec![]; + + // Parquet schema has top-level fields; we need to find which root columns to project + let parquet_root_fields = parquet_schema.root_schema().get_fields(); + + let mut root_indices = vec![]; let mut arrow_pos_to_field_id = HashMap::new(); - for (result_pos, field_id) in leaf_field_ids.iter().enumerate() { - // Map field_id to Parquet column position (field_ids are 1-indexed) + for (result_pos, field_id) in field_ids.iter().enumerate() { + // Map top-level field_id to top-level Parquet column position (field_ids are 1-indexed) let parquet_pos = (*field_id - 1) as usize; - if parquet_pos < parquet_schema.num_columns() { - indices.push(parquet_pos); + if parquet_pos < parquet_root_fields.len() { + root_indices.push(parquet_pos); // result_pos is the position in the final Arrow schema after projection arrow_pos_to_field_id.insert(result_pos, *field_id); } // Skip fields that don't exist in the Parquet file - they will be added later by RecordBatchTransformer } - if indices.is_empty() { + if root_indices.is_empty() { // If no columns from the projection exist in the file, project all columns // This can happen if all requested columns are new and need to be added by the transformer Ok((ProjectionMask::all(), None)) } else { Ok(( - ProjectionMask::leaves(parquet_schema, indices), + ProjectionMask::roots(parquet_schema, root_indices), Some(arrow_pos_to_field_id), )) } @@ -2551,4 +2565,129 @@ message schema { assert_eq!(all_values[i], i as i32); } } + + /// Test reading Parquet files without field IDs with nested types (struct). + /// Java's pruneColumnsFallback() projects entire top-level columns including nested content. + /// This test verifies that a top-level struct field is projected correctly with all its nested fields. 
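+ /// Here field_id 2 selects the entire `person` struct column by position, including its nested `name` and `age` fields.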
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_with_struct() { + use arrow_array::{Int32Array, StructArray}; + use arrow_schema::Fields; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required( + 2, + "person", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required( + 3, + "name", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required(4, "age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new( + "person", + DataType::Struct(Fields::from(vec![ + Field::new("name", DataType::Utf8, false), + Field::new("age", DataType::Int32, false), + ])), + false, + ), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let id_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["Alice", "Bob"])) as ArrayRef; + let age_data = Arc::new(Int32Array::from(vec![30, 25])) as ArrayRef; + let person_data = Arc::new(StructArray::from(vec![ + ( + Arc::new(Field::new("name", DataType::Utf8, false)), + name_data, + ), + ( + Arc::new(Field::new("age", DataType::Int32, false)), + age_data, + ), + ])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, person_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 2); + + let id_array = batch + .column(0) + .as_primitive::(); + assert_eq!(id_array.value(0), 1); + assert_eq!(id_array.value(1), 2); + + let person_array = batch.column(1).as_struct(); + assert_eq!(person_array.num_columns(), 2); + + let name_array = person_array.column(0).as_string::(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + + let age_array = person_array + .column(1) + .as_primitive::(); + assert_eq!(age_array.value(0), 30); + assert_eq!(age_array.value(1), 25); + } } From 71fd8761aad4cd3baa14f982e3063a83f574ce07 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Oct 2025 07:08:03 -0400 Subject: [PATCH 06/10] Fix error when filtering out all row groups. 
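When row selection is enabled and the row-group filter prunes every row group, return an empty RowSelection (select no rows) instead of panicking.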
--- crates/iceberg/src/arrow/reader.rs | 92 ++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 3d0eda6c98..3f7ed2c39c 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -815,6 +815,13 @@ impl ArrowReader { )); }; + // If all row groups were filtered out, return an empty RowSelection (select no rows) + if let Some(selected_row_groups) = selected_row_groups { + if selected_row_groups.is_empty() { + return Ok(RowSelection::from(Vec::new())); + } + } + let mut selected_row_groups_idx = 0; let page_index = column_index @@ -2690,4 +2697,89 @@ message schema { assert_eq!(age_array.value(0), 30); assert_eq!(age_array.value(1), 25); } + + /// Test reading Parquet files without field IDs with a filter that eliminates all row groups. + /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and + /// all row groups are filtered out. + #[tokio::test] + async fn test_read_parquet_without_field_ids_filter_eliminates_all_rows() { + use arrow_array::{Float64Array, Int32Array}; + + // Schema with fields that will use fallback IDs 1, 2, 3 + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "value", Type::Primitive(PrimitiveType::Double)) + .into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + // Write data where all ids are >= 10 + let id_data = Arc::new(Int32Array::from(vec![10, 11, 12])) as ArrayRef; + let name_data = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef; + let value_data = Arc::new(Float64Array::from(vec![100.0, 200.0, 300.0])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema.clone(), vec![id_data, name_data, value_data]) + .unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + // Filter that eliminates all row groups: id < 5 + let predicate = Reference::new("id").less_than(Datum::int(5)); + + // Enable both row_group_filtering and row_selection - triggered the panic + let reader = ArrowReaderBuilder::new(file_io) + .with_row_group_filtering_enabled(true) + .with_row_selection_enabled(true) + .build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 2, 3], + predicate: Some(predicate.bind(schema, true).unwrap()), + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + // Should no longer panic + let result = reader + .read(tasks) + .unwrap() + 
.try_collect::>() + .await + .unwrap(); + + // Should return empty results + assert!(result.is_empty() || result.iter().all(|batch| batch.num_rows() == 0)); + } } From 17a0ab185b0c0402229c8ed3b094fc2abaad4f0c Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Oct 2025 09:35:21 -0400 Subject: [PATCH 07/10] Schema evolution with column in the middle for migrated table. --- crates/iceberg/src/arrow/reader.rs | 101 ++++++++++++++++++++++++++++- 1 file changed, 98 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 3f7ed2c39c..e213c42d9e 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -717,15 +717,16 @@ impl ArrowReader { let mut root_indices = vec![]; let mut arrow_pos_to_field_id = HashMap::new(); + let mut arrow_output_pos = 0; - for (result_pos, field_id) in field_ids.iter().enumerate() { + for field_id in field_ids.iter() { // Map top-level field_id to top-level Parquet column position (field_ids are 1-indexed) let parquet_pos = (*field_id - 1) as usize; if parquet_pos < parquet_root_fields.len() { root_indices.push(parquet_pos); - // result_pos is the position in the final Arrow schema after projection - arrow_pos_to_field_id.insert(result_pos, *field_id); + arrow_pos_to_field_id.insert(arrow_output_pos, *field_id); + arrow_output_pos += 1; } // Skip fields that don't exist in the Parquet file - they will be added later by RecordBatchTransformer } @@ -2698,6 +2699,100 @@ message schema { assert_eq!(age_array.value(1), 25); } + /// Test reading Parquet files without field IDs with schema evolution - column added in the middle. + /// When a new column is inserted between existing columns in the schema order, + /// the fallback projection must correctly map field IDs to output positions. 
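+    ///
+    /// Concretely, for the data written below: the old file has columns
+    /// [col0, col1], which receive position-based fallback IDs [1, 2], while the
+    /// projected field IDs are [1, 5, 2]. Field ID 5 (newCol) maps to no column
+    /// in the file, so the projection keeps only [col0, col1] and the
+    /// RecordBatchTransformer inserts newCol as NULLs, yielding output columns
+    /// [col0, newCol, col1].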
+ #[tokio::test] + async fn test_read_parquet_without_field_ids_schema_evolution_add_column_in_middle() { + use arrow_array::{Array, Int32Array}; + + let arrow_schema_old = Arc::new(ArrowSchema::new(vec![ + Field::new("col0", DataType::Int32, true), + Field::new("col1", DataType::Int32, true), + ])); + + // New column added between existing columns: col0 (id=1), newCol (id=5), col1 (id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::optional(1, "col0", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(5, "newCol", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "col1", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::from_path(&table_location).unwrap().build().unwrap(); + + let col0_data = Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef; + let col1_data = Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef; + + let to_write = + RecordBatch::try_new(arrow_schema_old.clone(), vec![col0_data, col1_data]).unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .build(); + + let file = File::create(format!("{}/1.parquet", &table_location)).unwrap(); + let mut writer = ArrowWriter::try_new(file, to_write.schema(), Some(props)).unwrap(); + writer.write(&to_write).expect("Writing batch"); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::new(file_io).build(); + + let tasks = Box::pin(futures::stream::iter( + vec![Ok(FileScanTask { + start: 0, + length: 0, + record_count: None, + data_file_path: format!("{}/1.parquet", table_location), + data_file_format: DataFileFormat::Parquet, + schema: schema.clone(), + project_field_ids: vec![1, 5, 2], + predicate: None, + deletes: vec![], + })] + .into_iter(), + )) as FileScanTaskStream; + + let result = reader + .read(tasks) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + assert_eq!(result.len(), 1); + let batch = &result[0]; + assert_eq!(batch.num_rows(), 2); + assert_eq!(batch.num_columns(), 3); + + let result_col0 = batch + .column(0) + .as_primitive::(); + assert_eq!(result_col0.value(0), 1); + assert_eq!(result_col0.value(1), 2); + + // New column should be NULL (doesn't exist in old file) + let result_newcol = batch + .column(1) + .as_primitive::(); + assert_eq!(result_newcol.null_count(), 2); + assert!(result_newcol.is_null(0)); + assert!(result_newcol.is_null(1)); + + let result_col1 = batch + .column(2) + .as_primitive::(); + assert_eq!(result_col1.value(0), 10); + assert_eq!(result_col1.value(1), 20); + } + /// Test reading Parquet files without field IDs with a filter that eliminates all row groups. /// During development of field ID mapping, we saw a panic when row_selection_enabled=true and /// all row groups are filtered out. From 9a2bcf6685960f0b5c0440d08893226ad027e983 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Wed, 22 Oct 2025 14:09:32 -0400 Subject: [PATCH 08/10] Refactor build_field_id_map. 
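
Replace the has_field_ids flag and the trailing if/else with early returns.
Behavior is unchanged: the helper still reports None as soon as it sees a leaf
column without an embedded field ID. Roughly, it now reads as the sketch below
(signature and names as in the hunk; the handling of non-primitive leaves is
elided here):

    // Map Parquet field ID -> leaf column index, or None if the file was
    // written without field IDs (e.g. a migrated table).
    fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result<Option<HashMap<i32, usize>>> {
        let mut column_map = HashMap::new();
        for (idx, field) in parquet_schema.columns().iter().enumerate() {
            if let ParquetType::PrimitiveType { basic_info, .. } = field.self_type() {
                if !basic_info.has_id() {
                    return Ok(None);
                }
                column_map.insert(basic_info.id(), idx);
            }
            // Non-primitive leaf types are rejected in the real function;
            // that error path is omitted from this sketch.
        }
        Ok(Some(column_map))
    }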
--- crates/iceberg/src/arrow/reader.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index e213c42d9e..4e457f54da 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -868,15 +868,13 @@ impl ArrowReader { /// Returns None if the Parquet file doesn't have field IDs embedded (e.g., migrated tables). fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result>> { let mut column_map = HashMap::new(); - let mut has_field_ids = true; for (idx, field) in parquet_schema.columns().iter().enumerate() { let field_type = field.self_type(); match field_type { ParquetType::PrimitiveType { basic_info, .. } => { if !basic_info.has_id() { - has_field_ids = false; - break; + return Ok(None); } column_map.insert(basic_info.id(), idx); } @@ -892,11 +890,7 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result Date: Tue, 28 Oct 2025 13:08:52 -0400 Subject: [PATCH 09/10] Address PR feedback. --- .../iceberg/src/arrow/delete_file_loader.rs | 1 + crates/iceberg/src/arrow/reader.rs | 215 ++++++++++-------- 2 files changed, 120 insertions(+), 96 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 592ef2eb4a..f6faf144f6 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -63,6 +63,7 @@ impl BasicDeleteFileLoader { data_file_path, self.file_io.clone(), false, + None, ) .await? .build()? diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 25d836ceda..6b08965a2d 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -181,23 +181,58 @@ impl ArrowReader { let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone()); - let mut record_batch_stream_builder = Self::create_parquet_record_batch_stream_builder( + // Create initial stream builder to check if the schema needs any modifications + let initial_stream_builder = Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), should_load_page_index, + None, ) .await?; + // Check if Arrow schema is missing field IDs (for migrated Parquet files) + let missing_field_ids = initial_stream_builder + .schema() + .fields() + .iter() + .next() + .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); + + // If the schema needs modifications, recreate the stream builder with the modified schema. + // Currently we only modify schemas that are missing field IDs, but other transformations + // may be added in the future. + let mut record_batch_stream_builder = if missing_field_ids { + // Build modified schema with necessary transformations + let arrow_schema = + add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema()); + let options = ArrowReaderOptions::new().with_schema(arrow_schema); + + // Recreate the builder with the modified schema + Self::create_parquet_record_batch_stream_builder( + &task.data_file_path, + file_io.clone(), + should_load_page_index, + Some(options), + ) + .await? + } else { + // Schema doesn't need modifications, use the initial builder + initial_stream_builder + }; + // Create a projection mask for the batch stream to select which columns in the - // Parquet file that we want in the response. If position-based fallback is used, - // we also get a mapping of Arrow positions to field IDs. 
- let (projection_mask, arrow_pos_to_field_id) = Self::get_arrow_projection_mask( + // Parquet file that we want in the response. + // If we added fallback field IDs, we must use position-based projection (fallback path). + let (projection_mask, _) = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), + missing_field_ids, // Pass whether schema is missing field IDs, to force fallback path )?; - record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); + + record_batch_stream_builder = + record_batch_stream_builder.with_projection(projection_mask.clone()); // RecordBatchTransformer performs any transformations required on the RecordBatches // that come back from the file, such as type promotion, default column insertion @@ -341,57 +376,18 @@ impl ArrowReader { record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => { - // If we used position-based fallback, add field ID metadata to the schema - let batch_with_metadata = if let Some(ref mapping) = arrow_pos_to_field_id { - Self::add_field_id_metadata_to_batch(batch, mapping)? - } else { - batch - }; - record_batch_transformer.process_record_batch(batch_with_metadata) - } + Ok(batch) => record_batch_transformer.process_record_batch(batch), Err(err) => Err(err.into()), }); Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream) } - /// Add field ID metadata to a RecordBatch's schema. - /// This is used when position-based fallback is employed for projection, - /// to pass field ID information to RecordBatchTransformer. - fn add_field_id_metadata_to_batch( - batch: RecordBatch, - arrow_pos_to_field_id: &HashMap, - ) -> Result { - use arrow_schema::Field; - - let old_schema = batch.schema(); - let new_fields: Vec<_> = old_schema - .fields() - .iter() - .enumerate() - .map(|(pos, field)| { - let mut metadata = field.metadata().clone(); - if let Some(field_id) = arrow_pos_to_field_id.get(&pos) { - metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); - } - Field::new(field.name(), field.data_type().clone(), field.is_nullable()) - .with_metadata(metadata) - }) - .collect(); - - let new_schema = Arc::new(ArrowSchema::new_with_metadata( - new_fields, - old_schema.metadata().clone(), - )); - - Ok(RecordBatch::try_new(new_schema, batch.columns().to_vec())?) - } - pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, should_load_page_index: bool, + arrow_reader_options: Option, ) -> Result>> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within @@ -403,12 +399,9 @@ impl ArrowReader { .with_preload_offset_index(true) .with_preload_page_index(should_load_page_index); - // Create the record batch stream builder, which wraps the parquet file reader - let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options( - parquet_file_reader, - ArrowReaderOptions::new(), - ) - .await?; + let options = arrow_reader_options.unwrap_or_default(); + let record_batch_stream_builder = + ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?; Ok(record_batch_stream_builder) } @@ -534,10 +527,10 @@ impl ArrowReader { let iceberg_field_ids = collector.field_ids(); - // Try to build field ID map from Parquet metadata + // Build field ID map from Parquet metadata. 
+ // For files without field IDs (migrated tables), use position-based fallback. let field_id_map = match build_field_id_map(parquet_schema)? { Some(map) => map, - // If Parquet file doesn't have field IDs, use position-based fallback None => build_fallback_field_id_map(parquet_schema), }; @@ -571,6 +564,7 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, + use_fallback: bool, // Force use of position-based projection ) -> Result<(ProjectionMask, Option>)> { fn type_promotion_is_valid( file_type: Option<&PrimitiveType>, @@ -600,14 +594,14 @@ impl ArrowReader { return Ok((ProjectionMask::all(), None)); } - // Check if Arrow schema has field ID metadata by sampling the first field - let has_field_ids = arrow_schema - .fields() - .iter() - .next() - .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_some()); - - if has_field_ids { + // Use fallback path if explicitly requested (schema with fallback field IDs) + if use_fallback { + // Fallback path for Parquet files without field IDs (schema with fallback field IDs). + // Use position-based projection of top-level columns. + // This matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() + Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema) + .map(|(mask, mapping)| (mask, Some(mapping))) + } else { // Standard path: use field IDs from Arrow schema metadata // This matches iceberg-java's ReadConf when hasIds() returns true @@ -628,14 +622,6 @@ impl ArrowReader { type_promotion_is_valid, ) .map(|mask| (mask, None)) - } else { - // Fallback path for Parquet files without field IDs (e.g., migrated tables) - // This matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() - - // Java uses expectedSchema.columns() which returns only top-level fields, - // then maps them by position to top-level Parquet columns. - // Java projects the ENTIRE top-level column including any nested content (structs, lists, maps). - Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema) } } @@ -717,34 +703,25 @@ impl ArrowReader { /// Fallback projection mask creation for Parquet files without field IDs. /// Uses position-based matching where top-level field ID N maps to top-level column N-1. + /// Projects entire top-level columns including nested content (structs, lists, maps). /// Matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() - /// Returns both the ProjectionMask and a mapping of Arrow positions to field IDs - /// so that field ID metadata can be added to the Arrow schema after projection. + /// + /// Returns the ProjectionMask and a mapping of output positions to field IDs. fn get_arrow_projection_mask_fallback( field_ids: &[i32], parquet_schema: &SchemaDescriptor, - ) -> Result<(ProjectionMask, Option>)> { - // Position-based matching for files without field IDs (e.g., migrated tables). - // Top-level field ID N maps to top-level column position N-1 (field IDs are 1-indexed, positions are 0-indexed). - // This matches iceberg-java's addFallbackIds() which assigns sequential IDs 1, 2, 3, ... - // to top-level Parquet columns only. - // - // Unlike the standard path which matches leaf field IDs, the fallback path matches - // TOP-LEVEL field IDs to TOP-LEVEL Parquet columns. When projecting a struct/list/map, - // we project the ENTIRE top-level column including all nested content, matching Java's behavior. - // - // For schema evolution, we only project columns that exist in the file. 
- // Missing columns will be added later by RecordBatchTransformer with NULL values. - - // Parquet schema has top-level fields; we need to find which root columns to project - let parquet_root_fields = parquet_schema.root_schema().get_fields(); + ) -> Result<(ProjectionMask, HashMap)> { + // Position-based matching for files without field IDs (migrated tables). + // Top-level field ID N maps to top-level column position N-1 (field IDs are 1-indexed). + // Projects ENTIRE top-level columns including nested content, matching Java's behavior. + let parquet_root_fields = parquet_schema.root_schema().get_fields(); let mut root_indices = vec![]; let mut arrow_pos_to_field_id = HashMap::new(); let mut arrow_output_pos = 0; for field_id in field_ids.iter() { - // Map top-level field_id to top-level Parquet column position (field_ids are 1-indexed) + // Map top-level field_id to top-level Parquet column position let parquet_pos = (*field_id - 1) as usize; if parquet_pos < parquet_root_fields.len() { @@ -752,17 +729,15 @@ impl ArrowReader { arrow_pos_to_field_id.insert(arrow_output_pos, *field_id); arrow_output_pos += 1; } - // Skip fields that don't exist in the Parquet file - they will be added later by RecordBatchTransformer + // Skip fields that don't exist in the file - RecordBatchTransformer will add them as NULLs } if root_indices.is_empty() { - // If no columns from the projection exist in the file, project all columns - // This can happen if all requested columns are new and need to be added by the transformer - Ok((ProjectionMask::all(), None)) + Ok((ProjectionMask::all(), HashMap::new())) } else { Ok(( ProjectionMask::roots(parquet_schema, root_indices), - Some(arrow_pos_to_field_id), + arrow_pos_to_field_id, )) } } @@ -963,6 +938,47 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap Arc { + // Debug assertion to catch misuse during development + debug_assert!( + arrow_schema + .fields() + .iter() + .next() + .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()), + "Schema already has field IDs" + ); + + // Create schema with position-based fallback field IDs + use arrow_schema::Field; + + let fields_with_fallback_ids: Vec<_> = arrow_schema + .fields() + .iter() + .enumerate() + .map(|(pos, field)| { + let mut metadata = field.metadata().clone(); + // Assign field ID based on position (1-indexed to match Java) + let field_id = (pos + 1) as i32; + metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); + + Field::new(field.name(), field.data_type().clone(), field.is_nullable()) + .with_metadata(metadata) + }) + .collect(); + + Arc::new(ArrowSchema::new_with_metadata( + fields_with_fallback_ids, + arrow_schema.metadata().clone(), + )) +} + /// A visitor to collect field ids from bound predicates. 
struct CollectFieldIdVisitor { field_ids: HashSet, @@ -1779,6 +1795,7 @@ message schema { &schema, &parquet_schema, &arrow_schema, + false, ) .unwrap_err(); @@ -1794,6 +1811,7 @@ message schema { &schema, &parquet_schema, &arrow_schema, + false, ) .unwrap_err(); @@ -1804,9 +1822,14 @@ message schema { ); // Finally avoid selecting fields with unsupported data types - let (mask, _) = - ArrowReader::get_arrow_projection_mask(&[1], &schema, &parquet_schema, &arrow_schema) - .expect("Some ProjectionMask"); + let (mask, _) = ArrowReader::get_arrow_projection_mask( + &[1], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("Some ProjectionMask"); assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0])); } From 7c94bf5416d200d71230fcb316fc735f99651980 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 28 Oct 2025 19:56:00 -0400 Subject: [PATCH 10/10] Refactor and fix comments. --- crates/iceberg/src/arrow/reader.rs | 116 +++++++++++------------------ 1 file changed, 42 insertions(+), 74 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 5186f8834e..83bac5b3a6 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -183,7 +183,8 @@ impl ArrowReader { let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone()); - // Create initial stream builder to check if the schema needs any modifications + // Migrated tables lack field IDs, requiring us to inspect the schema to choose + // between field-ID-based or position-based projection let initial_stream_builder = Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), @@ -192,7 +193,7 @@ impl ArrowReader { ) .await?; - // Check if Arrow schema is missing field IDs (for migrated Parquet files) + // Parquet files from Hive/Spark migrations lack field IDs in their metadata let missing_field_ids = initial_stream_builder .schema() .fields() @@ -200,16 +201,13 @@ impl ArrowReader { .next() .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); - // If the schema needs modifications, recreate the stream builder with the modified schema. - // Currently we only modify schemas that are missing field IDs, but other transformations - // may be added in the future. + // Adding position-based fallback IDs at schema level (not per-batch) enables projection + // on files that lack embedded field IDs. We recreate the builder to apply the modified schema. let mut record_batch_stream_builder = if missing_field_ids { - // Build modified schema with necessary transformations let arrow_schema = add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema()); let options = ArrowReaderOptions::new().with_schema(arrow_schema); - // Recreate the builder with the modified schema Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), @@ -218,19 +216,17 @@ impl ArrowReader { ) .await? } else { - // Schema doesn't need modifications, use the initial builder initial_stream_builder }; - // Create a projection mask for the batch stream to select which columns in the - // Parquet file that we want in the response. - // If we added fallback field IDs, we must use position-based projection (fallback path). 
- let (projection_mask, _) = Self::get_arrow_projection_mask( + // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist), + // so we must use position-based projection instead of field-ID matching + let projection_mask = Self::get_arrow_projection_mask( &task.project_field_ids, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), - missing_field_ids, // Pass whether schema is missing field IDs, to force fallback path + missing_field_ids, // Whether to use position-based (true) or field-ID-based (false) projection )?; record_batch_stream_builder = @@ -401,6 +397,7 @@ impl ArrowReader { .with_preload_offset_index(true) .with_preload_page_index(should_load_page_index); + // Create the record batch stream builder, which wraps the parquet file reader let options = arrow_reader_options.unwrap_or_default(); let record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(parquet_file_reader, options).await?; @@ -529,8 +526,7 @@ impl ArrowReader { let iceberg_field_ids = collector.field_ids(); - // Build field ID map from Parquet metadata. - // For files without field IDs (migrated tables), use position-based fallback. + // Without embedded field IDs, we fall back to position-based mapping for compatibility let field_id_map = match build_field_id_map(parquet_schema)? { Some(map) => map, None => build_fallback_field_id_map(parquet_schema), @@ -539,8 +535,8 @@ impl ArrowReader { Ok((iceberg_field_ids, field_id_map)) } - /// Insert the leaf field id into the field_ids using for projection. - /// For nested type, it will recursively insert the leaf field id. + /// Recursively extract leaf field IDs because Parquet projection works at the leaf column level. + /// Nested types (struct/list/map) are flattened in Parquet's columnar format. fn include_leaf_field_id(field: &NestedField, field_ids: &mut Vec) { match field.field_type.as_ref() { Type::Primitive(_) => { @@ -566,8 +562,8 @@ impl ArrowReader { iceberg_schema_of_task: &Schema, parquet_schema: &SchemaDescriptor, arrow_schema: &ArrowSchemaRef, - use_fallback: bool, // Force use of position-based projection - ) -> Result<(ProjectionMask, Option>)> { + use_fallback: bool, // Whether file lacks embedded field IDs (e.g., migrated from Hive/Spark) + ) -> Result { fn type_promotion_is_valid( file_type: Option<&PrimitiveType>, projected_type: Option<&PrimitiveType>, @@ -593,21 +589,16 @@ impl ArrowReader { } if field_ids.is_empty() { - return Ok((ProjectionMask::all(), None)); + return Ok(ProjectionMask::all()); } - // Use fallback path if explicitly requested (schema with fallback field IDs) if use_fallback { - // Fallback path for Parquet files without field IDs (schema with fallback field IDs). - // Use position-based projection of top-level columns. 
- // This matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() + // Position-based projection necessary because file lacks embedded field IDs Self::get_arrow_projection_mask_fallback(field_ids, parquet_schema) - .map(|(mask, mapping)| (mask, Some(mapping))) } else { - // Standard path: use field IDs from Arrow schema metadata - // This matches iceberg-java's ReadConf when hasIds() returns true + // Field-ID-based projection using embedded field IDs from Parquet metadata - // Expand top-level field IDs to leaf field IDs for matching against Parquet schema + // Parquet's columnar format requires leaf-level (not top-level struct/list/map) projection let mut leaf_field_ids = vec![]; for field_id in field_ids { let field = iceberg_schema_of_task.field_by_id(*field_id); @@ -623,12 +614,11 @@ impl ArrowReader { arrow_schema, type_promotion_is_valid, ) - .map(|mask| (mask, None)) } } - /// Standard projection mask creation using field IDs from Arrow schema metadata. - /// Matches iceberg-java's ParquetSchemaUtil.pruneColumns() + /// Standard projection using embedded field IDs from Parquet metadata. + /// For iceberg-java compatibility with ParquetSchemaUtil.pruneColumns(). fn get_arrow_projection_mask_with_field_ids( leaf_field_ids: &[i32], iceberg_schema_of_task: &Schema, @@ -682,65 +672,48 @@ impl ArrowReader { true }); - // Only project columns that exist in the Parquet file. - // Missing columns will be added by RecordBatchTransformer with default/NULL values. - // This supports schema evolution where new columns are added to the table schema - // but old Parquet files don't have them yet. + // Schema evolution: New columns may not exist in old Parquet files. + // We only project existing columns; RecordBatchTransformer adds default/NULL values. let mut indices = vec![]; for field_id in leaf_field_ids { if let Some(col_idx) = column_map.get(field_id) { indices.push(*col_idx); } - // Skip fields that don't exist in the Parquet file - they will be added later } if indices.is_empty() { - // If no columns from the projection exist in the file, project all columns - // This can happen if all requested columns are new and need to be added by the transformer + // Edge case: All requested columns are new (don't exist in file). + // Project all columns so RecordBatchTransformer has a batch to transform. Ok(ProjectionMask::all()) } else { Ok(ProjectionMask::leaves(parquet_schema, indices)) } } - /// Fallback projection mask creation for Parquet files without field IDs. - /// Uses position-based matching where top-level field ID N maps to top-level column N-1. - /// Projects entire top-level columns including nested content (structs, lists, maps). - /// Matches iceberg-java's ParquetSchemaUtil.pruneColumnsFallback() - /// - /// Returns the ProjectionMask and a mapping of output positions to field IDs. + /// Fallback projection for Parquet files without field IDs. + /// Uses position-based matching: field ID N → column position N-1. + /// Projects entire top-level columns (including nested content) for iceberg-java compatibility. fn get_arrow_projection_mask_fallback( field_ids: &[i32], parquet_schema: &SchemaDescriptor, - ) -> Result<(ProjectionMask, HashMap)> { - // Position-based matching for files without field IDs (migrated tables). - // Top-level field ID N maps to top-level column position N-1 (field IDs are 1-indexed). - // Projects ENTIRE top-level columns including nested content, matching Java's behavior. 
- + ) -> Result { + // Position-based: field_id N → column N-1 (field IDs are 1-indexed) let parquet_root_fields = parquet_schema.root_schema().get_fields(); let mut root_indices = vec![]; - let mut arrow_pos_to_field_id = HashMap::new(); - let mut arrow_output_pos = 0; for field_id in field_ids.iter() { - // Map top-level field_id to top-level Parquet column position let parquet_pos = (*field_id - 1) as usize; if parquet_pos < parquet_root_fields.len() { root_indices.push(parquet_pos); - arrow_pos_to_field_id.insert(arrow_output_pos, *field_id); - arrow_output_pos += 1; } - // Skip fields that don't exist in the file - RecordBatchTransformer will add them as NULLs + // RecordBatchTransformer adds missing columns with NULL values } if root_indices.is_empty() { - Ok((ProjectionMask::all(), HashMap::new())) + Ok(ProjectionMask::all()) } else { - Ok(( - ProjectionMask::roots(parquet_schema, root_indices), - arrow_pos_to_field_id, - )) + Ok(ProjectionMask::roots(parquet_schema, root_indices)) } } @@ -925,13 +898,11 @@ fn build_field_id_map(parquet_schema: &SchemaDescriptor) -> Result HashMap { let mut column_map = HashMap::new(); - // Assign field IDs starting at 1 based on column position - // This follows iceberg-java's convention where field IDs are 1-indexed + // 1-indexed to match iceberg-java's convention for (idx, _field) in parquet_schema.columns().iter().enumerate() { let field_id = (idx + 1) as i32; column_map.insert(field_id, idx); @@ -940,14 +911,13 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap Arc { - // Debug assertion to catch misuse during development debug_assert!( arrow_schema .fields() @@ -957,7 +927,6 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc< "Schema already has field IDs" ); - // Create schema with position-based fallback field IDs use arrow_schema::Field; let fields_with_fallback_ids: Vec<_> = arrow_schema @@ -966,8 +935,7 @@ fn add_fallback_field_ids_to_arrow_schema(arrow_schema: &ArrowSchemaRef) -> Arc< .enumerate() .map(|(pos, field)| { let mut metadata = field.metadata().clone(); - // Assign field ID based on position (1-indexed to match Java) - let field_id = (pos + 1) as i32; + let field_id = (pos + 1) as i32; // 1-indexed for Java compatibility metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string()); Field::new(field.name(), field.data_type().clone(), field.is_nullable()) @@ -1825,7 +1793,7 @@ message schema { ); // Finally avoid selecting fields with unsupported data types - let (mask, _) = ArrowReader::get_arrow_projection_mask( + let mask = ArrowReader::get_arrow_projection_mask( &[1], &schema, &parquet_schema,