@@ -428,63 +428,6 @@ impl Scan {
428428 self . scan_metadata_inner ( engine, self . replay_for_scan_metadata ( engine) ?)
429429 }
430430
431- /// Get a distributed driver phase for processing scan metadata.
432- ///
433- /// This method returns a [`DriverPhase`] that can be used for distributed execution.
434- /// The driver processes commits and single-part checkpoint manifests, then returns
435- /// the processor state and files that need to be distributed to executors.
436- ///
437- /// # Distributed Execution Pattern
438- ///
439- /// 1. **Driver phase**: Process commits and manifests on the coordinator
440- /// 2. **Serialize state**: Extract processor state and deduplicator
441- /// 3. **Distribute**: Send state + files to executors
442- /// 4. **Executor phase**: Each executor processes its assigned files
443- ///
444- /// # Example
445- ///
446- /// ```ignore
447- /// // On driver/coordinator
448- /// let mut driver = scan.scan_metadata_distributed(engine.clone())?;
449- ///
450- /// // Process driver-side batches
451- /// for batch in driver.by_ref() {
452- /// let scan_metadata = batch?;
453- /// // Handle scan metadata from commits/manifest
454- /// }
455- ///
456- /// // Check if executor phase is needed
457- /// match driver.finish()? {
458- /// DriverPhaseResult::Complete(_processor) => {
459- /// // All done! No executor phase needed
460- /// }
461- /// DriverPhaseResult::NeedsExecutorPhase { processor, files } => {
462- /// // Serialize for distribution
463- /// let (state, deduplicator) = processor.serialize()?;
464- ///
465- /// // Partition files and distribute to executors
466- /// for (executor, file_partition) in partition(files) {
467- /// executor.send(state.clone(), deduplicator.clone(), file_partition);
468- /// }
469- /// }
470- /// }
471- /// ```
472- ///
473- /// [`DriverPhase`]: crate::distributed::DriverPhase
474- pub ( crate ) fn scan_metadata_distributed (
475- & self ,
476- engine : Arc < dyn Engine > ,
477- ) -> DeltaResult < SequentialPhase < ScanLogReplayProcessor > > {
478- // Create the processor
479- let processor = ScanLogReplayProcessor :: new ( engine. as_ref ( ) , self . state_info . clone ( ) ) ?;
480-
481- // Get log segment
482- let log_segment = Arc :: new ( self . snapshot . log_segment ( ) . clone ( ) ) ;
483-
484- // Create and return driver phase
485- SequentialPhase :: try_new ( processor, log_segment, engine)
486- }
487-
488431 /// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s.
489432 ///
490433 /// The existing iterator is assumed to contain data from a previous call to `scan_metadata`.
0 commit comments