@@ -14,14 +14,15 @@ use crate::actions::deletion_vector::{
1414 deletion_treemap_to_bools, split_vector, DeletionVectorDescriptor ,
1515} ;
1616use crate :: actions:: { get_commit_schema, ADD_NAME , REMOVE_NAME } ;
17+ use crate :: distributed:: sequential_phase:: SequentialPhase ;
1718use crate :: engine_data:: FilteredEngineData ;
1819use crate :: expressions:: transforms:: ExpressionTransform ;
1920use crate :: expressions:: { ColumnName , ExpressionRef , Predicate , PredicateRef , Scalar } ;
2021use crate :: kernel_predicates:: { DefaultKernelPredicateEvaluator , EmptyColumnResolver } ;
2122use crate :: listed_log_files:: ListedLogFiles ;
2223use crate :: log_replay:: { ActionsBatch , HasSelectionVector } ;
2324use crate :: log_segment:: LogSegment ;
24- use crate :: scan:: log_replay:: BASE_ROW_ID_NAME ;
25+ use crate :: scan:: log_replay:: { ScanLogReplayProcessor , BASE_ROW_ID_NAME } ;
2526use crate :: scan:: state:: { DvInfo , Stats } ;
2627use crate :: scan:: state_info:: StateInfo ;
2728use crate :: schema:: {
@@ -150,7 +151,9 @@ pub(crate) enum PhysicalPredicate {
150151#[ derive( serde:: Serialize , serde:: Deserialize , Clone ) ]
151152enum SerializablePhysicalPredicate {
152153 /// Has a predicate and schema (but we skip serializing the predicate itself)
153- Some { schema : crate :: schema:: StructType } ,
154+ Some {
155+ schema : crate :: schema:: StructType ,
156+ } ,
154157 StaticSkipAll ,
155158 None ,
156159}
@@ -458,7 +461,7 @@ impl Scan {
458461 /// DriverPhaseResult::NeedsExecutorPhase { processor, files } => {
459462 /// // Serialize for distribution
460463 /// let (state, deduplicator) = processor.serialize()?;
461- ///
464+ ///
462465 /// // Partition files and distribute to executors
463466 /// for (executor, file_partition) in partition(files) {
464467 /// executor.send(state.clone(), deduplicator.clone(), file_partition);
@@ -471,20 +474,15 @@ impl Scan {
471474 pub ( crate ) fn scan_metadata_distributed (
472475 & self ,
473476 engine : Arc < dyn Engine > ,
474- ) -> DeltaResult < crate :: distributed:: DriverPhase < log_replay:: ScanLogReplayProcessor > > {
475- use crate :: distributed:: DriverPhase ;
476-
477+ ) -> DeltaResult < SequentialPhase < ScanLogReplayProcessor > > {
477478 // Create the processor
478- let processor = log_replay:: ScanLogReplayProcessor :: new (
479- engine. as_ref ( ) ,
480- self . state_info . clone ( ) ,
481- ) ?;
482-
479+ let processor = ScanLogReplayProcessor :: new ( engine. as_ref ( ) , self . state_info . clone ( ) ) ?;
480+
483481 // Get log segment
484482 let log_segment = Arc :: new ( self . snapshot . log_segment ( ) . clone ( ) ) ;
485-
483+
486484 // Create and return driver phase
487- DriverPhase :: try_new ( processor, log_segment, engine)
485+ SequentialPhase :: try_new ( processor, log_segment, engine)
488486 }
489487
490488 /// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s.
0 commit comments