@@ -554,7 +554,7 @@ impl FileWriter for ParquetWriter {
         Ok(())
     }
 
-    async fn close(mut self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+    async fn close(mut self) -> Result<Vec<DataFileBuilder>> {
         let writer = match self.inner_writer.take() {
             Some(writer) => writer,
             None => return Ok(vec![]),
@@ -566,22 +566,33 @@ impl FileWriter for ParquetWriter {
 
         let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
 
-        let parquet_metadata =
-            Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
+        if self.current_row_num == 0 {
+            self.out_file.delete().await.map_err(|err| {
                 Error::new(
                     ErrorKind::Unexpected,
-                    "Failed to convert metadata from thrift to parquet.",
+                    "Failed to delete empty parquet file.",
                 )
                 .with_source(err)
-            })?);
-
-        Ok(vec![Self::parquet_to_data_file_builder(
-            self.schema,
-            parquet_metadata,
-            written_size as usize,
-            self.out_file.location().to_string(),
-            self.nan_value_count_visitor.nan_value_counts,
-        )?])
+            })?;
+            Ok(vec![])
+        } else {
+            let parquet_metadata =
+                Arc::new(self.thrift_to_parquet_metadata(metadata).map_err(|err| {
+                    Error::new(
+                        ErrorKind::Unexpected,
+                        "Failed to convert metadata from thrift to parquet.",
+                    )
+                    .with_source(err)
+                })?);
+
+            Ok(vec![Self::parquet_to_data_file_builder(
+                self.schema,
+                parquet_metadata,
+                written_size as usize,
+                self.out_file.location().to_string(),
+                self.nan_value_count_visitor.nan_value_counts,
+            )?])
+        }
     }
 }
 
@@ -2218,4 +2229,44 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_write_empty_parquet_file() {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // Build a writer but close it without writing any rows.
+        let pw = ParquetWriterBuilder::new(
+            WriterProperties::builder().build(),
+            Arc::new(
+                Schema::builder()
+                    .with_schema_id(1)
+                    .with_fields(vec![NestedField::required(
+                        0,
+                        "col",
+                        Type::Primitive(PrimitiveType::Long),
+                    )
+                    .with_id(0)
+                    .into()])
+                    .build()
+                    .expect("Failed to create schema"),
+            ),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        )
+        .build()
+        .await
+        .unwrap();
+
+        let res = pw.close().await.unwrap();
+        assert_eq!(res.len(), 0);
+
+        // Check that the empty file has been deleted.
+        assert_eq!(std::fs::read_dir(temp_dir.path()).unwrap().count(), 0);
+    }
 }