35 commits (all by OussamaSaoudi)
- 9116c74: commit reader (Nov 18, 2025)
- 7c04cd7: improve commit (Nov 18, 2025)
- 20c2a95: cleanup (Nov 24, 2025)
- 703dc4e: commit reader (Nov 24, 2025)
- fc9c3b6: manifest v1 (Nov 18, 2025)
- b2bf9d3: manifest (Nov 18, 2025)
- 8ccc3a6: allow_unused (Nov 18, 2025)
- e63e7a8: improve test (Nov 19, 2025)
- f95ab27: fix manifest (Nov 24, 2025)
- 3ac0de8: more cleanup (Nov 24, 2025)
- cba5141: address pr reviews (Nov 24, 2025)
- bc84caa: rename to CheckpointManifestReader (Nov 24, 2025)
- 1bcc46e: address comments (Nov 25, 2025)
- a534df5: simplify (Dec 1, 2025)
- 9bb5cd4: implement driver (Nov 19, 2025)
- 702dd25: driver (Nov 19, 2025)
- 85a9a0c: driver (Nov 21, 2025)
- 5be2f17: fix driver state management (Nov 21, 2025)
- cb35c69: update sidecars (Nov 25, 2025)
- cc9d2c7: simplify driver (Nov 25, 2025)
- 759972a: improve driver (Nov 25, 2025)
- 46d94ef: cleanup driver (Nov 25, 2025)
- d56179d: more cleanup (Nov 26, 2025)
- ac86389: address comments (Nov 26, 2025)
- 322260d: format (Nov 26, 2025)
- 1399a80: cleanup (Dec 1, 2025)
- a14814d: remove unnecessary change (Dec 1, 2025)
- c155bf2: remove Arc<LogSegment> (Dec 2, 2025)
- f8978c6: simplify driver (Nov 25, 2025)
- 953944a: serde first draft (Nov 19, 2025)
- ffd094d: remove old driver naming (Nov 26, 2025)
- b7d918a: remove public-facing api (Dec 2, 2025)
- 745077f: add back send (Dec 2, 2025)
- 720a46c: cleanup (Dec 2, 2025)
- 74cdbd4: fix format (Dec 2, 2025)
1 change: 1 addition & 0 deletions kernel/src/distributed/mod.rs
@@ -0,0 +1 @@
pub(crate) mod sequential_phase;
366 changes: 366 additions & 0 deletions kernel/src/distributed/sequential_phase.rs
@@ -0,0 +1,366 @@
//! Sequential log replay that runs before the distributed phase.
//!
//! This module provides sequential phase log replay that processes commits and
//! single-part checkpoint manifests, then returns the processor and any files (sidecars or
//! multi-part checkpoint parts) for parallel processing by the distributed phase. This phase
//! must be completed before the distributed phase can start.
//!
//! For multi-part checkpoints, the sequential phase skips manifest processing and returns
//! the checkpoint parts for parallel processing.

use std::sync::Arc;

use itertools::Itertools;

use crate::actions::get_commit_schema;
use crate::log_reader::checkpoint_manifest::CheckpointManifestReader;
use crate::log_reader::commit::CommitReader;
use crate::log_replay::LogReplayProcessor;
use crate::log_segment::LogSegment;
use crate::utils::require;
use crate::{DeltaResult, Engine, Error, FileMeta};

/// Sequential log replay processor for distributed execution.
///
/// This iterator processes log replay sequentially:
/// 1. Commit files (JSON)
/// 2. Manifest (single-part checkpoint, if present)
///
/// After exhaustion, call `finish()` to extract:
/// - The processor (for serialization and distribution)
/// - Files (sidecars or multi-part checkpoint parts) for parallel processing
///
/// # Type Parameters
/// - `P`: A [`LogReplayProcessor`] implementation that processes action batches
///
/// # Example
///
/// ```ignore
/// let mut sequential = SequentialPhase::try_new(processor, &log_segment, engine)?;
///
/// // Iterate over sequential batches
/// for batch in sequential.by_ref() {
///     let metadata = batch?;
///     // Process metadata
/// }
///
/// // Extract processor and files for distribution (if needed)
/// match sequential.finish()? {
///     AfterSequential::Distributed { processor, files } => {
///         // Distributed phase needed - distribute files for parallel processing.
///         // If crossing the network boundary, the processor must be serialized.
///         let serialized = processor.serialize()?;
///         let partitions = partition_files(files, num_workers);
///         for (worker, partition) in partitions {
///             worker.send(serialized.clone(), partition)?;
///         }
///     }
///     AfterSequential::Done(processor) => {
///         // No distributed phase needed - all processing completed sequentially
///         println!("Log replay complete");
///     }
/// }
/// ```
#[allow(unused)]
pub(crate) struct SequentialPhase<P: LogReplayProcessor> {
    // The processor that will be used to process the action batches
    processor: P,
    // The commit reader that will be used to read the commit files
    commit_phase: CommitReader,
    // The checkpoint manifest reader that will be used to read the checkpoint manifest files.
    // If the checkpoint is single-part, this will be Some(CheckpointManifestReader).
    checkpoint_manifest_phase: Option<CheckpointManifestReader>,
    // Whether the iterator has been fully exhausted
    is_finished: bool,
    // The checkpoint parts of the log segment being processed, retained for the
    // distributed phase when the checkpoint is multi-part
    checkpoint_parts: Vec<FileMeta>,
}

unsafe impl<P: LogReplayProcessor> Send for SequentialPhase<P> {}

/// Result of sequential log replay processing.
#[allow(unused)]
pub(crate) enum AfterSequential<P: LogReplayProcessor> {
    /// All processing complete sequentially - no distributed phase needed.
    Done(P),
    /// Distributed phase needed - distribute files for parallel processing.
    Distributed { processor: P, files: Vec<FileMeta> },
}
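
/// A minimal sketch of the `partition_files` helper referenced in the
/// [`SequentialPhase`] doc example. It round-robins the returned files across
/// `num_workers` workers; the name and the round-robin strategy are
/// illustrative assumptions only, and engines may distribute the files
/// however they like.
#[allow(unused)]
pub(crate) fn partition_files(files: Vec<FileMeta>, num_workers: usize) -> Vec<Vec<FileMeta>> {
    // Guard against a zero worker count so the modulo below is well-defined.
    let num_partitions = num_workers.max(1);
    let mut partitions: Vec<Vec<FileMeta>> = (0..num_partitions).map(|_| Vec::new()).collect();
    for (i, file) in files.into_iter().enumerate() {
        partitions[i % num_partitions].push(file);
    }
    partitions
}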

impl<P: LogReplayProcessor> SequentialPhase<P> {
    /// Create a new sequential phase log replay.
    ///
    /// # Parameters
    /// - `processor`: The log replay processor
    /// - `log_segment`: The log segment to process
    /// - `engine`: Engine for reading files
    #[allow(unused)]
    pub(crate) fn try_new(
        processor: P,
        log_segment: &LogSegment,
        engine: Arc<dyn Engine>,
    ) -> DeltaResult<Self> {
        let commit_phase =
            CommitReader::try_new(engine.as_ref(), log_segment, get_commit_schema().clone())?;

        // Concurrently start reading the checkpoint manifest. Only create a checkpoint manifest
        // reader if the checkpoint is single-part.
        let checkpoint_manifest_phase =
            if let [single_part] = log_segment.checkpoint_parts.as_slice() {
                Some(CheckpointManifestReader::try_new(
                    engine,
                    single_part,
                    log_segment.log_root.clone(),
                )?)
            } else {
                None
            };

        let checkpoint_parts = log_segment
            .checkpoint_parts
            .iter()
            .map(|path| path.location.clone())
            .collect_vec();

        Ok(Self {
            processor,
            commit_phase,
            checkpoint_manifest_phase,
            is_finished: false,
            checkpoint_parts,
        })
    }

    /// Complete the sequential phase and extract the processor + files for distribution.
    ///
    /// Must be called after the iterator is exhausted.
    ///
    /// # Returns
    /// - `Done`: All processing done sequentially - no distributed phase needed
    /// - `Distributed`: Distributed phase needed. The resulting files may be processed
    ///   in parallel.
    ///
    /// # Errors
    /// Returns an error if called before iterator exhaustion.
    #[allow(unused)]
    pub(crate) fn finish(self) -> DeltaResult<AfterSequential<P>> {
        if !self.is_finished {
            return Err(Error::generic(
                "Must exhaust iterator before calling finish()",
            ));
        }

        let distributed_files = match self.checkpoint_manifest_phase {
            Some(manifest_reader) => manifest_reader.extract_sidecars()?,
            None => {
                let parts = self.checkpoint_parts;
                require!(
                    parts.len() != 1,
                    Error::generic(
                        "Invariant violation: if there is exactly one checkpoint part, \
                         there must be a manifest reader"
                    )
                );
                // If this is a multi-part checkpoint, use the checkpoint parts for the
                // distributed phase
                parts
            }
        };

        if distributed_files.is_empty() {
            Ok(AfterSequential::Done(self.processor))
        } else {
            Ok(AfterSequential::Distributed {
                processor: self.processor,
                files: distributed_files,
            })
        }
    }
}

impl<P: LogReplayProcessor> Iterator for SequentialPhase<P> {
    type Item = DeltaResult<P::Output>;

    fn next(&mut self) -> Option<Self::Item> {
        // Drain the commit files first, then fall through to the single-part checkpoint
        // manifest (if present).
        let next = self
            .commit_phase
            .next()
            .or_else(|| self.checkpoint_manifest_phase.as_mut()?.next());

        if next.is_none() {
            self.is_finished = true;
        }

        next.map(|batch_res| {
            batch_res.and_then(|batch| self.processor.process_actions_batch(batch))
        })
    }
}
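
// A hypothetical sketch of the worker side of the distributed phase, kept as a
// comment because this module only implements the sequential phase. The
// `deserialize_processor` and `read_checkpoint_actions` helpers are assumed
// names for illustration; only `process_actions_batch` comes from
// [`LogReplayProcessor`].
//
// fn distributed_worker<P: LogReplayProcessor>(
//     engine: &dyn Engine,
//     serialized_processor: &[u8],
//     partition: Vec<FileMeta>,
// ) -> DeltaResult<()> {
//     let mut processor: P = deserialize_processor(serialized_processor)?;
//     for file in partition {
//         // Each worker replays only its partition of sidecar/checkpoint files.
//         for batch in read_checkpoint_actions(engine, &file)? {
//             let _metadata = processor.process_actions_batch(batch?)?;
//         }
//     }
//     Ok(())
// }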

#[cfg(test)]
mod tests {
    use super::*;
    use crate::scan::log_replay::ScanLogReplayProcessor;
    use crate::scan::state_info::StateInfo;
    use crate::utils::test_utils::{assert_result_error_with_message, load_test_table};
    use std::sync::Arc;

    /// Core helper function to verify sequential processing with expected adds and sidecars.
    fn verify_sequential_processing(
        table_name: &str,
        expected_adds: &[&str],
        expected_sidecars: &[&str],
    ) -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table(table_name)?;
        let log_segment = snapshot.log_segment();

        let state_info = Arc::new(StateInfo::try_new(
            snapshot.schema(),
            snapshot.table_configuration(),
            None,
            (),
        )?);

        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
        let mut sequential = SequentialPhase::try_new(processor, &log_segment, engine.clone())?;

        // Process all batches and collect Add file paths
        let mut file_paths = Vec::new();
        for result in sequential.by_ref() {
            let metadata = result?;
            file_paths = metadata.visit_scan_files(
                file_paths,
                |ps: &mut Vec<String>, path, _, _, _, _, _| {
                    ps.push(path.to_string());
                },
            )?;
        }

        // Assert collected adds match expected
        file_paths.sort();
        assert_eq!(
            file_paths, expected_adds,
            "Sequential phase should collect expected Add file paths"
        );

        // Call finish() and verify result based on expected sidecars
        let result = sequential.finish()?;
        match (expected_sidecars, result) {
            (sidecars, AfterSequential::Done(_)) => {
                assert!(
                    sidecars.is_empty(),
                    "Expected Done but got sidecars {:?}",
                    sidecars
                );
            }
            (expected_sidecars, AfterSequential::Distributed { files, .. }) => {
                assert_eq!(
                    files.len(),
                    expected_sidecars.len(),
                    "Should collect exactly {} sidecar files",
                    expected_sidecars.len()
                );

                // Extract and verify sidecar file paths
                let mut collected_paths = files
                    .iter()
                    .map(|fm| {
                        fm.location
                            .path_segments()
                            .and_then(|mut segments| segments.next_back())
                            .unwrap_or("")
                            .to_string()
                    })
                    .collect_vec();

                collected_paths.sort();
                assert_eq!(collected_paths, expected_sidecars);
            }
        }

        Ok(())
    }

    #[test]
    fn test_sequential_v2_with_commits_only() -> DeltaResult<()> {
        verify_sequential_processing(
            "table-without-dv-small",
            &["part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"],
            &[], // No sidecars
        )
    }

    #[test]
    fn test_sequential_v2_with_sidecars() -> DeltaResult<()> {
        verify_sequential_processing(
            "v2-checkpoints-json-with-sidecars",
            &[], // No adds in sequential phase (all in checkpoint sidecars)
            &[
                "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet",
                "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet",
            ],
        )
    }

    #[test]
    fn test_sequential_finish_before_exhaustion_error() -> DeltaResult<()> {
        let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
        let log_segment = snapshot.log_segment();

        let state_info = Arc::new(StateInfo::try_new(
            snapshot.schema(),
            snapshot.table_configuration(),
            None,
            (),
        )?);

        let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info)?;
        let mut sequential = SequentialPhase::try_new(processor, &log_segment, engine.clone())?;

        // Call next() once but don't exhaust the iterator
        if let Some(result) = sequential.next() {
            result?;
        }

        // Try to call finish() before exhausting the iterator
        let result = sequential.finish();
        assert_result_error_with_message(result, "Must exhaust iterator before calling finish()");

        Ok(())
    }

    #[test]
    fn test_sequential_checkpoint_without_sidecars() -> DeltaResult<()> {
        verify_sequential_processing(
            "v2-checkpoints-json-without-sidecars",
            &[
                // Adds from checkpoint manifest processed in sequential phase
                "test%25file%25prefix-part-00000-0e32f92c-e232-4daa-b734-369d1a800502-c000.snappy.parquet",
                "test%25file%25prefix-part-00000-91daf7c5-9ba0-4f76-aefd-0c3b21d33c6c-c000.snappy.parquet",
                "test%25file%25prefix-part-00001-a5c41be1-ded0-4b18-a638-a927d233876e-c000.snappy.parquet",
            ],
            &[], // No sidecars
        )
    }

    #[test]
    fn test_sequential_parquet_checkpoint_with_sidecars() -> DeltaResult<()> {
        verify_sequential_processing(
            "v2-checkpoints-parquet-with-sidecars",
            &[], // No adds in sequential phase
            &[
                // Expected sidecars
                "00000000000000000006.checkpoint.0000000001.0000000002.76931b15-ead3-480d-b86c-afe55a577fc3.parquet",
                "00000000000000000006.checkpoint.0000000002.0000000002.4367b29c-0e87-447f-8e81-9814cc01ad1f.parquet",
            ],
        )
    }

    #[test]
    fn test_sequential_checkpoint_no_commits() -> DeltaResult<()> {
        verify_sequential_processing(
            "with_checkpoint_no_last_checkpoint",
            &["part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet"], // Add from commit 3
            &[], // No sidecars
        )
    }
}
2 changes: 2 additions & 0 deletions kernel/src/lib.rs
@@ -88,11 +88,13 @@ mod action_reconciliation;
pub mod actions;
pub mod checkpoint;
pub mod committer;
mod distributed;
pub mod engine_data;
pub mod error;
pub mod expressions;
mod log_compaction;
mod log_path;
mod log_reader;
pub mod metrics;
pub mod scan;
pub mod schema;