Skip to content

Commit c998b0f

Browse files
committed
improve driver
1 parent 39cce52 commit c998b0f

File tree

1 file changed

+10
-15
lines changed

1 file changed

+10
-15
lines changed

kernel/src/distributed/driver.rs

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -116,23 +116,18 @@ impl<P: LogReplayProcessor> Iterator for DriverProcessor<P> {
116116
type Item = DeltaResult<P::Output>;
117117

118118
fn next(&mut self) -> Option<Self::Item> {
119-
let next = if let Some(next) = self.commit_phase.next() {
120-
Some(next)
121-
} else if let Some(manifest_reader) = self.checkpoint_manifest_phase.as_mut() {
122-
manifest_reader.next()
123-
} else {
124-
None
125-
};
119+
let next = self
120+
.commit_phase
121+
.next()
122+
.or_else(|| self.checkpoint_manifest_phase.as_mut()?.next());
126123

127-
match next {
128-
Some(batch_res) => {
129-
Some(batch_res.and_then(|batch| self.processor.process_actions_batch(batch)))
130-
}
131-
None => {
132-
self.is_finished = true;
133-
None
134-
}
124+
if next.is_none() {
125+
self.is_finished = true;
135126
}
127+
128+
next.map(|batch_res| {
129+
batch_res.and_then(|batch| self.processor.process_actions_batch(batch))
130+
})
136131
}
137132
}
138133

0 commit comments

Comments
 (0)