Skip to content

Commit 09cf641

Browse files
committed
serde first draft
1 parent b986734 commit 09cf641

File tree

6 files changed

+410
-44
lines changed

6 files changed

+410
-44
lines changed

kernel/src/distributed/driver.rs

Lines changed: 210 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
88
use std::sync::Arc;
99

10+
use delta_kernel_derive::internal_api;
11+
1012
use crate::actions::get_commit_schema;
1113
use crate::log_reader::commit::CommitReader;
1214
use crate::log_reader::manifest::{AfterManifest, ManifestPhase};
@@ -232,13 +234,12 @@ mod tests {
232234
use crate::scan::state_info::StateInfo;
233235
use object_store::local::LocalFileSystem;
234236
use std::path::PathBuf;
235-
use std::sync::Arc as StdArc;
236237

237238
fn load_test_table(
238239
table_name: &str,
239240
) -> DeltaResult<(
240-
StdArc<DefaultEngine<TokioBackgroundExecutor>>,
241-
StdArc<crate::Snapshot>,
241+
Arc<DefaultEngine<TokioBackgroundExecutor>>,
242+
Arc<crate::Snapshot>,
242243
url::Url,
243244
)> {
244245
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
@@ -251,8 +252,8 @@ mod tests {
251252
let url = url::Url::from_directory_path(path)
252253
.map_err(|_| crate::Error::Generic("Failed to create URL from path".to_string()))?;
253254

254-
let store = StdArc::new(LocalFileSystem::new());
255-
let engine = StdArc::new(DefaultEngine::new(store));
255+
let store = Arc::new(LocalFileSystem::new());
256+
let engine = Arc::new(DefaultEngine::new(store));
256257
let snapshot = crate::Snapshot::builder_for(url.clone()).build(engine.as_ref())?;
257258

258259
Ok((engine, snapshot, url))
@@ -261,9 +262,9 @@ mod tests {
261262
#[test]
262263
fn test_driver_v2_with_commits_only() -> DeltaResult<()> {
263264
let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
264-
let log_segment = StdArc::new(snapshot.log_segment().clone());
265+
let log_segment = Arc::new(snapshot.log_segment().clone());
265266

266-
let state_info = StdArc::new(StateInfo::try_new(
267+
let state_info = Arc::new(StateInfo::try_new(
267268
snapshot.schema(),
268269
snapshot.table_configuration(),
269270
None,
@@ -319,9 +320,9 @@ mod tests {
319320
#[test]
320321
fn test_driver_v2_with_sidecars() -> DeltaResult<()> {
321322
let (engine, snapshot, _url) = load_test_table("v2-checkpoints-json-with-sidecars")?;
322-
let log_segment = StdArc::new(snapshot.log_segment().clone());
323+
let log_segment = Arc::new(snapshot.log_segment().clone());
323324

324-
let state_info = StdArc::new(StateInfo::try_new(
325+
let state_info = Arc::new(StateInfo::try_new(
325326
snapshot.schema(),
326327
snapshot.table_configuration(),
327328
None,
@@ -402,4 +403,204 @@ mod tests {
402403

403404
Ok(())
404405
}
406+
407+
#[test]
408+
fn test_distributed_scan_serialization() -> DeltaResult<()> {
409+
let (engine, snapshot, _url) = load_test_table("table-without-dv-small")?;
410+
let log_segment = Arc::new(snapshot.log_segment().clone());
411+
412+
let state_info = Arc::new(StateInfo::try_new(
413+
snapshot.schema(),
414+
snapshot.table_configuration(),
415+
None,
416+
(),
417+
)?);
418+
419+
let processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info.clone())?;
420+
421+
// Serialize the processor (takes ownership)
422+
let (serialized_state, deduplicator) = processor.serialize()?;
423+
424+
// Verify we can reconstruct the processor
425+
let reconstructed = ScanLogReplayProcessor::from_serialized(
426+
engine.as_ref(),
427+
serialized_state,
428+
deduplicator,
429+
)?;
430+
431+
// Verify schemas match (compare against original state_info)
432+
assert_eq!(
433+
state_info.logical_schema, reconstructed.state_info.logical_schema,
434+
"Logical schemas should match after serialization"
435+
);
436+
assert_eq!(
437+
state_info.physical_schema, reconstructed.state_info.physical_schema,
438+
"Physical schemas should match after serialization"
439+
);
440+
441+
// Verify transform spec matches
442+
match (
443+
&state_info.transform_spec,
444+
&reconstructed.state_info.transform_spec,
445+
) {
446+
(Some(original), Some(reconstructed)) => {
447+
assert_eq!(
448+
**original, **reconstructed,
449+
"Transform spec should be equal after serialization"
450+
);
451+
}
452+
(None, None) => {
453+
// Both None - correct
454+
}
455+
_ => panic!("Transform spec presence mismatch after serialization"),
456+
}
457+
458+
// Verify column mapping mode matches
459+
assert_eq!(
460+
state_info.column_mapping_mode, reconstructed.state_info.column_mapping_mode,
461+
"Column mapping mode should match after serialization"
462+
);
463+
464+
Ok(())
465+
}
466+
467+
#[test]
468+
fn test_distributed_scan_with_sidecars() -> DeltaResult<()> {
469+
let (engine, snapshot, _url) = load_test_table("v2-checkpoints-json-with-sidecars")?;
470+
471+
// Create a scan
472+
let scan = crate::scan::ScanBuilder::new(snapshot.clone()).build()?;
473+
474+
// Get distributed driver
475+
let mut driver = scan.scan_metadata_distributed(engine.clone())?;
476+
477+
let mut driver_batch_count = 0;
478+
let mut driver_file_paths = Vec::new();
479+
480+
// Process driver-side batches
481+
while let Some(result) = driver.next() {
482+
let metadata = result?;
483+
let paths = metadata.visit_scan_files(
484+
vec![],
485+
|ps: &mut Vec<String>, path, _, _, _, _, _| {
486+
ps.push(path.to_string());
487+
},
488+
)?;
489+
driver_file_paths.extend(paths);
490+
driver_batch_count += 1;
491+
}
492+
493+
// Driver should process commits but find no files (all in sidecars)
494+
assert_eq!(
495+
driver_file_paths.len(),
496+
0,
497+
"Driver should find 0 files (all adds are in checkpoint sidecars)"
498+
);
499+
500+
// Should have executor phase with sidecars
501+
let result = driver.finish()?;
502+
match result {
503+
crate::distributed::DriverPhaseResult::NeedsExecutorPhase { processor, files } => {
504+
assert_eq!(
505+
files.len(),
506+
2,
507+
"Should have exactly 2 sidecar files for distribution"
508+
);
509+
510+
// Serialize processor for distribution
511+
let (serialized_state, deduplicator) = processor.serialize()?;
512+
513+
// Verify the serialized state can be reconstructed
514+
let _reconstructed = ScanLogReplayProcessor::from_serialized(
515+
engine.as_ref(),
516+
serialized_state,
517+
deduplicator,
518+
)?;
519+
520+
// Verify sidecar file paths
521+
let mut collected_paths: Vec<String> = files
522+
.iter()
523+
.map(|fm| {
524+
fm.location
525+
.path_segments()
526+
.and_then(|segments| segments.last())
527+
.unwrap_or("")
528+
.to_string()
529+
})
530+
.collect();
531+
532+
collected_paths.sort();
533+
534+
assert_eq!(collected_paths[0], "00000000000000000006.checkpoint.0000000001.0000000002.19af1366-a425-47f4-8fa6-8d6865625573.parquet");
535+
assert_eq!(collected_paths[1], "00000000000000000006.checkpoint.0000000002.0000000002.5008b69f-aa8a-4a66-9299-0733a56a7e63.parquet");
536+
}
537+
crate::distributed::DriverPhaseResult::Complete(_processor) => {
538+
panic!("Expected NeedsExecutorPhase for table with sidecars");
539+
}
540+
}
541+
542+
Ok(())
543+
}
544+
545+
#[test]
546+
fn test_deduplicator_state_preserved() -> DeltaResult<()> {
547+
let (engine, snapshot, _url) = load_test_table("v2-checkpoints-json-with-sidecars")?;
548+
let log_segment = Arc::new(snapshot.log_segment().clone());
549+
550+
let state_info = Arc::new(StateInfo::try_new(
551+
snapshot.schema(),
552+
snapshot.table_configuration(),
553+
None,
554+
(),
555+
)?);
556+
557+
let mut processor = ScanLogReplayProcessor::new(engine.as_ref(), state_info.clone())?;
558+
559+
// Process some actions to populate the deduplicator
560+
let mut driver = DriverPhase::try_new(processor, log_segment, engine.clone())?;
561+
562+
// Process all driver batches
563+
while let Some(_) = driver.next() {}
564+
565+
let result = driver.finish()?;
566+
let processor = match result {
567+
crate::distributed::DriverPhaseResult::Complete(p) => p,
568+
crate::distributed::DriverPhaseResult::NeedsExecutorPhase { processor, .. } => {
569+
processor
570+
}
571+
};
572+
573+
let initial_dedup_count = processor.seen_file_keys.len();
574+
575+
// Serialize and reconstruct (serialize takes ownership)
576+
let (serialized_state, deduplicator) = processor.serialize()?;
577+
assert_eq!(
578+
deduplicator.len(),
579+
initial_dedup_count,
580+
"Deduplicator size should be preserved during serialization"
581+
);
582+
583+
let reconstructed = ScanLogReplayProcessor::from_serialized(
584+
engine.as_ref(),
585+
serialized_state,
586+
deduplicator.clone(),
587+
)?;
588+
589+
assert_eq!(
590+
reconstructed.seen_file_keys.len(),
591+
initial_dedup_count,
592+
"Reconstructed processor should have same deduplicator size"
593+
);
594+
595+
// Verify the deduplicator contents match (compare against returned deduplicator)
596+
for key in &deduplicator {
597+
assert!(
598+
reconstructed.seen_file_keys.contains(key),
599+
"Reconstructed deduplicator should contain key: {:?}",
600+
key
601+
);
602+
}
603+
604+
Ok(())
605+
}
405606
}

kernel/src/distributed/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
pub(crate) mod driver;
1+
pub(crate) mod driver;
2+
3+
pub(crate) use driver::{DriverPhase, DriverPhaseResult};

kernel/src/log_replay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use tracing::debug;
2626

2727
/// The subset of file action fields that uniquely identifies it in the log, used for deduplication
2828
/// of adds and removes during log replay.
29-
#[derive(Debug, Hash, Eq, PartialEq, Clone)]
29+
#[derive(Debug, Hash, Eq, PartialEq, serde::Serialize, serde::Deserialize, Clone)]
3030
pub(crate) struct FileActionKey {
3131
pub(crate) path: String,
3232
pub(crate) dv_unique_id: Option<String>,

0 commit comments

Comments (0)