Skip to content

Commit 856597b

Browse files
authored
feat(writer): Add clustered and fanout writer (apache#1735)
## Which issue does this PR close? - Closes apache#1572 and apache#1573 ## What changes are included in this PR? New: - Added new `partitioning` module with `PartitioningWriter` trait - `ClusteredWriter`: Optimized for pre-sorted data, requires writing in partition order - `FanoutWriter`: Flexible writer that can handle data from any partition at any time Modification: - (BREAKING) Modified `DataFileWriterBuilder` to support dynamic partition assignment - Updated DataFusion integration to use the new writer API ## Are these changes tested? Yes, unit tests were added.
1 parent 273991e commit 856597b

File tree

12 files changed

+1024
-84
lines changed

12 files changed

+1024
-84
lines changed

crates/iceberg/src/writer/base_writer/data_file_writer.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use crate::{Error, ErrorKind, Result};
3030
#[derive(Clone, Debug)]
3131
pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> {
3232
inner: RollingFileWriterBuilder<B, L, F>,
33-
partition_key: Option<PartitionKey>,
3433
}
3534

3635
impl<B, L, F> DataFileWriterBuilder<B, L, F>
@@ -40,14 +39,8 @@ where
4039
F: FileNameGenerator,
4140
{
4241
/// Create a new `DataFileWriterBuilder` using a `RollingFileWriterBuilder`.
43-
pub fn new(
44-
inner_builder: RollingFileWriterBuilder<B, L, F>,
45-
partition_key: Option<PartitionKey>,
46-
) -> Self {
47-
Self {
48-
inner: inner_builder,
49-
partition_key,
50-
}
42+
pub fn new(inner: RollingFileWriterBuilder<B, L, F>) -> Self {
43+
Self { inner }
5144
}
5245
}
5346

@@ -60,10 +53,10 @@ where
6053
{
6154
type R = DataFileWriter<B, L, F>;
6255

63-
async fn build(self) -> Result<Self::R> {
56+
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
6457
Ok(DataFileWriter {
6558
inner: Some(self.inner.clone().build()),
66-
partition_key: self.partition_key,
59+
partition_key,
6760
})
6861
}
6962
}
@@ -194,8 +187,8 @@ mod test {
194187
file_name_gen,
195188
);
196189

197-
let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder, None)
198-
.build()
190+
let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder)
191+
.build(None)
199192
.await
200193
.unwrap();
201194

@@ -280,10 +273,9 @@ mod test {
280273
file_name_gen,
281274
);
282275

283-
let mut data_file_writer =
284-
DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key))
285-
.build()
286-
.await?;
276+
let mut data_file_writer = DataFileWriterBuilder::new(rolling_file_writer_builder)
277+
.build(Some(partition_key))
278+
.await?;
287279

288280
let arrow_schema = arrow_schema::Schema::new(vec![
289281
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(

crates/iceberg/src/writer/base_writer/equality_delete_writer.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,11 @@ pub struct EqualityDeleteWriterConfig {
6666
equality_ids: Vec<i32>,
6767
// Projector used to project the data chunk into specific fields.
6868
projector: RecordBatchProjector,
69-
partition_key: Option<PartitionKey>,
7069
}
7170

7271
impl EqualityDeleteWriterConfig {
7372
/// Create a new `EqualityDeleteWriterConfig` with equality ids.
74-
pub fn new(
75-
equality_ids: Vec<i32>,
76-
original_schema: SchemaRef,
77-
partition_key: Option<PartitionKey>,
78-
) -> Result<Self> {
73+
pub fn new(equality_ids: Vec<i32>, original_schema: SchemaRef) -> Result<Self> {
7974
let original_arrow_schema = Arc::new(schema_to_arrow_schema(&original_schema)?);
8075
let projector = RecordBatchProjector::new(
8176
original_arrow_schema,
@@ -110,7 +105,6 @@ impl EqualityDeleteWriterConfig {
110105
Ok(Self {
111106
equality_ids,
112107
projector,
113-
partition_key,
114108
})
115109
}
116110

@@ -129,12 +123,12 @@ where
129123
{
130124
type R = EqualityDeleteFileWriter<B, L, F>;
131125

132-
async fn build(self) -> Result<Self::R> {
126+
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
133127
Ok(EqualityDeleteFileWriter {
134-
inner: Some(self.inner.clone().build()), // todo revisit this, probably still need a builder for rolling writer
128+
inner: Some(self.inner.clone().build()),
135129
projector: self.config.projector,
136130
equality_ids: self.config.equality_ids,
137-
partition_key: self.config.partition_key,
131+
partition_key,
138132
})
139133
}
140134
}
@@ -428,7 +422,7 @@ mod test {
428422

429423
let equality_ids = vec![0_i32, 8];
430424
let equality_config =
431-
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
425+
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
432426
let delete_schema =
433427
arrow_schema_to_schema(equality_config.projected_arrow_schema_ref()).unwrap();
434428
let projector = equality_config.projector.clone();
@@ -444,7 +438,7 @@ mod test {
444438
);
445439
let mut equality_delete_writer =
446440
EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, equality_config)
447-
.build()
441+
.build(None)
448442
.await?;
449443

450444
// write
@@ -531,19 +525,19 @@ mod test {
531525
.unwrap(),
532526
);
533527
// Float and Double are not allowed to be used for equality delete
534-
assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone(), None).is_err());
535-
assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone(), None).is_err());
528+
assert!(EqualityDeleteWriterConfig::new(vec![0], schema.clone()).is_err());
529+
assert!(EqualityDeleteWriterConfig::new(vec![1], schema.clone()).is_err());
536530
// Struct is not allowed to be used for equality delete
537-
assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone(), None).is_err());
531+
assert!(EqualityDeleteWriterConfig::new(vec![3], schema.clone()).is_err());
538532
// Nested field of struct is allowed to be used for equality delete
539-
assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone(), None).is_ok());
533+
assert!(EqualityDeleteWriterConfig::new(vec![4], schema.clone()).is_ok());
540534
// Nested field of map is not allowed to be used for equality delete
541-
assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone(), None).is_err());
542-
assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone(), None).is_err());
543-
assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone(), None).is_err());
535+
assert!(EqualityDeleteWriterConfig::new(vec![7], schema.clone()).is_err());
536+
assert!(EqualityDeleteWriterConfig::new(vec![8], schema.clone()).is_err());
537+
assert!(EqualityDeleteWriterConfig::new(vec![9], schema.clone()).is_err());
544538
// Nested field of list is not allowed to be used for equality delete
545-
assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone(), None).is_err());
546-
assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone(), None).is_err());
539+
assert!(EqualityDeleteWriterConfig::new(vec![10], schema.clone()).is_err());
540+
assert!(EqualityDeleteWriterConfig::new(vec![11], schema.clone()).is_err());
547541

548542
Ok(())
549543
}
@@ -597,7 +591,7 @@ mod test {
597591
.unwrap(),
598592
);
599593
let equality_ids = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13];
600-
let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone(), None).unwrap();
594+
let config = EqualityDeleteWriterConfig::new(equality_ids, schema.clone()).unwrap();
601595
let delete_arrow_schema = config.projected_arrow_schema_ref().clone();
602596
let delete_schema = arrow_schema_to_schema(&delete_arrow_schema).unwrap();
603597

@@ -611,7 +605,7 @@ mod test {
611605
);
612606
let mut equality_delete_writer =
613607
EqualityDeleteFileWriterBuilder::new(rolling_writer_builder, config)
614-
.build()
608+
.build(None)
615609
.await?;
616610

617611
// prepare data
@@ -795,7 +789,7 @@ mod test {
795789
let to_write = RecordBatch::try_new(arrow_schema.clone(), columns).unwrap();
796790
let equality_ids = vec![0_i32, 2, 5];
797791
let equality_config =
798-
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema), None).unwrap();
792+
EqualityDeleteWriterConfig::new(equality_ids, Arc::new(schema)).unwrap();
799793
let projector = equality_config.projector.clone();
800794

801795
// check

crates/iceberg/src/writer/file_writer/rolling_writer.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,10 @@ mod tests {
329329
file_name_gen,
330330
);
331331

332-
let data_file_writer_builder =
333-
DataFileWriterBuilder::new(rolling_file_writer_builder, None);
332+
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
334333

335334
// Create writer
336-
let mut writer = data_file_writer_builder.build().await?;
335+
let mut writer = data_file_writer_builder.build(None).await?;
337336

338337
// Create test data
339338
let arrow_schema = make_test_arrow_schema();
@@ -388,10 +387,10 @@ mod tests {
388387
file_name_gen,
389388
);
390389

391-
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder, None);
390+
let data_file_writer_builder = DataFileWriterBuilder::new(rolling_writer_builder);
392391

393392
// Create writer
394-
let mut writer = data_file_writer_builder.build().await?;
393+
let mut writer = data_file_writer_builder.build(None).await?;
395394

396395
// Create test data
397396
let arrow_schema = make_test_arrow_schema();

crates/iceberg/src/writer/mod.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,9 @@
100100
//! );
101101
//!
102102
//! // Create a data file writer using parquet file writer builder.
103-
//! let data_file_writer_builder =
104-
//! DataFileWriterBuilder::new(rolling_file_writer_builder, None);
103+
//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
105104
//! // Build the data file writer
106-
//! let mut data_file_writer = data_file_writer_builder.build().await?;
105+
//! let mut data_file_writer = data_file_writer_builder.build(None).await?;
107106
//!
108107
//! // Write the data using data_file_writer...
109108
//!
@@ -122,7 +121,7 @@
122121
//! use arrow_array::RecordBatch;
123122
//! use iceberg::io::FileIOBuilder;
124123
//! use iceberg::memory::MemoryCatalogBuilder;
125-
//! use iceberg::spec::DataFile;
124+
//! use iceberg::spec::{DataFile, PartitionKey};
126125
//! use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
127126
//! use iceberg::writer::file_writer::ParquetWriterBuilder;
128127
//! use iceberg::writer::file_writer::location_generator::{
@@ -149,9 +148,9 @@
149148
//! impl<B: IcebergWriterBuilder> IcebergWriterBuilder for LatencyRecordWriterBuilder<B> {
150149
//! type R = LatencyRecordWriter<B::R>;
151150
//!
152-
//! async fn build(self) -> Result<Self::R> {
151+
//! async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R> {
153152
//! Ok(LatencyRecordWriter {
154-
//! inner_writer: self.inner_writer_builder.build().await?,
153+
//! inner_writer: self.inner_writer_builder.build(partition_key).await?,
155154
//! })
156155
//! }
157156
//! }
@@ -231,24 +230,29 @@
231230
//! );
232231
//!
233232
//! // Create a data file writer builder using rolling file writer.
234-
//! let data_file_writer_builder =
235-
//! DataFileWriterBuilder::new(rolling_file_writer_builder, Some(partition_key));
233+
//! let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
236234
//! // Create latency record writer using data file writer builder.
237235
//! let latency_record_builder = LatencyRecordWriterBuilder::new(data_file_writer_builder);
238236
//! // Build the final writer
239-
//! let mut latency_record_data_file_writer = latency_record_builder.build().await.unwrap();
237+
//! let mut latency_record_data_file_writer = latency_record_builder
238+
//! .build(Some(partition_key))
239+
//! .await
240+
//! .unwrap();
240241
//!
241242
//! Ok(())
242243
//! }
243244
//! ```
244245
245246
pub mod base_writer;
246247
pub mod file_writer;
248+
/// Provides partition-aware writers
249+
/// TODO: add usage examples for the partitioning writers
250+
pub mod partitioning;
247251

248252
use arrow_array::RecordBatch;
249253

250254
use crate::Result;
251-
use crate::spec::DataFile;
255+
use crate::spec::{DataFile, PartitionKey};
252256

253257
type DefaultInput = RecordBatch;
254258
type DefaultOutput = Vec<DataFile>;
@@ -260,8 +264,8 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
260264
{
261265
/// The associated writer type.
262266
type R: IcebergWriter<I, O>;
263-
/// Build the iceberg writer.
264-
async fn build(self) -> Result<Self::R>;
267+
/// Build the iceberg writer with an optional partition key.
268+
async fn build(self, partition_key: Option<PartitionKey>) -> Result<Self::R>;
265269
}
266270

267271
/// The iceberg writer used to write data to iceberg table.

0 commit comments

Comments
 (0)