@@ -23,10 +23,8 @@ use uuid::Uuid;
2323
2424use super :: snapshot:: { DefaultManifestProcess , SnapshotProduceOperation , SnapshotProducer } ;
2525use super :: { ActionCommit , TransactionAction } ;
26- use crate :: error:: Result ;
27- use crate :: spec:: {
28- DataFile , ManifestContentType , ManifestEntry , ManifestFile , ManifestStatus , Operation ,
29- } ;
26+ use crate :: error:: { Error , ErrorKind , Result } ;
27+ use crate :: spec:: { DataFile , ManifestContentType , ManifestEntry , ManifestEntryRef , ManifestFile , ManifestStatus , Operation } ;
3028use crate :: table:: Table ;
3129
3230/// Transaction action for rewriting files.
@@ -108,6 +106,32 @@ impl TransactionAction for RewriteFilesAction {
108106 }
109107}
110108
109+ fn set_deleted_status ( entry : & ManifestEntryRef ) -> Result < ManifestEntry > {
110+ let builder = ManifestEntry :: builder ( )
111+ . status ( ManifestStatus :: Deleted )
112+ . snapshot_id ( entry. snapshot_id ( ) . ok_or_else ( || {
113+ Error :: new (
114+ ErrorKind :: DataInvalid ,
115+ format ! ( "Missing snapshot_id for entry with file path: {}" , entry. file_path( ) ) ,
116+ )
117+ } ) ?)
118+ . sequence_number ( entry. sequence_number ( ) . ok_or_else ( || {
119+ Error :: new (
120+ ErrorKind :: DataInvalid ,
121+ format ! ( "Missing sequence_number for entry with file path: {}" , entry. file_path( ) ) ,
122+ )
123+ } ) ?)
124+ . file_sequence_number ( entry. file_sequence_number ( ) . ok_or_else ( || {
125+ Error :: new (
126+ ErrorKind :: DataInvalid ,
127+ format ! ( "Missing file_sequence_number for entry with file path: {}" , entry. file_path( ) ) ,
128+ )
129+ } ) ?)
130+ . data_file ( entry. data_file ( ) . clone ( ) ) ;
131+
132+ Ok ( builder. build ( ) )
133+ }
134+
111135impl SnapshotProduceOperation for RewriteFilesOperation {
112136 fn operation ( & self ) -> Operation {
113137 Operation :: Replace
@@ -121,18 +145,6 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
121145 let snapshot = snapshot_producer. table . metadata ( ) . current_snapshot ( ) ;
122146
123147 if let Some ( snapshot) = snapshot {
124- let gen_manifest_entry = |old_entry : & Arc < ManifestEntry > | {
125- // todo should not unwrap
126- let builder = ManifestEntry :: builder ( )
127- . status ( ManifestStatus :: Deleted )
128- . snapshot_id ( old_entry. snapshot_id ( ) . unwrap ( ) )
129- . sequence_number ( old_entry. sequence_number ( ) . unwrap ( ) )
130- . file_sequence_number ( old_entry. file_sequence_number ( ) . unwrap ( ) )
131- . data_file ( old_entry. data_file ( ) . clone ( ) ) ;
132-
133- builder. build ( )
134- } ;
135-
136148 let manifest_list = snapshot
137149 . load_manifest_list (
138150 snapshot_producer. table . file_io ( ) ,
@@ -153,7 +165,7 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
153165 . iter ( )
154166 . any ( |f| f. file_path == entry. data_file ( ) . file_path )
155167 {
156- deleted_entries. push ( gen_manifest_entry ( entry) ) ;
168+ deleted_entries. push ( set_deleted_status ( entry) ? ) ;
157169 }
158170 }
159171 }
0 commit comments