Skip to content

Commit 22673f9

Browse files
committed
driver
1 parent bd7f846 commit 22673f9

File tree

1 file changed

+44
-66
lines changed

1 file changed

+44
-66
lines changed

kernel/src/distributed/driver.rs

Lines changed: 44 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::actions::get_commit_schema;
1111
use crate::log_reader::commit::CommitReader;
1212
use crate::log_reader::manifest::{AfterManifest, ManifestPhase};
1313
use crate::log_replay::LogReplayProcessor;
14-
use crate::log_segment::LogSegment;
14+
use crate::log_segment::{self, LogSegment};
1515
use crate::{DeltaResult, Engine, Error, FileMeta};
1616

1717
/// Driver-side log replay (Phase 1) for distributed execution.
@@ -53,9 +53,9 @@ use crate::{DeltaResult, Engine, Error, FileMeta};
5353
/// ```
5454
pub(crate) struct DriverPhase<P> {
5555
processor: P,
56-
state: Option<DriverState>,
56+
state: DriverState,
5757
/// Pre-computed next state after commit for concurrent IO
58-
next_state_after_commit: Option<DriverState>,
58+
after_commit: Option<DriverState>,
5959
/// Whether the iterator has been fully exhausted
6060
is_finished: bool,
6161
}
@@ -93,22 +93,22 @@ impl<P: LogReplayProcessor> DriverPhase<P> {
9393
log_segment: Arc<LogSegment>,
9494
engine: Arc<dyn Engine>,
9595
) -> DeltaResult<Self> {
96-
let commit_schema = get_commit_schema();
97-
let commit = CommitReader::try_new(engine.as_ref(), &log_segment, commit_schema.clone())?;
96+
let commit =
97+
CommitReader::try_new(engine.as_ref(), &log_segment, get_commit_schema().clone())?;
9898

9999
// Concurrently compute the next state after commit for parallel IO
100-
let next_state_after_commit = Some(Self::compute_state_after_commit(&log_segment, engine.clone())?);
100+
let after_commit = Self::compute_state_after_commit(&log_segment, engine.clone())?;
101101

102102
Ok(Self {
103103
processor,
104-
state: Some(DriverState::Commit(commit)),
105-
next_state_after_commit,
104+
state: DriverState::Commit(commit),
105+
after_commit: Some(after_commit),
106106
is_finished: false,
107107
})
108108
}
109109

110110
/// Compute the next state after CommitReader is exhausted.
111-
///
111+
///
112112
/// This is called during construction to enable concurrent IO initialization.
113113
/// Returns the appropriate DriverState based on checkpoint configuration:
114114
/// - Single-part checkpoint → Manifest phase (pre-initialized)
@@ -118,26 +118,22 @@ impl<P: LogReplayProcessor> DriverPhase<P> {
118118
log_segment: &LogSegment,
119119
engine: Arc<dyn Engine>,
120120
) -> DeltaResult<DriverState> {
121-
if log_segment.checkpoint_parts.is_empty() {
122-
// No checkpoint
123-
Ok(DriverState::Done)
124-
} else if log_segment.checkpoint_parts.len() == 1 {
125-
// Single-part checkpoint: create manifest phase
126-
let checkpoint_part = &log_segment.checkpoint_parts[0];
127-
let manifest = ManifestPhase::new(
128-
checkpoint_part.location.clone(),
129-
log_segment.log_root.clone(),
130-
engine,
131-
)?;
132-
Ok(DriverState::Manifest(manifest))
133-
} else {
134-
// Multi-part checkpoint: all parts are leaf files
135-
let files: Vec<_> = log_segment
136-
.checkpoint_parts
137-
.iter()
138-
.map(|p| p.location.clone())
139-
.collect();
140-
Ok(DriverState::ExecutorPhase { files })
121+
match log_segment.checkpoint_parts.as_slice() {
122+
[] => Ok(DriverState::Done),
123+
[single_part] => {
124+
// Single-part checkpoint: create manifest phase
125+
let manifest = ManifestPhase::new(
126+
single_part.location.clone(),
127+
log_segment.log_root.clone(),
128+
engine,
129+
)?;
130+
Ok(DriverState::Manifest(manifest))
131+
}
132+
multi_part => {
133+
// Multi-part checkpoint: all parts are leaf files
134+
let files: Vec<_> = multi_part.iter().map(|p| p.location.clone()).collect();
135+
Ok(DriverState::ExecutorPhase { files })
136+
}
141137
}
142138
}
143139
}
@@ -148,7 +144,7 @@ impl<P: LogReplayProcessor> Iterator for DriverPhase<P> {
148144
fn next(&mut self) -> Option<Self::Item> {
149145
loop {
150146
// Try to get item from current phase
151-
let batch_result = match self.state.as_mut()? {
147+
let batch_result = match &mut self.state {
152148
DriverState::Commit(phase) => phase.next(),
153149
DriverState::Manifest(phase) => phase.next(),
154150
DriverState::ExecutorPhase { .. } | DriverState::Done => {
@@ -164,46 +160,30 @@ impl<P: LogReplayProcessor> Iterator for DriverPhase<P> {
164160
}
165161
Some(Err(e)) => return Some(Err(e)),
166162
None => {
167-
// Phase exhausted - transition
168-
let old_state = self.state.take()?;
169-
match self.transition(old_state) {
170-
Ok(new_state) => self.state = Some(new_state),
171-
Err(e) => return Some(Err(e)),
172-
}
163+
// Transition to next state after commit. If there is no next state, this state
164+
// machine is done
165+
self.state = match self.state {
166+
DriverState::Commit(_) => {
167+
self.after_commit.take().unwrap_or(DriverState::Done)
168+
}
169+
DriverState::Manifest(manifest_phase) => match manifest_phase.finalize() {
170+
Ok(AfterManifest::Done) => DriverState::Done,
171+
Ok(AfterManifest::Sidecars { sidecars }) => {
172+
DriverState::ExecutorPhase { files: sidecars }
173+
}
174+
Err(err) => return Some(Err(err)),
175+
},
176+
DriverState::Done => DriverState::Done,
177+
DriverState::ExecutorPhase { files } => {
178+
DriverState::ExecutorPhase { files }
179+
}
180+
};
173181
}
174182
}
175183
}
176184
}
177185
}
178186

179-
impl<P> DriverPhase<P> {
180-
fn transition(&mut self, state: DriverState) -> DeltaResult<DriverState> {
181-
match state {
182-
DriverState::Commit(_) => {
183-
// Use pre-computed state (always initialized in constructor)
184-
self.next_state_after_commit.take().ok_or_else(|| {
185-
Error::generic("next_state_after_commit should be initialized in constructor")
186-
})
187-
}
188-
189-
DriverState::Manifest(manifest) => {
190-
// After ManifestPhase exhausted, check for sidecars
191-
match manifest.finalize()? {
192-
AfterManifest::Sidecars { sidecars } => {
193-
Ok(DriverState::ExecutorPhase { files: sidecars })
194-
}
195-
AfterManifest::Done => Ok(DriverState::Done),
196-
}
197-
}
198-
199-
// These states are terminal and should never be transitioned from
200-
DriverState::ExecutorPhase { .. } | DriverState::Done => {
201-
Err(Error::generic("Invalid state transition: terminal state reached"))
202-
}
203-
}
204-
}
205-
}
206-
207187
// ============================================================================
208188
// Streaming API: available when P implements LogReplayProcessor
209189
// ============================================================================
@@ -239,7 +219,6 @@ impl<P: LogReplayProcessor> DriverPhase<P> {
239219
}
240220
}
241221

242-
243222
#[cfg(test)]
244223
mod tests {
245224
use super::*;
@@ -419,5 +398,4 @@ mod tests {
419398

420399
Ok(())
421400
}
422-
423401
}

0 commit comments

Comments
 (0)