Skip to content

Commit 77e7975

Browse files
committed
Merge branch 'partial_schema_eq_deletes' into df50
# Conflicts: # crates/iceberg/src/arrow/caching_delete_file_loader.rs
2 parents 147103b + b272671 commit 77e7975

File tree

1 file changed

+90
-7
lines changed

1 file changed

+90
-7
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 90 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -224,14 +224,12 @@ impl CachingDeleteFileLoader {
224224
let (sender, receiver) = channel();
225225
del_filter.insert_equality_delete(&task.file_path, receiver);
226226

227+
// Equality deletes intentionally have partial schemas. Schema evolution would add
228+
// NULL values for missing REQUIRED columns, causing Arrow validation to fail.
227229
Ok(DeleteFileContext::FreshEqDel {
228-
batch_stream: BasicDeleteFileLoader::evolve_schema(
229-
basic_delete_file_loader
230-
.parquet_to_batch_stream(&task.file_path)
231-
.await?,
232-
schema,
233-
)
234-
.await?,
230+
batch_stream: basic_delete_file_loader
231+
.parquet_to_batch_stream(&task.file_path)
232+
.await?,
235233
sender,
236234
equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()),
237235
})
@@ -687,6 +685,91 @@ mod tests {
687685
assert!(result.is_none()); // no pos dels for file 3
688686
}
689687

688+
/// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow
689+
/// validation errors when missing REQUIRED columns are filled with NULLs.
690+
///
691+
/// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java.
692+
#[tokio::test]
693+
async fn test_partial_schema_equality_deletes_evolve_fails() {
694+
let tmp_dir = TempDir::new().unwrap();
695+
let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
696+
697+
// Create table schema with REQUIRED fields
698+
let table_schema = Arc::new(
699+
Schema::builder()
700+
.with_schema_id(1)
701+
.with_fields(vec![
702+
crate::spec::NestedField::required(
703+
1,
704+
"id",
705+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
706+
)
707+
.into(),
708+
crate::spec::NestedField::required(
709+
2,
710+
"data",
711+
crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
712+
)
713+
.into(),
714+
])
715+
.build()
716+
.unwrap(),
717+
);
718+
719+
// Write equality delete file with PARTIAL schema (only 'data' column)
720+
let delete_file_path = {
721+
let data_vals = vec!["a", "d", "g"];
722+
let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
723+
724+
let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field(
725+
"data",
726+
DataType::Utf8,
727+
false,
728+
"2", // field ID
729+
)]));
730+
731+
let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap();
732+
733+
let path = format!("{}/partial-eq-deletes.parquet", &table_location);
734+
let file = File::create(&path).unwrap();
735+
let props = WriterProperties::builder()
736+
.set_compression(Compression::SNAPPY)
737+
.build();
738+
let mut writer =
739+
ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap();
740+
writer.write(&delete_batch).expect("Writing batch");
741+
writer.close().unwrap();
742+
path
743+
};
744+
745+
let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
746+
let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
747+
748+
let batch_stream = basic_delete_file_loader
749+
.parquet_to_batch_stream(&delete_file_path)
750+
.await
751+
.unwrap();
752+
753+
let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema)
754+
.await
755+
.unwrap();
756+
757+
let result = evolved_stream.next().await.unwrap();
758+
759+
assert!(
760+
result.is_err(),
761+
"Expected error from evolve_schema adding NULL to non-nullable column"
762+
);
763+
764+
let err = result.unwrap_err();
765+
let err_msg = err.to_string();
766+
assert!(
767+
err_msg.contains("non-nullable") || err_msg.contains("null values"),
768+
"Expected null value error, got: {}",
769+
err_msg
770+
);
771+
}
772+
690773
/// Test loading a FileScanTask with BOTH positional and equality deletes.
691774
/// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
692775
#[tokio::test]

0 commit comments

Comments
 (0)