1515// specific language governing permissions and limitations
1616// under the License.
1717
18+ use std:: collections:: HashSet ;
1819use std:: sync:: Arc ;
1920
2021use futures:: channel:: mpsc:: Sender ;
2122use futures:: { SinkExt , TryFutureExt } ;
23+ use itertools:: Itertools ;
2224
2325use crate :: delete_file_index:: DeleteFileIndex ;
2426use crate :: expr:: { Bind , BoundPredicate , Predicate } ;
@@ -28,11 +30,12 @@ use crate::scan::{
2830 PartitionFilterCache ,
2931} ;
3032use crate :: spec:: {
31- ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList , SchemaRef , SnapshotRef ,
32- TableMetadataRef ,
33+ DataContentType , ManifestContentType , ManifestEntryRef , ManifestFile , ManifestList ,
34+ ManifestStatus , Operation , SchemaRef , SnapshotRef , TableMetadataRef ,
3335} ;
3436use crate :: { Error , ErrorKind , Result } ;
3537
38+ type ManifestEntryFilterFn = dyn Fn ( & ManifestEntryRef ) -> bool + Send + Sync ;
3639/// Wraps a [`ManifestFile`] alongside the objects that are needed
3740/// to process it in a thread-safe manner
3841pub ( crate ) struct ManifestFileContext {
@@ -45,7 +48,11 @@ pub(crate) struct ManifestFileContext {
4548 object_cache : Arc < ObjectCache > ,
4649 snapshot_schema : SchemaRef ,
4750 expression_evaluator_cache : Arc < ExpressionEvaluatorCache > ,
48- delete_file_index : DeleteFileIndex ,
51+ delete_file_index : Option < DeleteFileIndex > ,
52+
53+ /// filter manifest entries.
54+ /// Used for different kind of scans, e.g., only scan newly added files without delete files.
55+ filter_fn : Option < Arc < ManifestEntryFilterFn > > ,
4956}
5057
5158/// Wraps a [`ManifestEntryRef`] alongside the objects that are needed
@@ -58,7 +65,7 @@ pub(crate) struct ManifestEntryContext {
5865 pub bound_predicates : Option < Arc < BoundPredicates > > ,
5966 pub partition_spec_id : i32 ,
6067 pub snapshot_schema : SchemaRef ,
61- pub delete_file_index : DeleteFileIndex ,
68+ pub delete_file_index : Option < DeleteFileIndex > ,
6269}
6370
6471impl ManifestFileContext {
@@ -74,12 +81,13 @@ impl ManifestFileContext {
7481 mut sender,
7582 expression_evaluator_cache,
7683 delete_file_index,
77- ..
84+ filter_fn ,
7885 } = self ;
86+ let filter_fn = filter_fn. unwrap_or_else ( || Arc :: new ( |_| true ) ) ;
7987
8088 let manifest = object_cache. get_manifest ( & manifest_file) . await ?;
8189
82- for manifest_entry in manifest. entries ( ) {
90+ for manifest_entry in manifest. entries ( ) . iter ( ) . filter ( |e| filter_fn ( e ) ) {
8391 let manifest_entry_context = ManifestEntryContext {
8492 // TODO: refactor to avoid the expensive ManifestEntry clone
8593 manifest_entry : manifest_entry. clone ( ) ,
@@ -105,13 +113,16 @@ impl ManifestEntryContext {
105113 /// consume this `ManifestEntryContext`, returning a `FileScanTask`
106114 /// created from it
107115 pub ( crate ) async fn into_file_scan_task ( self ) -> Result < FileScanTask > {
108- let deletes = self
109- . delete_file_index
110- . get_deletes_for_data_file (
111- self . manifest_entry . data_file ( ) ,
112- self . manifest_entry . sequence_number ( ) ,
113- )
114- . await ;
116+ let deletes = if let Some ( delete_file_index) = self . delete_file_index {
117+ delete_file_index
118+ . get_deletes_for_data_file (
119+ self . manifest_entry . data_file ( ) ,
120+ self . manifest_entry . sequence_number ( ) ,
121+ )
122+ . await
123+ } else {
124+ vec ! [ ]
125+ } ;
115126
116127 Ok ( FileScanTask {
117128 start : 0 ,
@@ -150,6 +161,11 @@ pub(crate) struct PlanContext {
150161 pub partition_filter_cache : Arc < PartitionFilterCache > ,
151162 pub manifest_evaluator_cache : Arc < ManifestEvaluatorCache > ,
152163 pub expression_evaluator_cache : Arc < ExpressionEvaluatorCache > ,
164+
165+ // for incremental scan.
166+ // If `to_snapshot_id` is set, it means incremental scan. `from_snapshot_id` can be `None`.
167+ pub from_snapshot_id : Option < i64 > ,
168+ pub to_snapshot_id : Option < i64 > ,
153169}
154170
155171impl PlanContext {
@@ -181,23 +197,84 @@ impl PlanContext {
181197 Ok ( partition_filter)
182198 }
183199
184- pub ( crate ) fn build_manifest_file_contexts (
200+ pub ( crate ) async fn build_manifest_file_contexts (
185201 & self ,
186- manifest_list : Arc < ManifestList > ,
187202 tx_data : Sender < ManifestEntryContext > ,
188- delete_file_idx : DeleteFileIndex ,
189- delete_file_tx : Sender < ManifestEntryContext > ,
203+ delete_file_idx_and_tx : Option < ( DeleteFileIndex , Sender < ManifestEntryContext > ) > ,
190204 ) -> Result < Box < impl Iterator < Item = Result < ManifestFileContext > > + ' static > > {
191- let manifest_files = manifest_list. entries ( ) . iter ( ) ;
205+ let mut filter_fn: Option < Arc < ManifestEntryFilterFn > > = None ;
206+ let manifest_files = {
207+ if let Some ( to_snapshot_id) = self . to_snapshot_id {
208+ // Incremental scan mode:
209+ // Get all added files between two snapshots.
210+ // - data files in `Append` and `Overwrite` snapshots are included.
211+ // - delete files are ignored
212+ // - `Replace` snapshots (e.g., compaction) are ignored.
213+ //
214+ // `latest_snapshot_id` is inclusive, `oldest_snapshot_id` is exclusive.
215+
216+ // prevent misuse
217+ assert ! (
218+ delete_file_idx_and_tx. is_none( ) ,
219+ "delete file is not supported in incremental scan mode"
220+ ) ;
221+
222+ let snapshots =
223+ ancestors_between ( & self . table_metadata , to_snapshot_id, self . from_snapshot_id )
224+ . filter ( |snapshot| {
225+ matches ! (
226+ snapshot. summary( ) . operation,
227+ Operation :: Append | Operation :: Overwrite
228+ )
229+ } )
230+ . collect_vec ( ) ;
231+ let snapshot_ids: HashSet < i64 > = snapshots
232+ . iter ( )
233+ . map ( |snapshot| snapshot. snapshot_id ( ) )
234+ . collect ( ) ;
235+
236+ let mut manifest_files = vec ! [ ] ;
237+ for snapshot in snapshots {
238+ let manifest_list = self
239+ . object_cache
240+ . get_manifest_list ( & snapshot, & self . table_metadata )
241+ . await ?;
242+ for entry in manifest_list. entries ( ) {
243+ if !snapshot_ids. contains ( & entry. added_snapshot_id ) {
244+ continue ;
245+ }
246+ manifest_files. push ( entry. clone ( ) ) ;
247+ }
248+ }
249+
250+ filter_fn = Some ( Arc :: new ( move |entry : & ManifestEntryRef | {
251+ matches ! ( entry. status( ) , ManifestStatus :: Added )
252+ && matches ! ( entry. data_file( ) . content_type( ) , DataContentType :: Data )
253+ && (
254+ // Is it possible that the snapshot id here is not contained?
255+ entry. snapshot_id ( ) . is_none ( )
256+ || snapshot_ids. contains ( & entry. snapshot_id ( ) . unwrap ( ) )
257+ )
258+ } ) ) ;
259+
260+ manifest_files
261+ } else {
262+ let manifest_list = self . get_manifest_list ( ) . await ?;
263+ manifest_list. entries ( ) . to_vec ( )
264+ }
265+ } ;
192266
193267 // TODO: Ideally we could ditch this intermediate Vec as we return an iterator.
194268 let mut filtered_mfcs = vec ! [ ] ;
195269
196- for manifest_file in manifest_files {
197- let tx = if manifest_file. content == ManifestContentType :: Deletes {
198- delete_file_tx. clone ( )
270+ for manifest_file in & manifest_files {
271+ let ( delete_file_idx, tx) = if manifest_file. content == ManifestContentType :: Deletes {
272+ let Some ( ( delete_file_idx, tx) ) = delete_file_idx_and_tx. as_ref ( ) else {
273+ continue ;
274+ } ;
275+ ( Some ( delete_file_idx. clone ( ) ) , tx. clone ( ) )
199276 } else {
200- tx_data. clone ( )
277+ ( delete_file_idx_and_tx . as_ref ( ) . map ( | ( idx , _ ) | idx . clone ( ) ) , tx_data. clone ( ) )
201278 } ;
202279
203280 let partition_bound_predicate = if self . predicate . is_some ( ) {
@@ -225,7 +302,8 @@ impl PlanContext {
225302 manifest_file,
226303 partition_bound_predicate,
227304 tx,
228- delete_file_idx. clone ( ) ,
305+ delete_file_idx,
306+ filter_fn. clone ( ) ,
229307 ) ;
230308
231309 filtered_mfcs. push ( Ok ( mfc) ) ;
@@ -239,7 +317,8 @@ impl PlanContext {
239317 manifest_file : & ManifestFile ,
240318 partition_filter : Option < Arc < BoundPredicate > > ,
241319 sender : Sender < ManifestEntryContext > ,
242- delete_file_index : DeleteFileIndex ,
320+ delete_file_index : Option < DeleteFileIndex > ,
321+ filter_fn : Option < Arc < ManifestEntryFilterFn > > ,
243322 ) -> ManifestFileContext {
244323 let bound_predicates =
245324 if let ( Some ( ref partition_bound_predicate) , Some ( snapshot_bound_predicate) ) =
@@ -262,6 +341,61 @@ impl PlanContext {
262341 field_ids : self . field_ids . clone ( ) ,
263342 expression_evaluator_cache : self . expression_evaluator_cache . clone ( ) ,
264343 delete_file_index,
344+ filter_fn,
265345 }
266346 }
267347}
348+
349+ struct Ancestors {
350+ next : Option < SnapshotRef > ,
351+ get_snapshot : Box < dyn Fn ( i64 ) -> Option < SnapshotRef > + Send > ,
352+ }
353+
354+ impl Iterator for Ancestors {
355+ type Item = SnapshotRef ;
356+
357+ fn next ( & mut self ) -> Option < Self :: Item > {
358+ let snapshot = self . next . take ( ) ?;
359+ let result = snapshot. clone ( ) ;
360+ self . next = snapshot
361+ . parent_snapshot_id ( )
362+ . and_then ( |id| ( self . get_snapshot ) ( id) ) ;
363+ Some ( result)
364+ }
365+ }
366+
367+ /// Iterate starting from `snapshot` (inclusive) to the root snapshot.
368+ fn ancestors_of (
369+ table_metadata : & TableMetadataRef ,
370+ snapshot : i64 ,
371+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
372+ if let Some ( snapshot) = table_metadata. snapshot_by_id ( snapshot) {
373+ let table_metadata = table_metadata. clone ( ) ;
374+ Box :: new ( Ancestors {
375+ next : Some ( snapshot. clone ( ) ) ,
376+ get_snapshot : Box :: new ( move |id| table_metadata. snapshot_by_id ( id) . cloned ( ) ) ,
377+ } )
378+ } else {
379+ Box :: new ( std:: iter:: empty ( ) )
380+ }
381+ }
382+
383+ /// Iterate starting from `snapshot` (inclusive) to `oldest_snapshot_id` (exclusive).
384+ fn ancestors_between (
385+ table_metadata : & TableMetadataRef ,
386+ latest_snapshot_id : i64 ,
387+ oldest_snapshot_id : Option < i64 > ,
388+ ) -> Box < dyn Iterator < Item = SnapshotRef > + Send > {
389+ let Some ( oldest_snapshot_id) = oldest_snapshot_id else {
390+ return Box :: new ( ancestors_of ( table_metadata, latest_snapshot_id) ) ;
391+ } ;
392+
393+ if latest_snapshot_id == oldest_snapshot_id {
394+ return Box :: new ( std:: iter:: empty ( ) ) ;
395+ }
396+
397+ Box :: new (
398+ ancestors_of ( table_metadata, latest_snapshot_id)
399+ . take_while ( move |snapshot| snapshot. snapshot_id ( ) != oldest_snapshot_id) ,
400+ )
401+ }
0 commit comments