From 1251891df56cb7c6a9b73250e89732c64f4d5d08 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 00:00:10 -0800
Subject: [PATCH 1/9] initialize partition splitter in the constructor of task writer

---
 .../datafusion/src/task_writer.rs             | 42 ++++++++-----------
 1 file changed, 17 insertions(+), 25 deletions(-)

diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
index d27b2e6fbf..8ff658977a 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -34,7 +34,7 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
 ///
 /// TaskWriter coordinates writing data to Iceberg tables by:
 /// - Selecting the appropriate partitioning strategy (unpartitioned, fanout, or clustered)
-/// - Lazily initializing the partition splitter on first write
+/// - Initializing the partition splitter in the constructor for partitioned tables
 /// - Routing data to the underlying writer
 /// - Collecting all written data files
 ///
@@ -67,7 +67,7 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
 pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
     /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
     writer: SupportedWriter<B>,
-    /// Lazily initialized partition splitter for partitioned tables
+    /// Partition splitter for partitioned tables (initialized in constructor)
     partition_splitter: Option<RecordBatchPartitionSplitter>,
     /// Iceberg schema reference
     schema: SchemaRef,
@@ -139,9 +139,22 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
             SupportedWriter::Clustered(ClusteredWriter::new(writer_builder))
         };
 
+        // Initialize partition splitter in constructor for partitioned tables
+        let partition_splitter = if !partition_spec.is_unpartitioned() {
+            Some(
+                RecordBatchPartitionSplitter::new_with_precomputed_values(
+                    schema.clone(),
+                    partition_spec.clone(),
+                )
+                .expect("Failed to create partition splitter"),
+            )
+        } else {
+            None
+        };
+
         Self {
             writer,
-            partition_splitter: None,
+            partition_splitter,
             schema,
             partition_spec,
         }
@@ -149,7 +162,7 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
 
     /// Write a RecordBatch to the TaskWriter.
    ///
-    /// For the first write to a partitioned table, this method initializes the partition splitter.
+    /// For partitioned tables, the partition splitter is already initialized in the constructor.
    /// For unpartitioned tables, data is written directly without splitting.
    ///
    /// # Parameters
@@ -163,7 +176,6 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// # Errors
     ///
     /// This method will return an error if:
-    /// - Partition splitter initialization fails
     /// - Splitting the batch by partition fails
     /// - Writing to the underlying writer fails
     ///
@@ -183,29 +195,9 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
                 writer.write(batch).await
             }
             SupportedWriter::Fanout(writer) => {
-                // Initialize splitter on first write if needed
-                if self.partition_splitter.is_none() {
-                    self.partition_splitter =
-                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
-                            self.schema.clone(),
-                            self.partition_spec.clone(),
-                        )?);
-                }
-
-                // Split and write partitioned data
                 Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
             }
             SupportedWriter::Clustered(writer) => {
-                // Initialize splitter on first write if needed
-                if self.partition_splitter.is_none() {
-                    self.partition_splitter =
-                        Some(RecordBatchPartitionSplitter::new_with_precomputed_values(
-                            self.schema.clone(),
-                            self.partition_spec.clone(),
-                        )?);
-                }
-
-                // Split and write partitioned data
                 Self::write_partitioned_batches(writer, &self.partition_splitter, &batch).await
             }
         }

From 8745cea8d3cc7ec97c4d9839361b944406f01fa8 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 00:13:20 -0800
Subject: [PATCH 2/9] project and repartition for insert into

---
 .../src/physical_plan/repartition.rs          |  1 -
 .../datafusion/src/physical_plan/write.rs     | 45 +++++++++----------
 .../integrations/datafusion/src/table/mod.rs  | 43 +++++++++++-------
 3 files changed, 49 insertions(+), 40 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/repartition.rs b/crates/integrations/datafusion/src/physical_plan/repartition.rs
index 95cdc8472e..8ad87fd1cc 100644
--- a/crates/integrations/datafusion/src/physical_plan/repartition.rs
+++ b/crates/integrations/datafusion/src/physical_plan/repartition.rs
@@ -86,7 +86,6 @@ use iceberg::spec::{TableMetadata, TableMetadataRef, Transform};
 ///     NonZeroUsize::new(4).unwrap(),
 /// )?;
 /// ```
-#[allow(dead_code)]
 pub(crate) fn repartition(
     input: Arc<dyn ExecutionPlan>,
     table_metadata: TableMetadataRef,
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index b9d1f02d1f..4953182ca5 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -44,12 +44,12 @@ use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
-use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Error, ErrorKind};
 use parquet::file::properties::WriterProperties;
 use uuid::Uuid;
 
 use crate::physical_plan::DATA_FILES_COL_NAME;
+use crate::task_writer::TaskWriter;
 use crate::to_datafusion_error;
 
 /// An execution plan node that writes data to an Iceberg table.
@@ -205,18 +205,6 @@ impl ExecutionPlan for IcebergWriteExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> DFResult<SendableRecordBatchStream> {
-        if !self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .is_unpartitioned()
-        {
-            // TODO add support for partitioned tables
-            return Err(DataFusionError::NotImplemented(
-                "IcebergWriteExec does not support partitioned tables yet".to_string(),
-            ));
-        }
-
         let partition_type = self.table.metadata().default_partition_type().clone();
         let format_version = self.table.metadata().format_version();
 
@@ -277,31 +265,40 @@ impl ExecutionPlan for IcebergWriteExec {
         );
         let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
 
+        // Create TaskWriter
+        // TODO: Make fanout_enabled configurable via table properties
+        let fanout_enabled = true;
+        let schema = self.table.metadata().current_schema().clone();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();
+        let task_writer = TaskWriter::new(
+            data_file_writer_builder,
+            fanout_enabled,
+            schema.clone(),
+            partition_spec,
+        );
+
         // Get input data
         let data = execute_input_stream(
             Arc::clone(&self.input),
-            Arc::new(
-                schema_to_arrow_schema(self.table.metadata().current_schema())
-                    .map_err(to_datafusion_error)?,
-            ),
+            Arc::new(schema_to_arrow_schema(&schema).map_err(to_datafusion_error)?),
             partition,
             Arc::clone(&context),
         )?;
 
         // Create write stream
         let stream = futures::stream::once(async move {
-            let mut writer = data_file_writer_builder
-                // todo specify partition key when partitioning writer is supported
-                .build(None)
-                .await
-                .map_err(to_datafusion_error)?;
+            let mut task_writer = task_writer;
             let mut input_stream = data;
 
             while let Some(batch) = input_stream.next().await {
-                writer.write(batch?).await.map_err(to_datafusion_error)?;
+                let batch = batch?;
+                task_writer
+                    .write(batch)
+                    .await
+                    .map_err(to_datafusion_error)?;
             }
 
-            let data_files = writer.close().await.map_err(to_datafusion_error)?;
+            let data_files = task_writer.close().await.map_err(to_datafusion_error)?;
 
             // Convert builders to data files and then to JSON strings
             let data_files_strs: Vec<String> = data_files
diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index a8c49837c3..0b5a62fab2 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -19,6 +19,7 @@ pub mod metadata_table;
 pub mod table_provider_factory;
 
 use std::any::Any;
+use std::num::NonZeroUsize;
 use std::sync::Arc;
 
 use async_trait::async_trait;
@@ -38,6 +39,8 @@ use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent};
 use metadata_table::IcebergMetadataTableProvider;
 
 use crate::physical_plan::commit::IcebergCommitExec;
+use crate::physical_plan::project::project_with_partition;
+use crate::physical_plan::repartition::repartition;
 use crate::physical_plan::scan::IcebergTableScan;
 use crate::physical_plan::write::IcebergWriteExec;
 
@@ -170,32 +173,42 @@ impl TableProvider for IcebergTableProvider {
 
     async fn insert_into(
         &self,
-        _state: &dyn Session,
+        state: &dyn Session,
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        if !self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .is_unpartitioned()
-        {
-            // TODO add insert into support for partitioned tables
-            return Err(DataFusionError::NotImplemented(
-                "IcebergTableProvider::insert_into does not support partitioned tables yet"
-                    .to_string(),
-            ));
-        }
-
         let Some(catalog) = self.catalog.clone() else {
             return Err(DataFusionError::Execution(
                 "Catalog cannot be none for insert_into".to_string(),
             ));
         };
 
+        let metadata = self.table.metadata();
+        let partition_spec = metadata.default_partition_spec();
+
+        // Step 1: Project partition values for partitioned tables
+        let plan_with_partition = if !partition_spec.is_unpartitioned() {
+            project_with_partition(input, &self.table)?
+        } else {
+            input
+        };
+
+        // Step 2: Repartition for parallel processing
+        let target_partitions = NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
+            DataFusionError::Configuration(
+                "target_partitions must be greater than 0".to_string(),
+            )
+        })?;
+
+        let repartitioned_plan = repartition(
+            plan_with_partition,
+            self.table.metadata_ref(),
+            target_partitions,
+        )?;
+
         let write_plan = Arc::new(IcebergWriteExec::new(
             self.table.clone(),
-            input,
+            repartitioned_plan,
             self.schema.clone(),
         ));

From 9c571649f98aef597fd21bba58d648e32807e6e1 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 00:22:50 -0800
Subject: [PATCH 3/9] minor

---
 crates/integrations/datafusion/src/table/mod.rs   | 11 ++++++-----
 crates/integrations/datafusion/src/task_writer.rs |  3 ---
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 0b5a62fab2..9f491d70f6 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -194,11 +194,12 @@ impl TableProvider for IcebergTableProvider {
         };
 
         // Step 2: Repartition for parallel processing
-        let target_partitions = NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
-            DataFusionError::Configuration(
-                "target_partitions must be greater than 0".to_string(),
-            )
-        })?;
+        let target_partitions =
+            NonZeroUsize::new(state.config().target_partitions()).ok_or_else(|| {
+                DataFusionError::Configuration(
+                    "target_partitions must be greater than 0".to_string(),
+                )
+            })?;
 
         let repartitioned_plan = repartition(
             plan_with_partition,
diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
index 8ff658977a..b4c21a6871 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -63,7 +63,6 @@ use iceberg::writer::partitioning::unpartitioned_writer::UnpartitionedWriter;
 /// // Close and get data files
 /// let data_files = task_writer.close().await?;
 /// ```
-#[allow(dead_code)]
 pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
     /// The underlying writer (UnpartitionedWriter, FanoutWriter, or ClusteredWriter)
     writer: SupportedWriter<B>,
@@ -79,7 +78,6 @@ pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
 ///
 /// This enum allows TaskWriter to work with different partitioning strategies
 /// while maintaining a unified interface.
-#[allow(dead_code)]
 enum SupportedWriter<B: IcebergWriterBuilder> {
     /// Writer for unpartitioned tables
     Unpartitioned(UnpartitionedWriter<B>),
     /// Writer with fanout partitioning (handles unsorted data)
     Fanout(FanoutWriter<B>),
     /// Writer with clustered partitioning (requires sorted data)
     Clustered(ClusteredWriter<B>),
 }
 
-#[allow(dead_code)]
 impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// Create a new TaskWriter.
    ///

From 971747367daaa63c8baedc9c88cf92e8916866cf Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 00:30:10 -0800
Subject: [PATCH 4/9] do not store schema and partition spec in task writer

---
 crates/integrations/datafusion/src/task_writer.rs | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
index b4c21a6871..d376766c50 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -68,10 +68,6 @@ pub(crate) struct TaskWriter<B: IcebergWriterBuilder> {
     writer: SupportedWriter<B>,
     /// Partition splitter for partitioned tables (initialized in constructor)
     partition_splitter: Option<RecordBatchPartitionSplitter>,
-    /// Iceberg schema reference
-    schema: SchemaRef,
-    /// Partition specification reference
-    partition_spec: PartitionSpecRef,
 }
 
 /// Internal enum to hold the different writer types.
@@ -152,14 +148,13 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
         Self {
             writer,
             partition_splitter,
-            schema,
-            partition_spec,
         }
     }
 
     /// Write a RecordBatch to the TaskWriter.
     ///
-    /// For partitioned tables, the partition splitter is already initialized in the constructor.
+    /// For partitioned tables, uses the partition splitter to split
+    /// the batch by partition key and route each partition to the underlying writer.
     /// For unpartitioned tables, data is written directly without splitting.
     ///
     /// # Parameters
@@ -203,13 +198,13 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
     /// Helper method to split and write partitioned data.
     ///
     /// This method handles the common logic for both FanoutWriter and ClusteredWriter:
-    /// - Splits the batch by partition key using the provided splitter
-    /// - Writes each partition to the underlying writer
+    /// - Splits the batch by partition key using the partition splitter
+    /// - Writes each partition to the underlying writer with its corresponding partition key
     ///
     /// # Parameters
     ///
     /// * `writer` - The underlying PartitioningWriter (FanoutWriter or ClusteredWriter)
-    /// * `partition_splitter` - The partition splitter (must be initialized)
+    /// * `partition_splitter` - The partition splitter
     /// * `batch` - The RecordBatch to write
     ///
     /// # Returns

From 47f15edd03b6f379afcf072a32d017af9fa830dd Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 07:17:44 -0800
Subject: [PATCH 5/9] add unit test

---
 .../datafusion/src/physical_plan/write.rs     |   4 +-
 .../tests/integration_datafusion_test.rs      | 152 +++++++++++++++++-
 2 files changed, 152 insertions(+), 4 deletions(-)

diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index 4953182ca5..c272fa8bb5 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -35,7 +35,7 @@ use datafusion::physical_plan::{
     execute_input_stream,
 };
 use futures::StreamExt;
-use iceberg::arrow::{FieldMatchMode, schema_to_arrow_schema};
+use iceberg::arrow::FieldMatchMode;
 use iceberg::spec::{DataFileFormat, TableProperties, serialize_data_file_to_json};
 use iceberg::table::Table;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
@@ -280,7 +280,7 @@ impl ExecutionPlan for IcebergWriteExec {
         // Get input data
         let data = execute_input_stream(
             Arc::clone(&self.input),
-            Arc::new(schema_to_arrow_schema(&schema).map_err(to_datafusion_error)?),
+            self.input.schema(), // input schema may have projected column `_partition`
             partition,
             Arc::clone(&context),
         )?;
diff --git a/crates/integrations/datafusion/tests/integration_datafusion_test.rs b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
index cb4987a973..fdf5b17d18 100644
--- a/crates/integrations/datafusion/tests/integration_datafusion_test.rs
+++ b/crates/integrations/datafusion/tests/integration_datafusion_test.rs
@@ -27,9 +27,13 @@ use datafusion::execution::context::SessionContext;
 use datafusion::parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 use expect_test::expect;
 use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
-use iceberg::spec::{NestedField, PrimitiveType, Schema, StructType, Type};
+use iceberg::spec::{
+    NestedField, PrimitiveType, Schema, StructType, Transform, Type, UnboundPartitionSpec,
+};
 use iceberg::test_utils::check_record_batches;
-use iceberg::{Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation};
+use iceberg::{
+    Catalog, CatalogBuilder, MemoryCatalog, NamespaceIdent, Result, TableCreation, TableIdent,
+};
 use iceberg_datafusion::IcebergCatalogProvider;
 use tempfile::TempDir;
 
@@ -810,3 +814,147 @@ async fn test_insert_into_nested() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn test_insert_into_partitioned() -> Result<()> {
+    let iceberg_catalog = get_iceberg_catalog().await;
+    let namespace = NamespaceIdent::new("test_partitioned_write".to_string());
+    set_test_namespace(&iceberg_catalog, &namespace).await?;
+
+    // Create a schema with a partition column
+    let schema = Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+            NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(3, "value", Type::Primitive(PrimitiveType::String)).into(),
+        ])
+        .build()?;
+
+    // Create partition spec with identity transform on category
+    let partition_spec = UnboundPartitionSpec::builder()
+        .with_spec_id(0)
+        .add_partition_field(2, "category", Transform::Identity)?
+        .build();
+
+    // Create the partitioned table
+    let creation = TableCreation::builder()
+        .name("partitioned_table".to_string())
+        .location(temp_path())
+        .schema(schema)
+        .partition_spec(partition_spec)
+        .properties(HashMap::new())
+        .build();
+
+    iceberg_catalog.create_table(&namespace, creation).await?;
+
+    let client = Arc::new(iceberg_catalog);
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+
+    let ctx = SessionContext::new();
+    ctx.register_catalog("catalog", catalog);
+
+    // Insert data with multiple partition values in a single batch
+    let df = ctx
+        .sql(
+            r#"
+            INSERT INTO catalog.test_partitioned_write.partitioned_table
+            VALUES
+                (1, 'electronics', 'laptop'),
+                (2, 'electronics', 'phone'),
+                (3, 'books', 'novel'),
+                (4, 'books', 'textbook'),
+                (5, 'clothing', 'shirt')
+            "#,
+        )
+        .await
+        .unwrap();
+
+    let batches = df.collect().await.unwrap();
+    assert_eq!(batches.len(), 1);
+    let batch = &batches[0];
+    let rows_inserted = batch
+        .column(0)
+        .as_any()
+        .downcast_ref::<UInt64Array>()
+        .unwrap();
+    assert_eq!(rows_inserted.value(0), 5);
+
+    // Refresh catalog to get updated table
+    let catalog = Arc::new(IcebergCatalogProvider::try_new(client.clone()).await?);
+    ctx.register_catalog("catalog", catalog);
+
+    // Query the table to verify data
+    let df = ctx
+        .sql("SELECT * FROM catalog.test_partitioned_write.partitioned_table ORDER BY id")
+        .await
+        .unwrap();
+
+    let batches = df.collect().await.unwrap();
+
+    // Verify the data - note that _partition column should NOT be present
+    check_record_batches(
+        batches,
+        expect![[r#"
+            Field { name: "id", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} },
+            Field { name: "category", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} },
+            Field { name: "value", data_type: Utf8, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }"#]],
+        expect![[r#"
+            id: PrimitiveArray<Int32>
+            [
+              1,
+              2,
+              3,
+              4,
+              5,
+            ],
+            category: StringArray
+            [
+              "electronics",
+              "electronics",
+              "books",
+              "books",
+              "clothing",
+            ],
+            value: StringArray
+            [
+              "laptop",
+              "phone",
+              "novel",
+              "textbook",
+              "shirt",
+            ]"#]],
+        &[],
+        Some("id"),
+    );
+
+    // Verify that data files exist under correct partition paths
+    let table_ident = TableIdent::new(namespace.clone(), "partitioned_table".to_string());
+    let table = client.load_table(&table_ident).await?;
+    let table_location = table.metadata().location();
+    let file_io = table.file_io();
+
+    // List files under each expected partition path
+    let electronics_path = format!("{}/data/category=electronics", table_location);
+    let books_path = format!("{}/data/category=books", table_location);
+    let clothing_path = format!("{}/data/category=clothing", table_location);
+
+    // Verify partition directories exist and contain data files
+    assert!(
+        file_io.exists(&electronics_path).await?,
+        "Expected partition directory: {}",
+        electronics_path
+    );
+    assert!(
+        file_io.exists(&books_path).await?,
+        "Expected partition directory: {}",
+        books_path
+    );
+    assert!(
+        file_io.exists(&clothing_path).await?,
+        "Expected partition directory: {}",
+        clothing_path
+    );
+
+    Ok(())
+}

From 5e53c39b846327849c7edae01bbb218388561e36 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 07:24:49 -0800
Subject: [PATCH 6/9] minor

---
 crates/integrations/datafusion/src/table/mod.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs
index 9f491d70f6..42a3baad3b 100644
--- a/crates/integrations/datafusion/src/table/mod.rs
+++ b/crates/integrations/datafusion/src/table/mod.rs
@@ -183,8 +183,7 @@ impl TableProvider for IcebergTableProvider {
             ));
         };
 
-        let metadata = self.table.metadata();
-        let partition_spec = metadata.default_partition_spec();
+        let partition_spec = self.table.metadata().default_partition_spec();
 
         // Step 1: Project partition values for partitioned tables
         let plan_with_partition = if !partition_spec.is_unpartitioned() {

From 73697f8dac399396574f9467c7886b0166d766d3 Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Thu, 6 Nov 2025 20:42:32 -0800
Subject: [PATCH 7/9] trigger CI, REST fixture issue?

From c37edb48a249ac62442643f85a0db0dde7a56d7d Mon Sep 17 00:00:00 2001
From: Shawn Chang
Date: Mon, 10 Nov 2025 22:42:13 -0800
Subject: [PATCH 8/9] handle error properly

---
 .../arrow/record_batch_partition_splitter.rs  | 20 ++++++++++---------
 .../datafusion/src/physical_plan/write.rs     |  5 +++--
 .../datafusion/src/task_writer.rs             | 19 +++++++++---------
 3 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
index dcdd9c68ee..7b83621f2d 100644
--- a/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
+++ b/crates/iceberg/src/arrow/record_batch_partition_splitter.rs
@@ -59,7 +59,7 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new(
+    pub fn try_new(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
         calculator: Option<PartitionValueCalculator>,
@@ -87,12 +87,12 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new_with_computed_values(
+    pub fn try_new_with_computed_values(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
     ) -> Result<Self> {
         let calculator = PartitionValueCalculator::try_new(&partition_spec, &iceberg_schema)?;
-        Self::new(iceberg_schema, partition_spec, Some(calculator))
+        Self::try_new(iceberg_schema, partition_spec, Some(calculator))
     }
 
     /// Create a new RecordBatchPartitionSplitter expecting pre-computed partition values.
@@ -108,11 +108,11 @@ impl RecordBatchPartitionSplitter {
     /// # Returns
     ///
     /// Returns a new `RecordBatchPartitionSplitter` instance or an error if initialization fails.
-    pub fn new_with_precomputed_values(
+    pub fn try_new_with_precomputed_values(
         iceberg_schema: SchemaRef,
         partition_spec: PartitionSpecRef,
     ) -> Result<Self> {
-        Self::new(iceberg_schema, partition_spec, None)
+        Self::try_new(iceberg_schema, partition_spec, None)
     }
 
     /// Split the record batch into multiple record batches based on the partition spec.
@@ -261,9 +261,11 @@ mod tests {
             .build()
             .unwrap(),
         );
-        let partition_splitter =
-            RecordBatchPartitionSplitter::new_with_computed_values(schema.clone(), partition_spec)
-                .expect("Failed to create splitter");
+        let partition_splitter = RecordBatchPartitionSplitter::try_new_with_computed_values(
+            schema.clone(),
+            partition_spec,
+        )
+        .expect("Failed to create splitter");
 
         let arrow_schema = Arc::new(schema_to_arrow_schema(&schema).unwrap());
         let id_array = Int32Array::from(vec![1, 2, 1, 3, 2, 3, 1]);
@@ -392,7 +394,7 @@ mod tests {
         ]));
 
         // Create splitter expecting pre-computed partition column
-        let partition_splitter = RecordBatchPartitionSplitter::new_with_precomputed_values(
+        let partition_splitter = RecordBatchPartitionSplitter::try_new_with_precomputed_values(
             schema.clone(),
             partition_spec,
         )
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index c272fa8bb5..9eb53c235f 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -270,12 +270,13 @@ impl ExecutionPlan for IcebergWriteExec {
         let fanout_enabled = true;
         let schema = self.table.metadata().current_schema().clone();
         let partition_spec = self.table.metadata().default_partition_spec().clone();
-        let task_writer = TaskWriter::new(
+        let task_writer = TaskWriter::try_new(
             data_file_writer_builder,
             fanout_enabled,
             schema.clone(),
             partition_spec,
-        );
+        )
+        .map_err(to_datafusion_error)?;
 
         // Get input data
         let data = execute_input_stream(
diff --git a/crates/integrations/datafusion/src/task_writer.rs b/crates/integrations/datafusion/src/task_writer.rs
index d376766c50..5329f26458 100644
--- a/crates/integrations/datafusion/src/task_writer.rs
+++ b/crates/integrations/datafusion/src/task_writer.rs
@@ -118,12 +118,12 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
     ///     partition_spec,
     /// );
     /// ```
-    pub fn new(
+    pub fn try_new(
         writer_builder: B,
         fanout_enabled: bool,
         schema: SchemaRef,
         partition_spec: PartitionSpecRef,
-    ) -> Self {
+    ) -> Result<Self> {
         let writer = if partition_spec.is_unpartitioned() {
             SupportedWriter::Unpartitioned(UnpartitionedWriter::new(writer_builder))
         } else if fanout_enabled {
@@ -135,20 +135,19 @@ impl<B: IcebergWriterBuilder> TaskWriter<B> {
         // Initialize partition splitter in constructor for partitioned tables
         let partition_splitter = if !partition_spec.is_unpartitioned() {
             Some(
-                RecordBatchPartitionSplitter::new_with_precomputed_values(
+                RecordBatchPartitionSplitter::try_new_with_precomputed_values(
                     schema.clone(),
                     partition_spec.clone(),
-                )
-                .expect("Failed to create partition splitter"),
+                )?,
             )
         } else {
             None
         };
 
-        Self {
+        Ok(Self {
             writer,
             partition_splitter,
-        }
+        })
     }
 
     /// Write a RecordBatch to the TaskWriter.
@@ -377,7 +376,7 @@ mod tests { let partition_spec = Arc::new(PartitionSpec::builder(schema.clone()).build()?); let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; - let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec); + let mut task_writer = TaskWriter::try_new(writer_builder, false, schema, partition_spec)?; // Write data let batch = RecordBatch::try_new(arrow_schema, vec![ @@ -443,7 +442,7 @@ mod tests { ); let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; - let mut task_writer = TaskWriter::new(writer_builder, true, schema, partition_spec); + let mut task_writer = TaskWriter::try_new(writer_builder, true, schema, partition_spec)?; // Create partition column let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( @@ -486,7 +485,7 @@ mod tests { ); let writer_builder = create_writer_builder(&temp_dir, schema.clone())?; - let mut task_writer = TaskWriter::new(writer_builder, false, schema, partition_spec); + let mut task_writer = TaskWriter::try_new(writer_builder, false, schema, partition_spec)?; // Create partition column let partition_field = Field::new("region", DataType::Utf8, false).with_metadata( From 2f2584b35325d4f9bf90800276fb1986baabd416 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 11 Nov 2025 04:39:41 -0800 Subject: [PATCH 9/9] trigger CI
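
Reviewer note: for readers following the series, below is a condensed, end-to-end sketch of the flow it enables, derived from the new `test_insert_into_partitioned` test in patch 5/9. It is not part of the patches. The warehouse path, namespace name, and table name are illustrative placeholders, and the catalog wiring assumes the same `MemoryCatalogBuilder` setup the test suite uses (the tests hide it behind an unshown `get_iceberg_catalog()` helper).

```rust
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::execution::context::SessionContext;
use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation};
use iceberg_datafusion::IcebergCatalogProvider;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // In-memory catalog; the warehouse path is an arbitrary placeholder.
    let catalog = MemoryCatalogBuilder::default()
        .load(
            "memory",
            HashMap::from([(
                MEMORY_CATALOG_WAREHOUSE.to_string(),
                "/tmp/iceberg-warehouse".to_string(),
            )]),
        )
        .await?;

    let ns = NamespaceIdent::new("demo".to_string());
    catalog.create_namespace(&ns, HashMap::new()).await?;

    // Table partitioned by identity(category), mirroring the test above.
    let schema = Schema::builder()
        .with_schema_id(0)
        .with_fields(vec![
            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
            NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(),
        ])
        .build()?;
    let spec = UnboundPartitionSpec::builder()
        .with_spec_id(0)
        .add_partition_field(2, "category", Transform::Identity)?
        .build();
    let creation = TableCreation::builder()
        .name("events".to_string())
        .location("/tmp/iceberg-warehouse/demo/events".to_string())
        .schema(schema)
        .partition_spec(spec)
        .properties(HashMap::new())
        .build();
    catalog.create_table(&ns, creation).await?;

    // Register with DataFusion and INSERT. Before this series, the write path
    // returned NotImplemented for partitioned tables; after it, insert_into
    // projects a `_partition` column, repartitions to target_partitions, and
    // routes batches through TaskWriter to per-partition data files.
    let ctx = SessionContext::new();
    let provider = IcebergCatalogProvider::try_new(Arc::new(catalog)).await?;
    ctx.register_catalog("catalog", Arc::new(provider));
    ctx.sql("INSERT INTO catalog.demo.events VALUES (1, 'books'), (2, 'games')")
        .await?
        .collect()
        .await?;
    Ok(())
}
```

Note the design choice in patch 2/9: `fanout_enabled` is currently hard-coded to `true` in `IcebergWriteExec::execute` (so unsorted input is handled by `FanoutWriter`), with a TODO to make it configurable via table properties, at which point `ClusteredWriter` could serve pre-sorted input.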