Skip to content

Commit b986734

Browse files
committed
fix driver state management
1 parent 22673f9 commit b986734

File tree

1 file changed

+8
-4
lines changed

1 file changed

+8
-4
lines changed

kernel/src/distributed/driver.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ use crate::{DeltaResult, Engine, Error, FileMeta};
5353
/// ```
5454
pub(crate) struct DriverPhase<P> {
5555
processor: P,
56-
state: DriverState,
56+
state: Option<DriverState>,
5757
/// Pre-computed next state after commit for concurrent IO
5858
after_commit: Option<DriverState>,
5959
/// Whether the iterator has been fully exhausted
@@ -101,7 +101,7 @@ impl<P: LogReplayProcessor> DriverPhase<P> {
101101

102102
Ok(Self {
103103
processor,
104-
state: DriverState::Commit(commit),
104+
state: Some(DriverState::Commit(commit)),
105105
after_commit: Some(after_commit),
106106
is_finished: false,
107107
})
@@ -144,7 +144,7 @@ impl<P: LogReplayProcessor> Iterator for DriverPhase<P> {
144144
fn next(&mut self) -> Option<Self::Item> {
145145
loop {
146146
// Try to get item from current phase
147-
let batch_result = match &mut self.state {
147+
let batch_result = match self.state.as_mut()? {
148148
DriverState::Commit(phase) => phase.next(),
149149
DriverState::Manifest(phase) => phase.next(),
150150
DriverState::ExecutorPhase { .. } | DriverState::Done => {
@@ -160,9 +160,12 @@ impl<P: LogReplayProcessor> Iterator for DriverPhase<P> {
160160
}
161161
Some(Err(e)) => return Some(Err(e)),
162162
None => {
163+
let Some(state) = self.state.take() else {
164+
return Some(Err(Error::generic("invalid")));
165+
};
163166
// Transition to next state after commit. If there is no next state, this state
164167
// machine is done
165-
self.state = match self.state {
168+
let new_state = match state {
166169
DriverState::Commit(_) => {
167170
self.after_commit.take().unwrap_or(DriverState::Done)
168171
}
@@ -178,6 +181,7 @@ impl<P: LogReplayProcessor> Iterator for DriverPhase<P> {
178181
DriverState::ExecutorPhase { files }
179182
}
180183
};
184+
self.state = Some(new_state);
181185
}
182186
}
183187
}

0 commit comments

Comments
 (0)