Changes from all commits (27 commits):
aab78d6  Add REE file column helpers (gbrgr, Nov 4, 2025)
ee21cab  Add helper tests (gbrgr, Nov 4, 2025)
37b52e2  Add constants (gbrgr, Nov 4, 2025)
44463a0  Add support for _file constant (gbrgr, Nov 4, 2025)
b5449f6  Merge branch 'main' into feature/gb/file-column (gbrgr, Nov 4, 2025)
e034009  Update tests (gbrgr, Nov 4, 2025)
4f0a4f1  Fix clippy warning (gbrgr, Nov 4, 2025)
51f76d3  Fix doc test (gbrgr, Nov 4, 2025)
d84e16b  Track in field ids (gbrgr, Nov 4, 2025)
984dacd  Merge branch 'main' into feature/gb/file-column (gbrgr, Nov 4, 2025)
bd478cb  Add test (gbrgr, Nov 4, 2025)
8593db0  Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb… (gbrgr, Nov 4, 2025)
9b186c7  Allow repeated virtual file column selection (gbrgr, Nov 4, 2025)
30ae5fb  Merge branch 'main' into feature/gb/file-column (gbrgr, Nov 5, 2025)
adf0da0  Refactor into own transformer step (gbrgr, Nov 7, 2025)
f4336a8  Merge branch 'feature/gb/file-column' of github.com:RelationalAI/iceb… (gbrgr, Nov 7, 2025)
ef3a965  Revert "Refactor into own transformer step" (gbrgr, Nov 7, 2025)
534490b  Avoid special casing in batch creation (gbrgr, Nov 7, 2025)
04bf463  . (gbrgr, Nov 7, 2025)
9e88edf  Modify record batch transformer to support reserved fields (gbrgr, Nov 12, 2025)
060b45d  Add metadata column helper functions (gbrgr, Nov 12, 2025)
8572dae  Store fields instead of constants (gbrgr, Nov 12, 2025)
f273add  Add comment (gbrgr, Nov 12, 2025)
5aa92ae  Adapt comment (gbrgr, Nov 12, 2025)
c05b886  . (gbrgr, Nov 12, 2025)
33bb0ad  Adapt error message (gbrgr, Nov 12, 2025)
42167ff  Consider field_id range (gbrgr, Nov 12, 2025)
27 changes: 21 additions & 6 deletions crates/iceberg/src/arrow/reader.rs
@@ -54,8 +54,9 @@ use crate::expr::visitors::page_index_evaluator::PageIndexEvaluator;
use crate::expr::visitors::row_group_metrics_evaluator::RowGroupMetricsEvaluator;
use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
use crate::spec::{Datum, NestedField, PrimitiveLiteral, PrimitiveType, Schema, Type};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

@@ -220,9 +221,16 @@ impl ArrowReader {
};

// Filter out reserved fields for Parquet projection
let project_field_ids_without_reserved: Vec<i32> = task
.project_field_ids
.iter()
.filter(|&&id| !is_metadata_field(id))
.copied()
.collect();
// Fallback IDs don't match Parquet's embedded field IDs (since they don't exist),
// so we must use position-based projection instead of field-ID matching
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&project_field_ids_without_reserved,
&task.schema,
record_batch_stream_builder.parquet_schema(),
record_batch_stream_builder.schema(),
@@ -233,10 +241,14 @@
record_batch_stream_builder.with_projection(projection_mask.clone());

// RecordBatchTransformer performs any transformations required on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
// that come back from the file, such as type promotion, default column insertion,
// column re-ordering, and virtual field addition (like _file)
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());
RecordBatchTransformer::build(task.schema_ref(), &task.project_field_ids)
.with_constant(
RESERVED_FIELD_ID_FILE,
PrimitiveLiteral::String(task.data_file_path.clone()),
)?;

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
Expand Down Expand Up @@ -374,7 +386,10 @@ impl ArrowReader {
record_batch_stream_builder
.build()?
.map(move |batch| match batch {
Ok(batch) => record_batch_transformer.process_record_batch(batch),
Ok(batch) => {
// Process the record batch (type promotion, column reordering, virtual fields, etc.)
record_batch_transformer.process_record_batch(batch)
}
Err(err) => Err(err.into()),
});

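Taken together, the reader changes above strip reserved field IDs from the Parquet projection and hand the data file path to the RecordBatchTransformer as a per-task constant. A rough consumer-side sketch of what this enables is shown below; it assumes the public scan-builder API (select, build, to_arrow) and a hypothetical print_file_paths helper, so treat it as illustrative rather than as code from this PR.

    // Illustrative sketch only: project the reserved _file metadata column in a scan.
    // Assumes an existing `Table` handle and an async (tokio) context.
    use futures::TryStreamExt;
    use iceberg::table::Table;

    async fn print_file_paths(table: &Table) -> iceberg::Result<()> {
        let scan = table.scan().select(["id", "_file"]).build()?;
        let mut batches = scan.to_arrow().await?;
        while let Some(batch) = batches.try_next().await? {
            // The _file column arrives as a run-end encoded string array whose single
            // value is the path of the data file the batch was read from.
            println!("rows: {}, schema: {:?}", batch.num_rows(), batch.schema());
        }
        Ok(())
    }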
109 changes: 102 additions & 7 deletions crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -20,15 +20,17 @@ use std::sync::Arc;

use arrow_array::{
Array as ArrowArray, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float32Array,
Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, StringArray,
Float64Array, Int32Array, Int64Array, NullArray, RecordBatch, RecordBatchOptions, RunArray,
StringArray,
};
use arrow_cast::cast;
use arrow_schema::{
DataType, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SchemaRef,
};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::arrow::schema_to_arrow_schema;
use crate::metadata_columns::get_metadata_column_name;
use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema};
use crate::{Error, ErrorKind, Result};

@@ -111,6 +113,9 @@ enum SchemaComparison {
pub(crate) struct RecordBatchTransformer {
snapshot_schema: Arc<IcebergSchema>,
projected_iceberg_field_ids: Vec<i32>,
// Pre-computed constant field information: field_id -> (arrow_type, value)
// Avoids duplicate lookups and type conversions during batch processing
constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,

// BatchTransform gets lazily constructed based on the schema of
// the first RecordBatch we receive from the file
@@ -129,10 +134,23 @@ impl RecordBatchTransformer {
Self {
snapshot_schema,
projected_iceberg_field_ids,
constant_fields: HashMap::new(),
batch_transform: None,
}
}

/// Add a constant value for a specific field ID.
/// This is used for virtual/metadata fields like _file that have constant values per batch.
///
/// # Arguments
/// * `field_id` - The field ID to associate with the constant
/// * `value` - The constant value for this field
pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result<Self> {
let arrow_type = Self::primitive_literal_to_arrow_type(&value)?;
self.constant_fields.insert(field_id, (arrow_type, value));
Ok(self)
}

pub(crate) fn process_record_batch(
&mut self,
record_batch: RecordBatch,
@@ -167,6 +185,7 @@ impl RecordBatchTransformer {
record_batch.schema_ref(),
self.snapshot_schema.as_ref(),
&self.projected_iceberg_field_ids,
&self.constant_fields,
)?);

self.process_record_batch(record_batch)?
@@ -185,6 +204,7 @@
source_schema: &ArrowSchemaRef,
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
) -> Result<BatchTransform> {
let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
let field_id_to_mapped_schema_map =
@@ -195,11 +215,25 @@
let fields: Result<Vec<_>> = projected_iceberg_field_ids
.iter()
.map(|field_id| {
Ok(field_id_to_mapped_schema_map
.get(field_id)
.ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
.0
.clone())
// Check if this is a constant/virtual field (pre-computed)
if let Some((arrow_type, _)) = constant_fields.get(field_id) {
// Create a field for the virtual column
let field_name = get_metadata_column_name(*field_id)?;
Ok(Arc::new(
Field::new(field_name, arrow_type.clone(), false).with_metadata(
HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
field_id.to_string(),
)]),
),
))
} else {
Ok(field_id_to_mapped_schema_map
.get(field_id)
.ok_or(Error::new(ErrorKind::Unexpected, "field not found"))?
.0
.clone())
}
})
.collect();

@@ -214,6 +248,7 @@
snapshot_schema,
projected_iceberg_field_ids,
field_id_to_mapped_schema_map,
constant_fields,
)?,
target_schema,
}),
@@ -270,11 +305,21 @@
snapshot_schema: &IcebergSchema,
projected_iceberg_field_ids: &[i32],
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
) -> Result<Vec<ColumnSource>> {
let field_id_to_source_schema_map =
Self::build_field_id_to_arrow_schema_map(source_schema)?;

projected_iceberg_field_ids.iter().map(|field_id|{
// Check if this is a constant/virtual field (pre-computed)
if let Some((arrow_type, value)) = constant_fields.get(field_id) {
// This is a virtual field - add it with the constant value
return Ok(ColumnSource::Add {
value: Some(value.clone()),
target_type: arrow_type.clone(),
});
}

let (target_field, _) = field_id_to_mapped_schema_map.get(field_id).ok_or(
Error::new(ErrorKind::Unexpected, "could not find field in schema")
)?;
@@ -429,6 +474,27 @@
let vals: Vec<Option<f64>> = vec![None; num_rows];
Arc::new(Float64Array::from(vals))
}
(DataType::RunEndEncoded(_, _), Some(PrimitiveLiteral::String(value))) => {
Review thread on the REE encoding:
gbrgr (author): Should we in general encode constant columns as REE? Or should we make this custom per field? For the file path it definitely makes sense to run-end encode.
Reviewer: It seems a bit random to me that only strings use REE in primitive_literal_to_arrow_type below. Yes, they might use the most memory otherwise, but other types also have similar kinds of redundancy suitable for REE.
gbrgr (author): Yes, that is why I am pointing it out here. If the reviewers are fine with it, I'd go with REE everywhere.

// Create Run-End Encoded array for constant string values (e.g., file paths)
// This is more memory-efficient than repeating the same value for every row
let run_ends = if num_rows == 0 {
Int32Array::from(Vec::<i32>::new())
} else {
Int32Array::from(vec![num_rows as i32])
};
let values = if num_rows == 0 {
StringArray::from(Vec::<&str>::new())
} else {
StringArray::from(vec![value.as_str()])
};
Arc::new(RunArray::try_new(&run_ends, &values).map_err(|e| {
Error::new(
ErrorKind::Unexpected,
"Failed to create RunArray for constant string",
)
.with_source(e)
})?)
}
(DataType::Utf8, Some(PrimitiveLiteral::String(value))) => {
Arc::new(StringArray::from(vec![value.clone(); num_rows]))
}
@@ -452,6 +518,35 @@
}
})
}

/// Converts a PrimitiveLiteral to its corresponding Arrow DataType.
/// This is used for virtual fields to determine the Arrow type based on the constant value.
fn primitive_literal_to_arrow_type(literal: &PrimitiveLiteral) -> Result<DataType> {
Ok(match literal {
PrimitiveLiteral::Boolean(_) => DataType::Boolean,
PrimitiveLiteral::Int(_) => DataType::Int32,
PrimitiveLiteral::Long(_) => DataType::Int64,
PrimitiveLiteral::Float(_) => DataType::Float32,
PrimitiveLiteral::Double(_) => DataType::Float64,
PrimitiveLiteral::String(_) => {
// Use Run-End Encoding for constant strings (memory efficient)
let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int32, false));
// Note that the values field is nullable; `RunArray::try_new` expects a
// nullable values field when the final Arrow schema is built.
let values_field = Arc::new(Field::new("values", DataType::Utf8, true));
DataType::RunEndEncoded(run_ends_field, values_field)
}
PrimitiveLiteral::Binary(_) => DataType::Binary,
PrimitiveLiteral::Int128(_) => DataType::Decimal128(38, 0),
PrimitiveLiteral::UInt128(_) => DataType::Decimal128(38, 0),
PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => {
return Err(Error::new(
ErrorKind::Unexpected,
"Cannot create arrow type for AboveMax/BelowMin literal",
));
}
})
}
}

#[cfg(test)]
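As discussed in the review thread above, the transformer encodes the constant _file value as a run-end encoded (REE) array: a single run spanning all rows that points at one string value, rather than repeating the path once per row. The standalone sketch below mirrors the construction used in this diff with plain arrow-rs types; the function name and error plumbing are illustrative only.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RunArray, StringArray};
    use arrow_schema::ArrowError;

    // Build a constant string column of `num_rows` rows as a single run-end encoded run.
    fn constant_file_column(path: &str, num_rows: usize) -> Result<ArrayRef, ArrowError> {
        let run_ends = if num_rows == 0 {
            Int32Array::from(Vec::<i32>::new())
        } else {
            // One run whose end index is the total row count.
            Int32Array::from(vec![num_rows as i32])
        };
        let values = if num_rows == 0 {
            StringArray::from(Vec::<&str>::new())
        } else {
            StringArray::from(vec![path])
        };
        // RunArray::try_new validates that run ends are positive and strictly increasing.
        let ree = RunArray::try_new(&run_ends, &values)?;
        let column: ArrayRef = Arc::new(ree);
        Ok(column)
    }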
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
@@ -96,4 +96,5 @@ mod utils;
pub mod writer;

mod delete_vector;
pub mod metadata_columns;
pub mod puffin;
97 changes: 97 additions & 0 deletions crates/iceberg/src/metadata_columns.rs
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Metadata columns (virtual/reserved fields) for Iceberg tables.
//!
//! This module defines metadata columns that can be requested in projections
//! but are not stored in data files. Instead, they are computed on-the-fly
//! during reading. Examples include the _file column (file path) and future
//! columns like partition values or row numbers.

use crate::{Error, ErrorKind, Result};

/// Reserved field ID for the file path (_file) column per Iceberg spec
pub const RESERVED_FIELD_ID_FILE: i32 = 2147483646;

/// Reserved column name for the file path metadata column
pub const RESERVED_COL_NAME_FILE: &str = "_file";

/// Returns the column name for a metadata field ID.
///
/// # Arguments
/// * `field_id` - The metadata field ID
///
/// # Returns
/// The name of the metadata column, or an error if the field ID is not recognized
pub fn get_metadata_column_name(field_id: i32) -> Result<&'static str> {
match field_id {
RESERVED_FIELD_ID_FILE => Ok(RESERVED_COL_NAME_FILE),
_ => {
if field_id > 2147483447 {
Err(Error::new(
ErrorKind::Unexpected,
format!("Unsupported metadata field ID: {field_id}"),
))
} else {
Err(Error::new(
ErrorKind::Unexpected,
format!("Field ID {field_id} is not a metadata field"),
))
}
}
}
}

/// Returns the field ID for a metadata column name.
///
/// # Arguments
/// * `column_name` - The metadata column name
///
/// # Returns
/// The field ID of the metadata column, or an error if the column name is not recognized
pub fn get_metadata_field_id(column_name: &str) -> Result<i32> {
match column_name {
RESERVED_COL_NAME_FILE => Ok(RESERVED_FIELD_ID_FILE),
Review thread on the name/ID mapping:
Reviewer: Wish that we could somehow reuse the mapping from get_metadata_column_name, to form some bidirectional 1-1 mapping. Perhaps some macro magic or something else?
gbrgr (author): Possible, but I don't think it pays off for a handful of fields.

_ => Err(Error::new(
ErrorKind::Unexpected,
format!("Unknown/unsupported metadata column name: {column_name}"),
)),
}
}

/// Checks if a field ID is a metadata field.
///
/// # Arguments
/// * `field_id` - The field ID to check
///
/// # Returns
/// `true` if the field ID is a (currently supported) metadata field, `false` otherwise
pub fn is_metadata_field(field_id: i32) -> bool {
field_id == RESERVED_FIELD_ID_FILE
// Additional metadata fields can be checked here in the future
}

/// Checks if a column name is a metadata column.
///
/// # Arguments
/// * `column_name` - The column name to check
///
/// # Returns
/// `true` if the column name is a metadata column, `false` otherwise
pub fn is_metadata_column_name(column_name: &str) -> bool {
get_metadata_field_id(column_name).is_ok()
}
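Since lib.rs now exposes this module as pub mod metadata_columns, downstream code can call the helpers directly. A small sketch of the round-trip mapping, based only on the functions and constants defined above:

    use iceberg::metadata_columns::{
        RESERVED_COL_NAME_FILE, RESERVED_FIELD_ID_FILE, get_metadata_column_name,
        get_metadata_field_id, is_metadata_column_name, is_metadata_field,
    };

    fn main() -> iceberg::Result<()> {
        // Name and field ID of the reserved _file column map 1:1 in both directions.
        assert_eq!(get_metadata_column_name(RESERVED_FIELD_ID_FILE)?, RESERVED_COL_NAME_FILE);
        assert_eq!(get_metadata_field_id(RESERVED_COL_NAME_FILE)?, RESERVED_FIELD_ID_FILE);

        // Ordinary schema field IDs and column names are not treated as metadata columns.
        assert!(is_metadata_field(RESERVED_FIELD_ID_FILE));
        assert!(!is_metadata_field(1));
        assert!(is_metadata_column_name("_file"));
        assert!(!is_metadata_column_name("id"));
        Ok(())
    }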