
Commit 953944a

serde first draft
1 parent f8978c6 commit 953944a

4 files changed (+197 −34 lines)


kernel/src/log_replay.rs

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ use tracing::debug;
 
 /// The subset of file action fields that uniquely identifies it in the log, used for deduplication
 /// of adds and removes during log replay.
-#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
 pub(crate) struct FileActionKey {
     pub(crate) path: String,
     pub(crate) dv_unique_id: Option<String>,
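
Why derive `serde::Serialize`/`serde::Deserialize` here? The `seen_file_keys` dedup set built on the driver has to travel to executors along with the rest of the scan state. A minimal standalone sketch of that round-trip (this mirrors the struct above for illustration only, and assumes `serde` with the `derive` feature plus `serde_json`, which the kernel code in this commit already uses):

```rust
use std::collections::HashSet;

// Standalone mirror of `FileActionKey`, for illustration only.
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
struct FileActionKey {
    path: String,
    dv_unique_id: Option<String>,
}

fn main() -> Result<(), serde_json::Error> {
    let mut seen: HashSet<FileActionKey> = HashSet::new();
    seen.insert(FileActionKey {
        path: "part-00000-abc.parquet".into(),
        dv_unique_id: None,
    });

    // Driver: serialize the dedup set; executor: restore it.
    let wire = serde_json::to_vec(&seen)?;
    let restored: HashSet<FileActionKey> = serde_json::from_slice(&wire)?;
    assert_eq!(seen, restored);
    Ok(())
}
```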

kernel/src/scan/log_replay.rs

Lines changed: 111 additions & 4 deletions
@@ -12,12 +12,34 @@ use crate::expressions::{column_name, ColumnName, Expression, ExpressionRef, Pre
 use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateEvaluator as _};
 use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
 use crate::scan::Scalar;
-use crate::schema::ToSchema as _;
+use crate::schema::{SchemaRef, ToSchema as _};
 use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType};
-use crate::transforms::{get_transform_expr, parse_partition_values};
+use crate::table_features::ColumnMappingMode;
+use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec};
 use crate::utils::require;
 use crate::{DeltaResult, Engine, Error, ExpressionEvaluator};
 
+/// Internal serializable state (schemas, transform spec, column mapping, etc.)
+/// This is opaque to the engine - just passed through as a blob.
+#[derive(serde::Serialize, serde::Deserialize, Clone)]
+struct InternalState {
+    logical_schema: StructType,
+    physical_schema: StructType,
+    transform_spec: Option<TransformSpec>,
+    column_mapping_mode: ColumnMappingMode,
+}
+
+/// Public-facing serialized processor state for distribution to executors.
+/// The engine passes everything needed as Arc references, plus an opaque internal state blob.
+#[derive(Clone)]
+pub struct SerializedScanState {
+    /// Optional predicate for data skipping (if provided)
+    pub predicate: Option<PredicateRef>,
+    /// Opaque internal state blob (JSON for now)
+    pub internal_state_blob: Vec<u8>,
+}
+
+
 /// [`ScanLogReplayProcessor`] performs log replay (processes actions) specifically for doing a table scan.
 ///
 /// During a table scan, the processor reads batches of log actions (in reverse chronological order)
@@ -44,11 +66,11 @@ pub(crate) struct ScanLogReplayProcessor {
     partition_filter: Option<PredicateRef>,
     data_skipping_filter: Option<DataSkippingFilter>,
     add_transform: Arc<dyn ExpressionEvaluator>,
-    state_info: Arc<StateInfo>,
+    pub(crate) state_info: Arc<StateInfo>,
     /// A set of (data file path, dv_unique_id) pairs that have been seen thus
     /// far in the log. This is used to filter out files with Remove actions as
     /// well as duplicate entries in the log.
-    seen_file_keys: HashSet<FileActionKey>,
+    pub(crate) seen_file_keys: HashSet<FileActionKey>,
 }
 
 impl ScanLogReplayProcessor {
@@ -84,6 +106,91 @@ impl ScanLogReplayProcessor {
             state_info,
         })
     }
+
+    /// Serialize the processor state for distribution to executors.
+    ///
+    /// Consumes the processor and returns:
+    /// - `SerializedScanState`: Public-facing state with predicate and internal blob
+    /// - `HashSet<FileActionKey>`: The deduplication set (moved for independent use on executors)
+    ///
+    /// Executors can use `from_serialized` to reconstruct the processor with this state.
+    pub(crate) fn serialize(self) -> DeltaResult<(SerializedScanState, HashSet<FileActionKey>)> {
+        // Serialize internal state to a JSON blob (schemas, transform spec, and column mapping mode)
+        let internal_state = InternalState {
+            logical_schema: (*self.state_info.logical_schema).clone(),
+            physical_schema: (*self.state_info.physical_schema).clone(),
+            transform_spec: self.state_info.transform_spec.as_ref().map(|ts| (**ts).clone()),
+            column_mapping_mode: self.state_info.column_mapping_mode,
+        };
+        let internal_state_blob = serde_json::to_vec(&internal_state)
+            .map_err(|e| Error::generic(format!("Failed to serialize internal state: {}", e)))?;
+
+        // Extract the predicate from PhysicalPredicate
+        let predicate = match &self.state_info.physical_predicate {
+            PhysicalPredicate::Some(pred, _schema) => Some(pred.clone()),
+            _ => None,
+        };
+
+        let state = SerializedScanState {
+            predicate,
+            internal_state_blob,
+        };
+
+        Ok((state, self.seen_file_keys))
+    }
+
+    /// Reconstruct a processor from serialized state.
+    ///
+    /// Creates a new processor with the provided state and seen_file_keys.
+    /// All other fields (partition_filter, data_skipping_filter, add_transform) are
+    /// reconstructed from the state and engine.
+    ///
+    /// # Parameters
+    /// - `engine`: Engine for creating evaluators and filters
+    /// - `state`: The serialized state returned by `serialize`
+    /// - `seen_file_keys`: The deduplication set returned by `serialize`
+    pub(crate) fn from_serialized(
+        engine: &dyn Engine,
+        state: SerializedScanState,
+        seen_file_keys: HashSet<FileActionKey>,
+    ) -> DeltaResult<Self> {
+        // Deserialize internal state
+        let internal_state: InternalState = serde_json::from_slice(&state.internal_state_blob)
+            .map_err(|e| Error::generic(format!("Failed to deserialize internal state: {}", e)))?;
+
+        // Convert schemas to Arc
+        let logical_schema = Arc::new(internal_state.logical_schema);
+        let physical_schema = Arc::new(internal_state.physical_schema);
+
+        // Reconstruct PhysicalPredicate from the predicate and schema
+        let physical_predicate = match state.predicate {
+            Some(pred) => PhysicalPredicate::Some(pred, physical_schema.clone()),
+            None => PhysicalPredicate::None,
+        };
+
+        // Reconstruct StateInfo
+        let state_info = Arc::new(StateInfo {
+            logical_schema,
+            physical_schema,
+            physical_predicate,
+            transform_spec: internal_state.transform_spec.map(Arc::new),
+            column_mapping_mode: internal_state.column_mapping_mode,
+        });
+
+        // Create the processor and set seen_file_keys
+        let mut processor = Self::new(engine, state_info)?;
+        processor.seen_file_keys = seen_file_keys;
+        Ok(processor)
+    }
+
+    /// Get the projected schema needed to read checkpoint/sidecar files.
+    ///
+    /// Returns the schema that should be used when reading leaf checkpoint files
+    /// or sidecars during the executor phase.
+    pub(crate) fn get_projected_schema(&self) -> DeltaResult<crate::schema::SchemaRef> {
+        use crate::actions::ADD_NAME;
+        get_log_add_schema().project(&[ADD_NAME])
+    }
 }
 
 /// A visitor that deduplicates a stream of add and remove actions into a stream of valid adds. Log
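
The `serialize`/`from_serialized` pair above implements an opaque-blob handoff: the kernel packs everything serde can handle into `internal_state_blob`, and the engine just ferries the bytes. A self-contained sketch of that pattern with stand-in field types (the real `InternalState` holds `StructType` and `ColumnMappingMode` values, not strings):

```rust
// Internal state the kernel understands; the engine only ever sees bytes.
#[derive(Debug, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
struct InternalState {
    // Stand-ins for StructType / ColumnMappingMode in the real code.
    logical_schema: String,
    column_mapping_mode: String,
}

// Public-facing state the engine ships to executors.
#[derive(Clone)]
struct SerializedScanState {
    internal_state_blob: Vec<u8>, // opaque JSON blob
}

fn main() -> Result<(), serde_json::Error> {
    // Driver side: pack the kernel-private state into the blob.
    let internal = InternalState {
        logical_schema: "id INT, value STRING".into(),
        column_mapping_mode: "none".into(),
    };
    let state = SerializedScanState {
        internal_state_blob: serde_json::to_vec(&internal)?,
    };

    // Executor side: only the kernel unpacks the blob again.
    let unpacked: InternalState = serde_json::from_slice(&state.internal_state_blob)?;
    assert_eq!(unpacked, internal);
    Ok(())
}
```

Note that the predicate travels outside the blob: `PredicateRef` has no serde implementation, which is why `serialize` pulls it out of `PhysicalPredicate::Some` and `from_serialized` stitches it back together with the deserialized physical schema.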

kernel/src/scan/mod.rs

Lines changed: 83 additions & 0 deletions
@@ -146,6 +146,27 @@ pub(crate) enum PhysicalPredicate {
     None,
 }
 
+/// Serializable representation of PhysicalPredicate (skipping the predicate expression itself)
+#[derive(serde::Serialize, serde::Deserialize, Clone)]
+enum SerializablePhysicalPredicate {
+    /// Has a predicate and schema (but we skip serializing the predicate itself)
+    Some { schema: crate::schema::StructType },
+    StaticSkipAll,
+    None,
+}
+
+impl From<&PhysicalPredicate> for SerializablePhysicalPredicate {
+    fn from(pred: &PhysicalPredicate) -> Self {
+        match pred {
+            PhysicalPredicate::Some(_pred, schema) => SerializablePhysicalPredicate::Some {
+                schema: (**schema).clone(),
+            },
+            PhysicalPredicate::StaticSkipAll => SerializablePhysicalPredicate::StaticSkipAll,
+            PhysicalPredicate::None => SerializablePhysicalPredicate::None,
+        }
+    }
+}
+
 impl PhysicalPredicate {
     /// If we have a predicate, verify the columns it references and apply column mapping. First, get
     /// the set of references; use that to filter the schema to only the columns of interest (and
@@ -404,6 +425,68 @@ impl Scan {
         self.scan_metadata_inner(engine, self.replay_for_scan_metadata(engine)?)
     }
 
+    /// Get a distributed driver phase for processing scan metadata.
+    ///
+    /// This method returns a [`DriverPhase`] that can be used for distributed execution.
+    /// The driver processes commits and single-part checkpoint manifests, then returns
+    /// the processor state and files that need to be distributed to executors.
+    ///
+    /// # Distributed Execution Pattern
+    ///
+    /// 1. **Driver phase**: Process commits and manifests on the coordinator
+    /// 2. **Serialize state**: Extract processor state and deduplicator
+    /// 3. **Distribute**: Send state + files to executors
+    /// 4. **Executor phase**: Each executor processes its assigned files
+    ///
+    /// # Example
+    ///
+    /// ```ignore
+    /// // On the driver/coordinator
+    /// let mut driver = scan.scan_metadata_distributed(engine.clone())?;
+    ///
+    /// // Process driver-side batches
+    /// for batch in driver.by_ref() {
+    ///     let scan_metadata = batch?;
+    ///     // Handle scan metadata from commits/manifest
+    /// }
+    ///
+    /// // Check if an executor phase is needed
+    /// match driver.finish()? {
+    ///     DriverPhaseResult::Complete(_processor) => {
+    ///         // All done! No executor phase needed
+    ///     }
+    ///     DriverPhaseResult::NeedsExecutorPhase { processor, files } => {
+    ///         // Serialize for distribution
+    ///         let (state, deduplicator) = processor.serialize()?;
+    ///
+    ///         // Partition files and distribute to executors
+    ///         for (executor, file_partition) in partition(files) {
+    ///             executor.send(state.clone(), deduplicator.clone(), file_partition);
+    ///         }
+    ///     }
+    /// }
+    /// ```
+    ///
+    /// [`DriverPhase`]: crate::distributed::DriverPhase
+    pub(crate) fn scan_metadata_distributed(
+        &self,
+        engine: Arc<dyn Engine>,
+    ) -> DeltaResult<crate::distributed::DriverPhase<log_replay::ScanLogReplayProcessor>> {
+        use crate::distributed::DriverPhase;
+
+        // Create the processor
+        let processor = log_replay::ScanLogReplayProcessor::new(
+            engine.as_ref(),
+            self.state_info.clone(),
+        )?;
+
+        // Get the log segment
+        let log_segment = Arc::new(self.snapshot.log_segment().clone());
+
+        // Create and return the driver phase
+        DriverPhase::try_new(processor, log_segment, engine)
+    }
+
     /// Get an updated iterator of [`ScanMetadata`]s based on an existing iterator of [`EngineData`]s.
     ///
     /// The existing iterator is assumed to contain data from a previous call to `scan_metadata`.
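
The doc example covers only the driver half of the exchange. A hedged sketch of the executor half using the APIs added in this commit; `read_checkpoint_actions`, `my_file_partition`, and `send_to_query_plan` are hypothetical stand-ins for engine-side plumbing, and `process_actions_batch` is assumed to be the existing `LogReplayProcessor` entry point:

```rust
// Hypothetical executor-side flow; `state`, `seen`, and the file partition are
// assumed to have arrived over the wire from the driver.
let mut processor =
    ScanLogReplayProcessor::from_serialized(engine.as_ref(), state, seen)?;
let schema = processor.get_projected_schema()?;

for file in my_file_partition {
    // Read each assigned checkpoint/sidecar file with the projected schema...
    for batch in read_checkpoint_actions(engine.as_ref(), &file, schema.clone())? {
        // ...and run it through the reconstructed processor, which applies the
        // same dedup and filtering as a local log replay.
        let scan_metadata = processor.process_actions_batch(batch?)?;
        send_to_query_plan(scan_metadata);
    }
}
```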

kernel/src/transforms.rs

Lines changed: 2 additions & 29 deletions
@@ -23,16 +23,8 @@ pub(crate) type TransformSpec = Vec<FieldTransformSpec>;
 ///
 /// These transformations are "sparse" - they only specify what changes, while unchanged fields
 /// pass through implicitly in their original order.
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
 pub(crate) enum FieldTransformSpec {
-    /// Insert the given expression after the named input column (None = prepend instead)
-    // NOTE: It's quite likely we will sometimes need to reorder columns for one reason or another,
-    // which would usually be expressed as a drop+insert pair of transforms.
-    #[allow(unused)]
-    StaticInsert {
-        insert_after: Option<String>,
-        expr: ExpressionRef,
-    },
     /// Drops the named input column
     // NOTE: Row tracking needs to drop metadata columns that were used to compute rowids, since
     // they should not appear in the query's output.
@@ -102,7 +94,6 @@ pub(crate) fn parse_partition_values(
             ))
         }
         FieldTransformSpec::DynamicColumn { .. }
-        | FieldTransformSpec::StaticInsert { .. }
        | FieldTransformSpec::GenerateRowId { .. }
        | FieldTransformSpec::StaticDrop { .. } => None,
    })
@@ -125,9 +116,6 @@ pub(crate) fn get_transform_expr(
     for field_transform in transform_spec {
         use FieldTransformSpec::*;
         transform = match field_transform {
-            StaticInsert { insert_after, expr } => {
-                transform.with_inserted_field(insert_after.clone(), expr.clone())
-            }
             StaticDrop { field_name } => transform.with_dropped_field(field_name.clone()),
             GenerateRowId {
                 field_name,
@@ -363,13 +351,8 @@ mod tests {
     }
 
     #[test]
-    fn test_get_transform_expr_static_transforms() {
-        let expr = Arc::new(Expression::literal(42));
+    fn test_get_transform_expr_static_drop() {
         let transform_spec = vec![
-            FieldTransformSpec::StaticInsert {
-                insert_after: Some("col1".to_string()),
-                expr: expr.clone(),
-            },
             FieldTransformSpec::StaticDrop {
                 field_name: "col2".to_string(),
             },
@@ -393,16 +376,6 @@ mod tests {
             panic!("Expected Transform expression");
         };
 
-        // Verify StaticInsert: should insert after col1
-        assert!(transform.field_transforms.contains_key("col1"));
-        assert!(!transform.field_transforms["col1"].is_replace);
-        assert_eq!(transform.field_transforms["col1"].exprs.len(), 1);
-        let Expression::Literal(scalar) = transform.field_transforms["col1"].exprs[0].as_ref()
-        else {
-            panic!("Expected literal expression for insert");
-        };
-        assert_eq!(scalar, &Scalar::Integer(42));
-
         // Verify StaticDrop: should drop col2 (empty expressions and is_replace = true)
         assert!(transform.field_transforms.contains_key("col2"));
         assert!(transform.field_transforms["col2"].is_replace);
