@@ -42,7 +42,7 @@ enum DeleteFileIndexState {
4242#[ derive( Debug ) ]
4343struct PopulatedDeleteFileIndex {
4444 #[ allow( dead_code) ]
45- global_deletes : Vec < Arc < DeleteFileContext > > ,
45+ global_equality_deletes : Vec < Arc < DeleteFileContext > > ,
4646 eq_deletes_by_partition : HashMap < Struct , Vec < Arc < DeleteFileContext > > > ,
4747 pos_deletes_by_partition : HashMap < Struct , Vec < Arc < DeleteFileContext > > > ,
4848 // TODO: do we need this?
@@ -65,7 +65,8 @@ impl DeleteFileIndex {
6565 spawn ( {
6666 let state = state. clone ( ) ;
6767 async move {
68- let delete_files = delete_file_stream. collect :: < Vec < _ > > ( ) . await ;
68+ let delete_files: Vec < DeleteFileContext > =
69+ delete_file_stream. collect :: < Vec < _ > > ( ) . await ;
6970
7071 let populated_delete_file_index = PopulatedDeleteFileIndex :: new ( delete_files) ;
7172
@@ -114,15 +115,15 @@ impl PopulatedDeleteFileIndex {
114115 ///
115116 /// 1. The partition information is extracted from each delete file's manifest entry.
116117 /// 2. If the partition is empty and the delete file is not a positional delete,
117- /// it is added to the `global_deletes ` vector
118+ /// it is added to the `global_equality_deletes ` vector
118119 /// 3. Otherwise, the delete file is added to one of two hash maps based on its content type.
119120 fn new ( files : Vec < DeleteFileContext > ) -> PopulatedDeleteFileIndex {
120121 let mut eq_deletes_by_partition: HashMap < Struct , Vec < Arc < DeleteFileContext > > > =
121122 HashMap :: default ( ) ;
122123 let mut pos_deletes_by_partition: HashMap < Struct , Vec < Arc < DeleteFileContext > > > =
123124 HashMap :: default ( ) ;
124125
125- let mut global_deletes : Vec < Arc < DeleteFileContext > > = vec ! [ ] ;
126+ let mut global_equality_deletes : Vec < Arc < DeleteFileContext > > = vec ! [ ] ;
126127
127128 files. into_iter ( ) . for_each ( |ctx| {
128129 let arc_ctx = Arc :: new ( ctx) ;
@@ -133,7 +134,7 @@ impl PopulatedDeleteFileIndex {
133134 if partition. fields ( ) . is_empty ( ) {
134135 // TODO: confirm we're good to skip here if we encounter a pos del
135136 if arc_ctx. manifest_entry . content_type ( ) != DataContentType :: PositionDeletes {
136- global_deletes . push ( arc_ctx) ;
137+ global_equality_deletes . push ( arc_ctx) ;
137138 return ;
138139 }
139140 }
@@ -153,7 +154,7 @@ impl PopulatedDeleteFileIndex {
153154 } ) ;
154155
155156 PopulatedDeleteFileIndex {
156- global_deletes ,
157+ global_equality_deletes ,
157158 eq_deletes_by_partition,
158159 pos_deletes_by_partition,
159160 }
@@ -167,12 +168,12 @@ impl PopulatedDeleteFileIndex {
167168 ) -> Vec < FileScanTaskDeleteFile > {
168169 let mut results = vec ! [ ] ;
169170
170- self . global_deletes
171+ self . global_equality_deletes
171172 . iter ( )
172- // filter that returns true if the provided delete file's sequence number is **greater than or equal to ** `seq_num`
173+ // filter that returns true if the provided delete file's sequence number is **greater than** `seq_num`
173174 . filter ( |& delete| {
174175 seq_num
175- . map ( |seq_num| delete. manifest_entry . sequence_number ( ) >= Some ( seq_num) )
176+ . map ( |seq_num| delete. manifest_entry . sequence_number ( ) > Some ( seq_num) )
176177 . unwrap_or_else ( || true )
177178 } )
178179 . for_each ( |delete| results. push ( delete. as_ref ( ) . into ( ) ) ) ;
@@ -185,6 +186,7 @@ impl PopulatedDeleteFileIndex {
185186 seq_num
186187 . map ( |seq_num| delete. manifest_entry . sequence_number ( ) > Some ( seq_num) )
187188 . unwrap_or_else ( || true )
189+ && data_file. partition_spec_id == delete. partition_spec_id
188190 } )
189191 . for_each ( |delete| results. push ( delete. as_ref ( ) . into ( ) ) ) ;
190192 }
@@ -201,10 +203,282 @@ impl PopulatedDeleteFileIndex {
201203 seq_num
202204 . map ( |seq_num| delete. manifest_entry . sequence_number ( ) >= Some ( seq_num) )
203205 . unwrap_or_else ( || true )
206+ && data_file. partition_spec_id == delete. partition_spec_id
204207 } )
205208 . for_each ( |delete| results. push ( delete. as_ref ( ) . into ( ) ) ) ;
206209 }
207210
208211 results
209212 }
210213}
214+
#[cfg(test)]
mod tests {
    use uuid::Uuid;

    use super::*;
    use crate::spec::{
        DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, ManifestStatus,
        Struct,
    };

    /// Unpartitioned delete files: checks the sequence-number cut-offs and that the
    /// unpartitioned equality deletes are also returned for a partitioned data file.
    #[test]
    fn test_delete_file_index_unpartitioned() {
        // Order matters: the assertions below compare against slices of this list.
        let entries: Vec<ManifestEntry> = vec![
            build_added_manifest_entry(4, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_eq_delete()),
            build_added_manifest_entry(5, &build_unpartitioned_pos_delete()),
            build_added_manifest_entry(6, &build_unpartitioned_pos_delete()),
        ];
        let all_paths = file_paths(&entries);
        let index = build_index(entries, 0);

        let data_file = build_unpartitioned_data_file();
        assert_sequence_filtering(&index, &data_file, &all_paths);

        // The 2 global equality deletes should match against any partitioned file
        let partition = Struct::from_iter([Some(Literal::long(100))]);
        let partitioned_file = build_partitioned_data_file(&partition, 1);
        assert_eq!(
            applicable_paths(&index, &partitioned_file, 0),
            all_paths[..2]
        );
    }

    /// Partitioned delete files: checks the sequence-number cut-offs, and that a data
    /// file with a different partition tuple or a different spec ID gets no deletes.
    #[test]
    fn test_delete_file_index_partitioned() {
        let partition_one = Struct::from_iter([Some(Literal::long(100))]);
        let spec_id: i32 = 1;

        // Order matters: the assertions below compare against slices of this list.
        let entries: Vec<ManifestEntry> = vec![
            build_added_manifest_entry(4, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_eq_delete(&partition_one, spec_id)),
            build_added_manifest_entry(5, &build_partitioned_pos_delete(&partition_one, spec_id)),
            build_added_manifest_entry(6, &build_partitioned_pos_delete(&partition_one, spec_id)),
        ];
        let all_paths = file_paths(&entries);
        let index = build_index(entries, spec_id);

        // Same partition tuple and same spec ID as the delete files.
        let matching_file = build_partitioned_data_file(&partition_one, spec_id);
        assert_sequence_filtering(&index, &matching_file, &all_paths);

        // Data file with different partition tuples does not match any delete files
        let other_partition = Struct::from_iter([Some(Literal::long(200))]);
        let other_partition_file = build_partitioned_data_file(&other_partition, spec_id);
        assert!(applicable_paths(&index, &other_partition_file, 0).is_empty());

        // Data file with same tuple but different spec ID does not match any delete files
        let other_spec_file = build_partitioned_data_file(&partition_one, 2);
        assert!(applicable_paths(&index, &other_spec_file, 0).is_empty());
    }

    /// Collects the file path of every manifest entry, preserving order.
    fn file_paths(entries: &[ManifestEntry]) -> Vec<String> {
        entries
            .iter()
            .map(|entry| entry.file_path().to_string())
            .collect()
    }

    /// Wraps each entry in a `DeleteFileContext` tagged with `spec_id` and builds the
    /// populated index from the result.
    fn build_index(entries: Vec<ManifestEntry>, spec_id: i32) -> PopulatedDeleteFileIndex {
        let contexts: Vec<DeleteFileContext> = entries
            .into_iter()
            .map(|entry| DeleteFileContext {
                manifest_entry: entry.into(),
                partition_spec_id: spec_id,
            })
            .collect();

        PopulatedDeleteFileIndex::new(contexts)
    }

    /// Paths of the delete files the index returns for `data_file` at `seq_num`,
    /// in the order the index returns them.
    fn applicable_paths(
        index: &PopulatedDeleteFileIndex,
        data_file: &DataFile,
        seq_num: i64,
    ) -> Vec<String> {
        index
            .get_deletes_for_data_file(data_file, Some(seq_num))
            .into_iter()
            .map(|delete| delete.file_path)
            .collect()
    }

    /// Sequence-number checks shared by both scenarios.
    ///
    /// `all_paths` must list the four delete files in the order they were indexed:
    /// equality delete @4, equality delete @6, position delete @5, position delete @6.
    fn assert_sequence_filtering(
        index: &PopulatedDeleteFileIndex,
        data_file: &DataFile,
        all_paths: &[String],
    ) {
        // All deletes apply to sequences 0 and 3 (only the count is checked here).
        for seq_num in [0, 3] {
            assert_eq!(applicable_paths(index, data_file, seq_num).len(), 4);
        }

        // (data sequence number, number of trailing delete files expected to apply):
        //   4 and 5 -> last 3 deletes; 6 -> only the last position delete.
        let tail_cases: [(i64, usize); 3] = [(4, 3), (5, 3), (6, 1)];
        for (seq_num, tail_len) in tail_cases {
            let first_expected = all_paths.len() - tail_len;
            assert_eq!(
                applicable_paths(index, data_file, seq_num),
                all_paths[first_expected..]
            );
        }
    }

    /// Equality delete file with an empty partition tuple and spec ID 0.
    fn build_unpartitioned_eq_delete() -> DataFile {
        build_partitioned_eq_delete(&Struct::empty(), 0)
    }

    /// Equality delete file (on field id 1) with a unique path for the given partition and spec.
    fn build_partitioned_eq_delete(partition: &Struct, spec_id: i32) -> DataFile {
        let path = format!("{}_equality_delete.parquet", Uuid::new_v4());

        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::EqualityDeletes)
            .equality_ids(Some(vec![1]))
            .record_count(1)
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Position delete file with an empty partition tuple and spec ID 0.
    fn build_unpartitioned_pos_delete() -> DataFile {
        build_partitioned_pos_delete(&Struct::empty(), 0)
    }

    /// Position delete file with a unique path for the given partition and spec.
    fn build_partitioned_pos_delete(partition: &Struct, spec_id: i32) -> DataFile {
        let path = format!("{}-pos-delete.parquet", Uuid::new_v4());

        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::PositionDeletes)
            .record_count(1)
            .referenced_data_file(Some("/some-data-file.parquet".to_string()))
            .partition(partition.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Data file with an empty partition tuple and spec ID 0.
    fn build_unpartitioned_data_file() -> DataFile {
        build_partitioned_data_file(&Struct::empty(), 0)
    }

    /// Data file with a unique path for the given partition and spec.
    fn build_partitioned_data_file(partition_value: &Struct, spec_id: i32) -> DataFile {
        let path = format!("{}-data.parquet", Uuid::new_v4());

        DataFileBuilder::default()
            .file_path(path)
            .file_format(DataFileFormat::Parquet)
            .content(DataContentType::Data)
            .record_count(100)
            .partition(partition_value.clone())
            .partition_spec_id(spec_id)
            .file_size_in_bytes(100)
            .build()
            .unwrap()
    }

    /// Manifest entry with status `Added` wrapping a copy of `file` at `data_seq_number`.
    fn build_added_manifest_entry(data_seq_number: i64, file: &DataFile) -> ManifestEntry {
        let data_file = file.clone();

        ManifestEntry::builder()
            .status(ManifestStatus::Added)
            .sequence_number(data_seq_number)
            .data_file(data_file)
            .build()
    }
}
0 commit comments