Skip to content

Commit c155bf2

Browse files
committed
remove Arc<LogSegment>
1 parent a14814d commit c155bf2

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

kernel/src/distributed/sequential_phase.rs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use crate::{DeltaResult, Engine, Error, FileMeta};
3636
/// # Example
3737
///
3838
/// ```ignore
39-
/// let mut sequential = SequentialPhase::try_new(processor, log_segment, engine)?;
39+
/// let mut sequential = SequentialPhase::try_new(processor, &log_segment, engine)?;
4040
///
4141
/// // Iterate over sequential batches
4242
/// for batch in sequential.by_ref() {
@@ -73,7 +73,7 @@ pub(crate) struct SequentialPhase<P: LogReplayProcessor> {
7373
// Whether the iterator has been fully exhausted
7474
is_finished: bool,
7575
// The log segment that is being processed
76-
log_segment: Arc<LogSegment>,
76+
checkpoint_parts: Vec<FileMeta>,
7777
}
7878

7979
unsafe impl<P: LogReplayProcessor> Send for SequentialPhase<P> {}
@@ -97,7 +97,7 @@ impl<P: LogReplayProcessor> SequentialPhase<P> {
9797
#[allow(unused)]
9898
pub(crate) fn try_new(
9999
processor: P,
100-
log_segment: Arc<LogSegment>,
100+
log_segment: &LogSegment,
101101
engine: Arc<dyn Engine>,
102102
) -> DeltaResult<Self> {
103103
let commit_phase =
@@ -116,12 +116,18 @@ impl<P: LogReplayProcessor> SequentialPhase<P> {
116116
None
117117
};
118118

119+
let checkpoint_parts = log_segment
120+
.checkpoint_parts
121+
.iter()
122+
.map(|path| path.location.clone())
123+
.collect_vec();
124+
119125
Ok(Self {
120126
processor,
121127
commit_phase,
122128
checkpoint_manifest_phase,
123129
is_finished: false,
124-
log_segment,
130+
checkpoint_parts,
125131
})
126132
}
127133

@@ -147,7 +153,7 @@ impl<P: LogReplayProcessor> SequentialPhase<P> {
147153
let distributed_files = match self.checkpoint_manifest_phase {
148154
Some(manifest_reader) => manifest_reader.extract_sidecars()?,
149155
None => {
150-
let parts = &self.log_segment.checkpoint_parts;
156+
let parts = self.checkpoint_parts;
151157
require!(
152158
parts.len() != 1,
153159
Error::generic(
@@ -156,7 +162,7 @@ impl<P: LogReplayProcessor> SequentialPhase<P> {
156162
)
157163
);
158164
// If this is a multi-part checkpoint, use the checkpoint parts for distributed phase
159-
parts.iter().map(|path| path.location.clone()).collect_vec()
165+
parts
160166
}
161167
};
162168

@@ -205,7 +211,7 @@ mod tests {
205211
expected_sidecars: &[&str],
206212
) -> DeltaResult<()> {
207213
let (engine, snapshot, _tempdir) = load_test_table(table_name)?;
208-
let log_segment = Arc::new(snapshot.log_segment().clone());
214+
let log_segment = snapshot.log_segment();
209215

210216
let state_info = Arc::new(StateInfo::try_new(
211217
snapshot.schema(),
@@ -215,7 +221,7 @@ mod tests {
215221
)?);
216222

217223
let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
218-
let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?;
224+
let mut sequential = SequentialPhase::try_new(processor, &log_segment, engine.clone())?;
219225

220226
// Process all batches and collect Add file paths
221227
let mut file_paths = Vec::new();
@@ -298,7 +304,7 @@ mod tests {
298304
#[test]
299305
fn test_sequential_finish_before_exhaustion_error() -> DeltaResult<()> {
300306
let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
301-
let log_segment = Arc::new(snapshot.log_segment().clone());
307+
let log_segment = snapshot.log_segment();
302308

303309
let state_info = Arc::new(StateInfo::try_new(
304310
snapshot.schema(),
@@ -308,7 +314,7 @@ mod tests {
308314
)?);
309315

310316
let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
311-
let mut sequential = SequentialPhase::try_new(processor, log_segment, engine.clone())?;
317+
let mut sequential = SequentialPhase::try_new(processor, &log_segment, engine.clone())?;
312318

313319
// Call next() once but don't exhaust the iterator
314320
if let Some(result) = sequential.next() {

0 commit comments

Comments
 (0)