diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index ab5a96f751..de6bd1ed59 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -54,8 +54,9 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator; use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator; use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; +use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; +use crate::spec::{Datum, NameMapping, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -250,12 +251,20 @@ impl ArrowReader { initial_stream_builder }; + // Filter out metadata fields for Parquet projection (they don't exist in files) + let project_field_ids_without_metadata: Vec = task + .project_field_ids + .iter() + .filter(|&&id| !is_metadata_field(id)) + .copied() + .collect(); + // Create projection mask based on field IDs // - If file has embedded IDs: field-ID-based projection (missing_field_ids=false) // - If name mapping applied: field-ID-based projection (missing_field_ids=true but IDs now match) // - If fallback IDs: position-based projection (missing_field_ids=true) let projection_mask = Self::get_arrow_projection_mask( - &task.project_field_ids, + &project_field_ids_without_metadata, &task.schema, record_batch_stream_builder.parquet_schema(), record_batch_stream_builder.schema(), @@ -266,16 +275,20 @@ impl ArrowReader { record_batch_stream_builder.with_projection(projection_mask.clone()); // RecordBatchTransformer performs any transformations required on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering. + // that come back from the file, such as type promotion, default column insertion, + // column re-ordering, partition constants, and virtual field addition (like _file) let mut record_batch_transformer_builder = - RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()); + RecordBatchTransformerBuilder::new(task.schema_ref(), task.project_field_ids()) + .with_constant( + RESERVED_FIELD_ID_FILE, + PrimitiveLiteral::String(task.data_file_path.clone()), + )?; if let (Some(partition_spec), Some(partition_data)) = (task.partition_spec.clone(), task.partition.clone()) { record_batch_transformer_builder = - record_batch_transformer_builder.with_partition(partition_spec, partition_data); + record_batch_transformer_builder.with_partition(partition_spec, partition_data)?; } let mut record_batch_transformer = record_batch_transformer_builder.build(); @@ -416,7 +429,10 @@ impl ArrowReader { record_batch_stream_builder .build()? .map(move |batch| match batch { - Ok(batch) => record_batch_transformer.process_record_batch(batch), + Ok(batch) => { + // Process the record batch (type promotion, column reordering, virtual fields, etc.) 
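+                        // Constant columns registered on the transformer above (the `_file` path
+                        // added via `with_constant`, plus any identity-partition values) are
+                        // materialized into each output batch here, even though they are not
+                        // present in the Parquet file itself.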
+ record_batch_transformer.process_record_batch(batch) + } Err(err) => Err(err.into()), }); diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs index 07ec43918f..3554191cc5 100644 --- a/crates/iceberg/src/arrow/record_batch_transformer.rs +++ b/crates/iceberg/src/arrow/record_batch_transformer.rs @@ -20,17 +20,19 @@ use std::sync::Arc; use arrow_array::{ Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array, - Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray, + Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, RunArray, + StringArray, StructArray, }; use arrow_buffer::NullBuffer; use arrow_cast::cast; use arrow_schema::{ - DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, + DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef, }; use parquet::arrow::PARQUET_FIELD_ID_META_KEY; use crate::arrow::schema_to_arrow_schema; +use crate::metadata_columns::get_metadata_column_name; use crate::spec::{ Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform, }; @@ -146,13 +148,14 @@ enum SchemaComparison { /// Builder for RecordBatchTransformer to improve ergonomics when constructing with optional parameters. /// -/// See [`RecordBatchTransformer`] for details on partition spec and partition data. +/// Constant fields are pre-computed for both virtual/metadata fields (like _file) and +/// identity-partitioned fields to avoid duplicate work during batch processing. #[derive(Debug)] pub(crate) struct RecordBatchTransformerBuilder { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - partition_spec: Option>, - partition_data: Option, + constant_fields: HashMap, + field_id_to_mapped_schema_map: Option>, } impl RecordBatchTransformerBuilder { @@ -163,32 +166,68 @@ impl RecordBatchTransformerBuilder { Self { snapshot_schema, projected_iceberg_field_ids: projected_iceberg_field_ids.to_vec(), - partition_spec: None, - partition_data: None, + constant_fields: HashMap::new(), + field_id_to_mapped_schema_map: None, } } + /// Lazily build and cache the field_id_to_mapped_schema_map + fn get_or_build_field_id_map(&mut self) -> Result<&HashMap> { + if self.field_id_to_mapped_schema_map.is_none() { + let mapped_arrow_schema = Arc::new(schema_to_arrow_schema(&self.snapshot_schema)?); + self.field_id_to_mapped_schema_map = Some( + RecordBatchTransformer::build_field_id_to_arrow_schema_map(&mapped_arrow_schema)?, + ); + } + Ok(self.field_id_to_mapped_schema_map.as_ref().unwrap()) + } + + /// Add a constant value for a specific field ID. + /// This is used for virtual/metadata fields like _file that have constant values per batch. + /// + /// # Arguments + /// * `field_id` - The field ID to associate with the constant + /// * `value` - The constant value for this field + pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result { + let arrow_type = RecordBatchTransformer::primitive_literal_to_arrow_type(&value)?; + self.constant_fields.insert(field_id, (arrow_type, value)); + Ok(self) + } + /// Set partition spec and data together for identifying identity-transformed partition columns. /// /// Both partition_spec and partition_data must be provided together since the spec defines /// which fields are identity-partitioned, and the data provides their constant values. 
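+    /// Only identity-transformed partition fields become constants; non-identity
+    /// transforms (bucket, truncate, ...) are still read from the data file.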
- /// One without the other cannot produce a valid constants map. + /// This method computes the partition constants and merges them into constant_fields. pub(crate) fn with_partition( mut self, partition_spec: Arc, partition_data: Struct, - ) -> Self { - self.partition_spec = Some(partition_spec); - self.partition_data = Some(partition_data); - self + ) -> Result { + // Compute partition constants for identity-transformed fields + let partition_constants = constants_map(&partition_spec, &partition_data); + + // Ensure field_id_to_mapped_schema_map is built (needed for Arrow type lookup) + self.get_or_build_field_id_map()?; + + // Add partition constants to constant_fields (get Arrow types from schema) + // Safe to borrow field_id_to_mapped_schema_map again now + let field_map = self.field_id_to_mapped_schema_map.as_ref().unwrap(); + for (field_id, value) in partition_constants { + if let Some((field, _)) = field_map.get(&field_id) { + let arrow_type = field.data_type().clone(); + self.constant_fields.insert(field_id, (arrow_type, value)); + } + } + + Ok(self) } pub(crate) fn build(self) -> RecordBatchTransformer { RecordBatchTransformer { snapshot_schema: self.snapshot_schema, projected_iceberg_field_ids: self.projected_iceberg_field_ids, - partition_spec: self.partition_spec, - partition_data: self.partition_data, + constant_fields: self.constant_fields, batch_transform: None, } } @@ -228,16 +267,10 @@ impl RecordBatchTransformerBuilder { pub(crate) struct RecordBatchTransformer { snapshot_schema: Arc, projected_iceberg_field_ids: Vec, - - /// Partition spec for identifying identity-transformed partition columns (spec rule #1). - /// Only fields with identity transforms use partition data constants; non-identity transforms - /// (bucket, truncate, etc.) must read source columns from data files. - partition_spec: Option>, - - /// Partition data providing constant values for identity-transformed partition columns (spec rule #1). - /// For example, in a file at path `dept=engineering/file.parquet`, this would contain - /// the value "engineering" for the dept field. - partition_data: Option, + // Pre-computed constant field information: field_id -> (arrow_type, value) + // Includes both virtual/metadata fields (like _file) and identity-partitioned fields + // Avoids type conversions during batch processing + constant_fields: HashMap, // BatchTransform gets lazily constructed based on the schema of // the first RecordBatch we receive from the file @@ -279,8 +312,7 @@ impl RecordBatchTransformer { record_batch.schema_ref(), self.snapshot_schema.as_ref(), &self.projected_iceberg_field_ids, - self.partition_spec.as_ref().map(|s| s.as_ref()), - self.partition_data.as_ref(), + &self.constant_fields, )?); self.process_record_batch(record_batch)? @@ -299,8 +331,7 @@ impl RecordBatchTransformer { source_schema: &ArrowSchemaRef, snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], - partition_spec: Option<&PartitionSpec>, - partition_data: Option<&Struct>, + constant_fields: &HashMap, ) -> Result { let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?); let field_id_to_mapped_schema_map = @@ -311,22 +342,41 @@ impl RecordBatchTransformer { let fields: Result> = projected_iceberg_field_ids .iter() .map(|field_id| { - Ok(field_id_to_mapped_schema_map - .get(field_id) - .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? 
- .0 - .clone()) + // Check if this is a constant field (virtual or partition) + if constant_fields.contains_key(field_id) { + // For metadata/virtual fields (like _file), get name from metadata_columns + // For partition fields, get name from schema (they exist in schema) + if let Ok(field_name) = get_metadata_column_name(*field_id) { + // This is a metadata/virtual field + let (arrow_type, _) = constant_fields.get(field_id).unwrap(); + Ok(Arc::new( + Field::new(field_name, arrow_type.clone(), false).with_metadata( + HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + field_id.to_string(), + )]), + ), + )) + } else { + // This is a partition field (exists in schema) + Ok(field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0 + .clone()) + } + } else { + Ok(field_id_to_mapped_schema_map + .get(field_id) + .ok_or(Error::new(ErrorKind::Unexpected, "field not found"))? + .0 + .clone()) + } }) .collect(); let target_schema = Arc::new(ArrowSchema::new(fields?)); - let constants_map = if let (Some(spec), Some(data)) = (partition_spec, partition_data) { - constants_map(spec, data) - } else { - HashMap::new() - }; - match Self::compare_schemas(source_schema, &target_schema) { SchemaComparison::Equivalent => Ok(BatchTransform::PassThrough), SchemaComparison::NameChangesOnly => Ok(BatchTransform::ModifySchema { target_schema }), @@ -336,8 +386,7 @@ impl RecordBatchTransformer { snapshot_schema, projected_iceberg_field_ids, field_id_to_mapped_schema_map, - constants_map, - partition_spec, + constant_fields, )?, target_schema, }), @@ -394,8 +443,7 @@ impl RecordBatchTransformer { snapshot_schema: &IcebergSchema, projected_iceberg_field_ids: &[i32], field_id_to_mapped_schema_map: HashMap, - constants_map: HashMap, - _partition_spec: Option<&PartitionSpec>, + constant_fields: &HashMap, ) -> Result> { let field_id_to_source_schema_map = Self::build_field_id_to_arrow_schema_map(source_schema)?; @@ -403,6 +451,17 @@ impl RecordBatchTransformer { projected_iceberg_field_ids .iter() .map(|field_id| { + // Check if this is a constant field (metadata/virtual or identity-partitioned) + // Constant fields always use their pre-computed constant values, regardless of whether + // they exist in the Parquet file. This is per Iceberg spec rule #1: partition metadata + // is authoritative and should be preferred over file data. + if let Some((arrow_type, value)) = constant_fields.get(field_id) { + return Ok(ColumnSource::Add { + value: Some(value.clone()), + target_type: arrow_type.clone(), + }); + } + let (target_field, _) = field_id_to_mapped_schema_map .get(field_id) @@ -451,13 +510,8 @@ impl RecordBatchTransformer { ); // Apply spec's fallback steps for "not present" fields. 
- let column_source = if let Some(constant_value) = constants_map.get(field_id) { - // Rule #1: Identity partition constant - ColumnSource::Add { - value: Some(constant_value.clone()), - target_type: target_type.clone(), - } - } else if let Some(source) = field_by_id { + // Rule #1 (constants) is handled at the beginning of this function + let column_source = if let Some(source) = field_by_id { source } else { // Rules #2, #3 and #4: @@ -582,6 +636,27 @@ impl RecordBatchTransformer { let vals: Vec> = vec![None; num_rows]; Arc::new(Float64Array::from(vals)) } + (DataType::RunEndEncoded(_, _), Some(PrimitiveLiteral::String(value))) => { + // Create Run-End Encoded array for constant string values (e.g., file paths) + // This is more memory-efficient than repeating the same value for every row + let run_ends = if num_rows == 0 { + Int32Array::from(Vec::::new()) + } else { + Int32Array::from(vec![num_rows as i32]) + }; + let values = if num_rows == 0 { + StringArray::from(Vec::<&str>::new()) + } else { + StringArray::from(vec![value.as_str()]) + }; + Arc::new(RunArray::try_new(&run_ends, &values).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create RunArray for constant string", + ) + .with_source(e) + })?) + } (DataType::Utf8, Some(PrimitiveLiteral::String(value))) => { Arc::new(StringArray::from(vec![value.clone(); num_rows])) } @@ -620,6 +695,35 @@ impl RecordBatchTransformer { } }) } + + /// Converts a PrimitiveLiteral to its corresponding Arrow DataType. + /// This is used for virtual fields to determine the Arrow type based on the constant value. + fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result { + Ok(match literal { + PrimitiveLiteral::Boolean(_) => DataType::Boolean, + PrimitiveLiteral::Int(_) => DataType::Int32, + PrimitiveLiteral::Long(_) => DataType::Int64, + PrimitiveLiteral::Float(_) => DataType::Float32, + PrimitiveLiteral::Double(_) => DataType::Float64, + PrimitiveLiteral::String(_) => { + // Use Run-End Encoding for constant strings (memory efficient) + let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false)); + // Note that this is nullable, as Arrow expects this when building the + // final Arrow schema with `RunArray::try_new`. 
+ let values_field = Arc::new(Field::new("values", DataType::Utf8, true)); + DataType::RunEndEncoded(run_ends_field, values_field) + } + PrimitiveLiteral::Binary(_) => DataType::Binary, + PrimitiveLiteral::Int128(_) => DataType::Decimal128(38, 0), + PrimitiveLiteral::UInt128(_) => DataType::Decimal128(38, 0), + PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => { + return Err(Error::new( + ErrorKind::Unexpected, + "Cannot create arrow type for AboveMax/BelowMin literal", + )); + } + }) + } } #[cfg(test)] @@ -1136,6 +1240,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1256,6 +1361,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ @@ -1371,6 +1477,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); // Create a Parquet RecordBatch with actual data @@ -1475,6 +1582,7 @@ mod test { let mut transformer = RecordBatchTransformerBuilder::new(snapshot_schema, &projected_field_ids) .with_partition(partition_spec, partition_data) + .expect("Failed to add partition constants") .build(); let parquet_batch = RecordBatch::try_new(parquet_schema, vec![ diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index aae8efed74..8d8f40f72d 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -96,4 +96,5 @@ mod utils; pub mod writer; mod delete_vector; +pub mod metadata_columns; pub mod puffin; diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs new file mode 100644 index 0000000000..c69af33d29 --- /dev/null +++ b/crates/iceberg/src/metadata_columns.rs @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Metadata columns (virtual/reserved fields) for Iceberg tables. +//! +//! This module defines metadata columns that can be requested in projections +//! but are not stored in data files. Instead, they are computed on-the-fly +//! during reading. Examples include the _file column (file path) and future +//! columns like partition values or row numbers. 
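+//!
+//! For illustration, a minimal sketch of how the helpers below resolve the
+//! reserved `_file` column (all names are defined in this module):
+//!
+//! ```
+//! use iceberg::metadata_columns::{
+//!     RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, get_metadata_column_name,
+//!     get_metadata_field_id, is_metadata_field,
+//! };
+//!
+//! // "_file" maps to the reserved field ID 2147483646 and back again.
+//! let id = get_metadata_field_id(RESERVED_COL_NAME_FILE).unwrap();
+//! assert_eq!(id, RESERVED_FIELD_ID_FILE);
+//! assert!(is_metadata_field(id));
+//! assert_eq!(get_metadata_column_name(id).unwrap(), RESERVED_COL_NAME_FILE);
+//! ```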
+ +use crate::{Error, ErrorKind, Result}; + +/// Reserved field ID for the file path (_file) column per Iceberg spec +pub const RESERVED_FIELD_ID_FILE: i32 = 2147483646; + +/// Reserved column name for the file path metadata column +pub const RESERVED_COL_NAME_FILE: &str = "_file"; + +/// Returns the column name for a metadata field ID. +/// +/// # Arguments +/// * `field_id` - The metadata field ID +/// +/// # Returns +/// The name of the metadata column, or an error if the field ID is not recognized +pub fn get_metadata_column_name(field_id: i32) -> Result<&'static str> { + match field_id { + RESERVED_FIELD_ID_FILE => Ok(RESERVED_COL_NAME_FILE), + _ => { + if field_id > 2147483447 { + Err(Error::new( + ErrorKind::Unexpected, + format!("Unsupported metadata field ID: {field_id}"), + )) + } else { + Err(Error::new( + ErrorKind::Unexpected, + format!("Field ID {field_id} is not a metadata field"), + )) + } + } + } +} + +/// Returns the field ID for a metadata column name. +/// +/// # Arguments +/// * `column_name` - The metadata column name +/// +/// # Returns +/// The field ID of the metadata column, or an error if the column name is not recognized +pub fn get_metadata_field_id(column_name: &str) -> Result { + match column_name { + RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE), + _ => Err(Error::new( + ErrorKind::Unexpected, + format!("Unknown/unsupported metadata column name: {column_name}"), + )), + } +} + +/// Checks if a field ID is a metadata field. +/// +/// # Arguments +/// * `field_id` - The field ID to check +/// +/// # Returns +/// `true` if the field ID is a (currently supported) metadata field, `false` otherwise +pub fn is_metadata_field(field_id: i32) -> bool { + field_id == RESERVED_FIELD_ID_FILE + // Additional metadata fields can be checked here in the future +} + +/// Checks if a column name is a metadata column. +/// +/// # Arguments +/// * `column_name` - The column name to check +/// +/// # Returns +/// `true` if the column name is a metadata column, `false` otherwise +pub fn is_metadata_column_name(column_name: &str) -> bool { + get_metadata_field_id(column_name).is_ok() +} diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3e319ca062..24c03b0b2c 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -36,6 +36,7 @@ use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; use crate::io::FileIO; +use crate::metadata_columns::{get_metadata_field_id, is_metadata_column_name}; use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; use crate::table::Table; @@ -217,9 +218,13 @@ impl<'a> TableScanBuilder<'a> { let schema = snapshot.schema(self.table.metadata())?; - // Check that all column names exist in the schema. + // Check that all column names exist in the schema (skip reserved columns). 
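+        // Reserved names such as "_file" are not part of the table schema; they are
+        // skipped here and mapped to their reserved field IDs in the projection loop below.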
if let Some(column_names) = self.column_names.as_ref() { for column_name in column_names { + // Skip reserved columns that don't exist in the schema + if is_metadata_column_name(column_name) { + continue; + } if schema.field_by_name(column_name).is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -240,6 +245,12 @@ impl<'a> TableScanBuilder<'a> { }); for column_name in column_names.iter() { + // Handle metadata columns (like "_file") + if is_metadata_column_name(column_name) { + field_ids.push(get_metadata_field_id(column_name)?); + continue; + } + let field_id = schema.field_id_by_name(column_name).ok_or_else(|| { Error::new( ErrorKind::DataInvalid, @@ -254,10 +265,10 @@ impl<'a> TableScanBuilder<'a> { Error::new( ErrorKind::FeatureUnsupported, format!( - "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" - ), - ) - })?; + "Column {column_name} is not a direct child of schema but a nested field, which is not supported now. Schema: {schema}" + ), + ) + })?; field_ids.push(field_id); } @@ -559,8 +570,10 @@ pub mod tests { use std::fs::File; use std::sync::Arc; + use arrow_array::cast::AsArray; use arrow_array::{ - ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, + Array, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, RecordBatch, + StringArray, }; use futures::{TryStreamExt, stream}; use minijinja::value::Value; @@ -575,6 +588,7 @@ pub mod tests { use crate::arrow::ArrowReaderBuilder; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; + use crate::metadata_columns::RESERVED_COL_NAME_FILE; use crate::scan::FileScanTask; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry, @@ -1800,4 +1814,319 @@ pub mod tests { }; test_fn(task); } + + #[tokio::test] + async fn test_select_with_file_column() { + use arrow_array::cast::AsArray; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select regular columns plus the _file column + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have 2 columns: x and _file + assert_eq!(batches[0].num_columns(), 2); + + // Verify the x column exists and has correct data + let x_col = batches[0].column_by_name("x").unwrap(); + let x_arr = x_col.as_primitive::(); + assert_eq!(x_arr.value(0), 1); + + // Verify the _file column exists + let file_col = batches[0].column_by_name(RESERVED_COL_NAME_FILE); + assert!( + file_col.is_some(), + "_file column should be present in the batch" + ); + + // Verify the _file column contains a file path + let file_col = file_col.unwrap(); + assert!( + matches!( + file_col.data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + + // Decode the RunArray to verify it contains the file path + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + assert_eq!(string_values.len(), 1, "Should have a single file path"); + + let file_path = string_values.value(0); + assert!( + file_path.ends_with(".parquet"), + "File path should end with .parquet, got: {}", + file_path + 
); + } + + #[tokio::test] + async fn test_select_file_column_position() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select columns in specific order: x, _file, z + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE, "z"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify column order: x at position 0, _file at position 1, z at position 2 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(2).name(), "z"); + + // Verify columns by name also works + assert!(batches[0].column_by_name("x").is_some()); + assert!(batches[0].column_by_name(RESERVED_COL_NAME_FILE).is_some()); + assert!(batches[0].column_by_name("z").is_some()); + } + + #[tokio::test] + async fn test_select_file_column_only() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select only the _file column + let table_scan = fixture + .table + .scan() + .select([RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Should have exactly 1 column + assert_eq!(batches[0].num_columns(), 1); + + // Verify it's the _file column + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + + // Verify the batch has the correct number of rows + // The scan reads files 1.parquet and 3.parquet (2.parquet is deleted) + // Each file has 1024 rows, so total is 2048 rows + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 2048); + } + + #[tokio::test] + async fn test_file_column_with_multiple_files() { + use std::collections::HashSet; + + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select x and _file columns + let table_scan = fixture + .table + .scan() + .select(["x", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Collect all unique file paths from the batches + let mut file_paths = HashSet::new(); + for batch in &batches { + let file_col = batch.column_by_name(RESERVED_COL_NAME_FILE).unwrap(); + let run_array = file_col + .as_any() + .downcast_ref::>() + .expect("_file column should be a RunArray"); + + let values = run_array.values(); + let string_values = values.as_string::(); + for i in 0..string_values.len() { + file_paths.insert(string_values.value(i).to_string()); + } + } + + // We should have multiple files (the test creates 1.parquet and 3.parquet) + assert!(!file_paths.is_empty(), "Should have at least one file path"); + + // All paths should end with .parquet + for path in &file_paths { + assert!( + path.ends_with(".parquet"), + "All file paths should end with .parquet, got: {}", + path + ); + } + } + + #[tokio::test] + async fn test_file_column_at_start() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the start + let table_scan = fixture + .table + .scan() + 
.select([RESERVED_COL_NAME_FILE, "x", "y"]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 0 + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_FILE); + assert_eq!(schema.field(1).name(), "x"); + assert_eq!(schema.field(2).name(), "y"); + } + + #[tokio::test] + async fn test_file_column_at_end() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select _file at the end + let table_scan = fixture + .table + .scan() + .select(["x", "y", RESERVED_COL_NAME_FILE]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert_eq!(batches[0].num_columns(), 3); + + // Verify _file is at position 2 (the end) + let schema = batches[0].schema(); + assert_eq!(schema.field(0).name(), "x"); + assert_eq!(schema.field(1).name(), "y"); + assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE); + } + + #[tokio::test] + async fn test_select_with_repeated_column_names() { + let mut fixture = TableTestFixture::new(); + fixture.setup_manifest_files().await; + + // Select with repeated column names - both regular columns and virtual columns + // Repeated columns should appear multiple times in the result (duplicates are allowed) + let table_scan = fixture + .table + .scan() + .select([ + "x", + RESERVED_COL_NAME_FILE, + "x", // x repeated + "y", + RESERVED_COL_NAME_FILE, // _file repeated + "y", // y repeated + ]) + .with_row_selection_enabled(true) + .build() + .unwrap(); + + let batch_stream = table_scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + // Verify we have exactly 6 columns (duplicates are allowed and preserved) + assert_eq!( + batches[0].num_columns(), + 6, + "Should have exactly 6 columns with duplicates" + ); + + let schema = batches[0].schema(); + + // Verify columns appear in the exact order requested: x, _file, x, y, _file, y + assert_eq!(schema.field(0).name(), "x", "Column 0 should be x"); + assert_eq!( + schema.field(1).name(), + RESERVED_COL_NAME_FILE, + "Column 1 should be _file" + ); + assert_eq!( + schema.field(2).name(), + "x", + "Column 2 should be x (duplicate)" + ); + assert_eq!(schema.field(3).name(), "y", "Column 3 should be y"); + assert_eq!( + schema.field(4).name(), + RESERVED_COL_NAME_FILE, + "Column 4 should be _file (duplicate)" + ); + assert_eq!( + schema.field(5).name(), + "y", + "Column 5 should be y (duplicate)" + ); + + // Verify all columns have correct data types + assert!( + matches!(schema.field(0).data_type(), arrow_schema::DataType::Int64), + "Column x should be Int64" + ); + assert!( + matches!(schema.field(2).data_type(), arrow_schema::DataType::Int64), + "Column x (duplicate) should be Int64" + ); + assert!( + matches!(schema.field(3).data_type(), arrow_schema::DataType::Int64), + "Column y should be Int64" + ); + assert!( + matches!(schema.field(5).data_type(), arrow_schema::DataType::Int64), + "Column y (duplicate) should be Int64" + ); + assert!( + matches!( + schema.field(1).data_type(), + arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column should use RunEndEncoded type" + ); + assert!( + matches!( + schema.field(4).data_type(), + 
arrow_schema::DataType::RunEndEncoded(_, _) + ), + "_file column (duplicate) should use RunEndEncoded type" + ); + } }
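// Illustrative sketch (the bucket path below is hypothetical): why the `_file`
// constant is emitted as a run-end encoded array rather than a plain repeated
// string column. A single run covers every row of the batch, so only one copy
// of the path string is stored no matter how many rows the batch has.

use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::{Array, Int32Array, RunArray, StringArray};

fn constant_file_column(path: &str, num_rows: usize) -> RunArray<Int32Type> {
    // One run whose end index equals the row count, backed by a single value.
    let run_ends = Int32Array::from(vec![num_rows as i32]);
    let values = StringArray::from(vec![path]);
    RunArray::try_new(&run_ends, &values).expect("valid run-end encoded array")
}

fn main() {
    let col = constant_file_column("s3://bucket/table/data/1.parquet", 1024);
    // Logical length equals the batch's row count.
    assert_eq!(col.len(), 1024);
    // The backing values array still holds just one string, regardless of row count.
    assert_eq!(col.values().as_string::<i32>().len(), 1);
    assert_eq!(
        col.values().as_string::<i32>().value(0),
        "s3://bucket/table/data/1.parquet"
    );
}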