@@ -224,14 +224,12 @@ impl CachingDeleteFileLoader {
                 let (sender, receiver) = channel();
                 del_filter.insert_equality_delete(&task.file_path, receiver);
 
+                // Equality deletes intentionally have partial schemas. Schema evolution would add
+                // NULL values for missing REQUIRED columns, causing Arrow validation to fail.
                 Ok(DeleteFileContext::FreshEqDel {
-                    batch_stream: BasicDeleteFileLoader::evolve_schema(
-                        basic_delete_file_loader
-                            .parquet_to_batch_stream(&task.file_path)
-                            .await?,
-                        schema,
-                    )
-                    .await?,
+                    batch_stream: basic_delete_file_loader
+                        .parquet_to_batch_stream(&task.file_path)
+                        .await?,
                     sender,
                     equality_ids: HashSet::from_iter(task.equality_ids.clone().unwrap()),
                 })
@@ -687,9 +685,93 @@ mod tests {
         assert!(result.is_none()); // no pos dels for file 3
     }
 
-    /// Test loading a FileScanTask with both positional and equality deletes.
-    /// Verifies the fix for the inverted condition that caused "Missing predicate for equality
-    /// delete file" errors first encountered when running Iceberg Java's TestSparkExecutorCache
+    /// Verifies that evolve_schema on partial-schema equality deletes fails with Arrow
+    /// validation errors when missing REQUIRED columns are filled with NULLs.
+    ///
+    /// Reproduces the issue that caused 14 TestSparkReaderDeletes failures in Iceberg Java.
+    #[tokio::test]
+    async fn test_partial_schema_equality_deletes_evolve_fails() {
+        let tmp_dir = TempDir::new().unwrap();
+        let table_location = tmp_dir.path().as_os_str().to_str().unwrap();
+
+        // Create table schema with REQUIRED fields
+        let table_schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    crate::spec::NestedField::required(
+                        1,
+                        "id",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Int),
+                    )
+                    .into(),
+                    crate::spec::NestedField::required(
+                        2,
+                        "data",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::String),
+                    )
+                    .into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        // Write equality delete file with PARTIAL schema (only the 'data' column)
+        let delete_file_path = {
+            let data_vals = vec!["a", "d", "g"];
+            let data_col = Arc::new(StringArray::from(data_vals)) as ArrayRef;
+
+            let delete_schema = Arc::new(arrow_schema::Schema::new(vec![simple_field(
+                "data",
+                DataType::Utf8,
+                false,
+                "2", // field ID
+            )]));
+
+            let delete_batch = RecordBatch::try_new(delete_schema.clone(), vec![data_col]).unwrap();
+
+            let path = format!("{}/partial-eq-deletes.parquet", &table_location);
+            let file = File::create(&path).unwrap();
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+            let mut writer =
+                ArrowWriter::try_new(file, delete_batch.schema(), Some(props)).unwrap();
+            writer.write(&delete_batch).expect("Writing batch");
+            writer.close().unwrap();
+            path
+        };
+
+        let file_io = FileIO::from_path(table_location).unwrap().build().unwrap();
+        let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone());
+
+        let batch_stream = basic_delete_file_loader
+            .parquet_to_batch_stream(&delete_file_path)
+            .await
+            .unwrap();
+
+        let mut evolved_stream = BasicDeleteFileLoader::evolve_schema(batch_stream, table_schema)
+            .await
+            .unwrap();
+
+        let result = evolved_stream.next().await.unwrap();
+
+        assert!(
+            result.is_err(),
+            "Expected error from evolve_schema adding NULL to non-nullable column"
+        );
+
+        let err = result.unwrap_err();
+        let err_msg = err.to_string();
+        assert!(
+            err_msg.contains("non-nullable") || err_msg.contains("null values"),
+            "Expected null value error, got: {}",
+            err_msg
+        );
+    }
+
+    /// Test loading a FileScanTask with BOTH positional and equality deletes.
+    /// Verifies the fix for the inverted condition that caused "Missing predicate for equality delete file" errors.
     #[tokio::test]
     async fn test_load_deletes_with_mixed_types() {
         use crate::scan::FileScanTask;
@@ -702,16 +784,28 @@ mod tests {
             .build()
             .unwrap();
 
+        // Create the data file schema
         let data_file_schema = Arc::new(
            Schema::builder()
                .with_fields(vec![
-                    NestedField::optional(2, "y", Type::Primitive(PrimitiveType::Long)).into(),
-                    NestedField::optional(3, "z", Type::Primitive(PrimitiveType::Long)).into(),
+                    crate::spec::NestedField::optional(
+                        2,
+                        "y",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
+                    crate::spec::NestedField::optional(
+                        3,
+                        "z",
+                        crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long),
+                    )
+                    .into(),
                ])
                .build()
                .unwrap(),
        );
 
+        // Write positional delete file
         let positional_delete_schema = crate::arrow::delete_filter::tests::create_pos_del_schema();
         let file_path_values =
             vec![format!("{}/data-1.parquet", table_location.to_str().unwrap()); 4];
@@ -740,6 +834,7 @@ mod tests {
         writer.write(&positional_deletes_to_write).unwrap();
         writer.close().unwrap();
 
+        // Write equality delete file
         let eq_delete_path = setup_write_equality_delete_file_1(table_location.to_str().unwrap());
 
         // Create FileScanTask with BOTH positional and equality deletes
@@ -754,7 +849,7 @@ mod tests {
             file_path: eq_delete_path.clone(),
             file_type: DataContentType::EqualityDeletes,
             partition_spec_id: 0,
-            equality_ids: Some(vec![2, 3, 4, 6]), // field IDs from the equality delete schema
+            equality_ids: Some(vec![2, 3]), // Only use field IDs that exist in both schemas
         };
 
         let file_scan_task = FileScanTask {