
Commit fe45218

Apply name mapping in ArrowReader instead of RecordBatchTransformer to match Java's behavior.
1 parent 360d626 commit fe45218

File tree

2 files changed: +177 −153 lines changed

crates/iceberg/src/arrow/reader.rs

Lines changed: 113 additions & 10 deletions
@@ -55,7 +55,7 @@ use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator
 use crate::expr::{BoundPredicate, BoundReference};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
-use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};

@@ -181,7 +181,8 @@ impl ArrowReader {
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();

-        let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, task.schema.clone());
+        let delete_filter_rx =
+            delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));

         // Migrated tables lack field IDs, requiring us to inspect the schema to choose
         // between field-ID-based or position-based projection
@@ -193,19 +194,48 @@ impl ArrowReader {
         )
         .await?;

-        // Parquet files from Hive/Spark migrations lack field IDs in their metadata
+        // Check whether the Parquet file has embedded field IDs.
+        // Corresponds to Java's ParquetSchemaUtil.hasIds()
+        // Reference: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java:118
         let missing_field_ids = initial_stream_builder
             .schema()
             .fields()
             .iter()
             .next()
             .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none());

-        // Adding position-based fallback IDs at schema level (not per-batch) enables projection
-        // on files that lack embedded field IDs. We recreate the builder to apply the modified schema.
+        // Three-branch schema resolution strategy matching Java's ReadConf constructor.
+        //
+        // Per the Iceberg spec's Column Projection rules:
+        // "Columns in Iceberg data files are selected by field id. The table schema's column
+        // names and order may change after a data file is written, and projection must be done
+        // using field ids."
+        // https://iceberg.apache.org/spec/#column-projection
+        //
+        // When Parquet files lack field IDs (e.g., Hive/Spark migrations via add_files),
+        // we must assign field IDs BEFORE reading data to enable correct projection.
+        //
+        // Java's ReadConf determines the field ID strategy:
+        // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns()
+        // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns()
+        // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback()
         let mut record_batch_stream_builder = if missing_field_ids {
-            let arrow_schema =
-                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema());
+            // The Parquet file lacks field IDs, so assign them before reading.
+            let arrow_schema = if let Some(name_mapping) = &task.name_mapping {
+                // Branch 2: apply name mapping to assign the correct Iceberg field IDs.
+                // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id
+                // to columns without field id"
+                // Corresponds to Java's ParquetSchemaUtil.applyNameMapping()
+                apply_name_mapping_to_arrow_schema(
+                    Arc::clone(initial_stream_builder.schema()),
+                    name_mapping,
+                )?
+            } else {
+                // Branch 3: no name mapping, so fall back to position-based field IDs.
+                // Corresponds to Java's ParquetSchemaUtil.addFallbackIds()
+                add_fallback_field_ids_to_arrow_schema(initial_stream_builder.schema())
+            };
+
             let options = ArrowReaderOptions::new().with_schema(arrow_schema);

             Self::create_parquet_record_batch_stream_builder(
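Aside: the name mapping consumed by Branch 2 is stored in table metadata under the schema.name-mapping.default property, serialized as JSON per the Iceberg spec. Below is a minimal, hypothetical sketch of that JSON's shape and of the three-branch decision above; the column names and the choose_branch helper are invented for illustration and are not part of this crate:

    fn main() {
        // Spec-shaped JSON for schema.name-mapping.default (hypothetical columns).
        let name_mapping_json = r#"[
            {"field-id": 1, "names": ["id", "record_id"]},
            {"field-id": 2, "names": ["data"]}
        ]"#;

        // Simplified stand-in for the dispatch in the diff above.
        fn choose_branch(file_has_ids: bool, mapping: Option<&str>) -> &'static str {
            match (file_has_ids, mapping) {
                (true, _) => "Branch 1: trust embedded field IDs",
                (false, Some(_)) => "Branch 2: apply name mapping, then prune by field ID",
                (false, None) => "Branch 3: assign position-based fallback IDs",
            }
        }

        // A migrated file has no embedded IDs, but the table carries a mapping.
        assert_eq!(
            choose_branch(false, Some(name_mapping_json)),
            "Branch 2: apply name mapping, then prune by field ID"
        );
    }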
@@ -216,11 +246,14 @@ impl ArrowReader {
             )
             .await?
         } else {
+            // Branch 1: the file has embedded field IDs, so trust them.
             initial_stream_builder
         };

-        // Fallback IDs don't match Parquet's embedded field IDs (since they don't exist),
-        // so we must use position-based projection instead of field-ID matching
+        // Create the projection mask based on field IDs:
+        // - file has embedded IDs: field-ID-based projection (missing_field_ids = false)
+        // - name mapping applied: field-ID-based projection (missing_field_ids = true, but IDs now match)
+        // - fallback IDs assigned: position-based projection (missing_field_ids = true)
        let projection_mask = Self::get_arrow_projection_mask(
            &task.project_field_ids,
            &task.schema,
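For reference, the position-based case maps naturally onto the parquet crate's ProjectionMask::roots constructor, which selects top-level columns by index; the field-ID-based cases resolve each requested Iceberg field ID to a column index first. A rough sketch under that assumption (the helper name is invented and is not the reader's actual code):

    use parquet::arrow::ProjectionMask;
    use parquet::schema::types::SchemaDescriptor;

    // Fallback-ID case: field IDs were assigned by position, so the requested
    // positions can serve directly as root (top-level) column indices.
    fn position_based_mask(schema: &SchemaDescriptor, positions: Vec<usize>) -> ProjectionMask {
        ProjectionMask::roots(schema, positions)
    }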
@@ -238,7 +271,6 @@ impl ArrowReader {
        let mut record_batch_transformer =
            RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids())
                .with_partition(task.partition_spec.clone(), task.partition.clone())
-                .with_name_mapping(task.name_mapping.clone())
                .build();

        if let Some(batch_size) = batch_size {
@@ -922,6 +954,77 @@ fn build_fallback_field_id_map(parquet_schema: &SchemaDescriptor) -> HashMap<i32
     column_map
 }

+/// Apply name mapping to an Arrow schema for Parquet files lacking field IDs.
+///
+/// Assigns Iceberg field IDs based on column names using the name mapping,
+/// enabling correct projection on migrated files (e.g., from Hive/Spark via add_files).
+///
+/// Per Iceberg spec Column Projection rule #2:
+/// "Use schema.name-mapping.default metadata to map field id to columns without field id"
+/// https://iceberg.apache.org/spec/#column-projection
+///
+/// Corresponds to Java's ParquetSchemaUtil.applyNameMapping() and the ApplyNameMapping visitor.
+/// The key difference is that Java operates on the Parquet MessageType, while we operate
+/// on the Arrow Schema.
+///
+/// # Arguments
+/// * `arrow_schema` - Arrow schema from the Parquet file (without field IDs)
+/// * `name_mapping` - Name mapping from table metadata (TableProperties.DEFAULT_NAME_MAPPING)
+///
+/// # Returns
+/// Arrow schema with field IDs assigned based on the name mapping
+fn apply_name_mapping_to_arrow_schema(
+    arrow_schema: ArrowSchemaRef,
+    name_mapping: &NameMapping,
+) -> Result<Arc<ArrowSchema>> {
+    debug_assert!(
+        arrow_schema
+            .fields()
+            .iter()
+            .next()
+            .is_none_or(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()),
+        "Schema already has field IDs - name mapping should not be applied"
+    );
+
+    use arrow_schema::Field;
+
+    let fields_with_mapped_ids: Vec<_> = arrow_schema
+        .fields()
+        .iter()
+        .map(|field| {
+            // Look up this column name in the name mapping to get the Iceberg field ID.
+            // Corresponds to Java's ApplyNameMapping visitor, which calls
+            // nameMapping.find(currentPath()) and returns field.withId() if found.
+            //
+            // If the field isn't in the mapping, leave it WITHOUT assigning an ID
+            // (matching Java's behavior of returning the field unchanged).
+            // Later, during projection, fields without IDs are filtered out.
+            let mapped_field_opt = name_mapping
+                .fields()
+                .iter()
+                .find(|f| f.names().contains(&field.name().to_string()));
+
+            let mut metadata = field.metadata().clone();
+
+            if let Some(mapped_field) = mapped_field_opt {
+                if let Some(field_id) = mapped_field.field_id() {
+                    // Field found in the mapping with a field_id → assign it
+                    metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string());
+                }
+                // If field_id is None, leave the field without an ID (filtered out by projection)
+            }
+            // If the field isn't in the mapping at all, likewise leave it without an ID
+
+            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
+                .with_metadata(metadata)
+        })
+        .collect();
+
+    Ok(Arc::new(ArrowSchema::new_with_metadata(
+        fields_with_mapped_ids,
+        arrow_schema.metadata().clone(),
+    )))
+}
+
 /// Add position-based fallback field IDs to Arrow schema for Parquet files lacking them.
 /// Enables projection on migrated files (e.g., from Hive/Spark).
 ///
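A hypothetical usage sketch of the new helper follows. This is illustrative only: the function is module-private, and the sketch assumes NameMapping deserializes from the spec's JSON representation via serde:

    use std::sync::Arc;
    use arrow_schema::{DataType, Field, Schema as ArrowSchema};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    fn demo() -> iceberg::Result<()> {
        // An Arrow schema as read from a migrated Parquet file: no field IDs in metadata.
        let file_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new("data", DataType::Utf8, true),
        ]));

        // Assumption: NameMapping implements serde Deserialize for the spec's JSON form.
        let mapping: iceberg::spec::NameMapping = serde_json::from_str(
            r#"[{"field-id": 1, "names": ["id"]}, {"field-id": 2, "names": ["data"]}]"#,
        ).expect("valid name mapping JSON");

        let mapped = apply_name_mapping_to_arrow_schema(file_schema, &mapping)?;

        // Each matched column now carries its Iceberg field ID, so downstream
        // projection can proceed by field ID exactly as for native Iceberg files.
        assert_eq!(
            mapped.field(0).metadata().get(PARQUET_FIELD_ID_META_KEY),
            Some(&"1".to_string())
        );
        Ok(())
    }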
