Skip to content

Commit 327401f

Browse files
committed
finished validate_no_new_delete_files_for_data_files
1 parent 5c4a140 commit 327401f

File tree

4 files changed

+97
-37
lines changed

4 files changed

+97
-37
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use super::{ActionCommit, TransactionAction};
2626
use crate::error::{Error, ErrorKind, Result};
2727
use crate::spec::{
2828
DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile,
29-
ManifestStatus, Operation, SnapshotRef,
29+
ManifestStatus, Operation,
3030
};
3131
use crate::table::Table;
3232
use crate::transaction::validate::SnapshotValidator;
@@ -40,7 +40,7 @@ pub struct RewriteFilesAction {
4040
added_delete_files: Vec<DataFile>,
4141
deleted_data_files: Vec<DataFile>,
4242
deleted_delete_files: Vec<DataFile>,
43-
starting_sequence_number: Option<i64>,
43+
data_sequence_number: Option<i64>,
4444
starting_snapshot_id: Option<i64>,
4545
}
4646

@@ -50,6 +50,7 @@ pub struct RewriteFilesOperation {
5050
deleted_data_files: Vec<DataFile>,
5151
deleted_delete_files: Vec<DataFile>,
5252
starting_snapshot_id: Option<i64>,
53+
data_sequence_number: Option<i64>,
5354
}
5455

5556
impl RewriteFilesAction {
@@ -62,7 +63,7 @@ impl RewriteFilesAction {
6263
added_delete_files: vec![],
6364
deleted_data_files: vec![],
6465
deleted_delete_files: vec![],
65-
starting_sequence_number: None,
66+
data_sequence_number: None,
6667
starting_snapshot_id: None,
6768
}
6869
}
@@ -120,8 +121,8 @@ impl RewriteFilesAction {
120121

121122
/// Set the data sequence number for this rewrite operation.
122123
/// The number will be used for all new data files that are added in this rewrite.
123-
pub fn set_starting_sequence_number(mut self, sequence_number: i64) -> Self {
124-
self.starting_sequence_number = Some(sequence_number);
124+
pub fn set_data_sequence_number(mut self, sequence_number: i64) -> Self {
125+
self.data_sequence_number = Some(sequence_number);
125126
self
126127
}
127128

@@ -150,7 +151,7 @@ impl TransactionAction for RewriteFilesAction {
150151
self.added_delete_files.clone(),
151152
self.deleted_data_files.clone(),
152153
self.deleted_delete_files.clone(),
153-
self.starting_sequence_number.clone(),
154+
self.data_sequence_number.clone(),
154155
);
155156

156157
let rewrite_operation = RewriteFilesOperation {
@@ -159,17 +160,18 @@ impl TransactionAction for RewriteFilesAction {
159160
deleted_data_files: self.deleted_data_files.clone(),
160161
deleted_delete_files: self.deleted_delete_files.clone(),
161162
starting_snapshot_id: self.starting_snapshot_id.clone(),
163+
data_sequence_number: self.data_sequence_number.clone(),
162164
};
163165

164-
// todo should be able to configure merge manifest process
166+
// todo should be able to configure to use the merge manifest process
165167
snapshot_producer
166168
.commit(rewrite_operation, DefaultManifestProcess)
167169
.await
168170
}
169171
}
170172

171173
fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result<ManifestEntry> {
172-
// todo should we fail on missing properties?
174+
// todo should we fail on missing properties or should we ignore them when they don't exist?
173175
let builder = ManifestEntry::builder()
174176
.status(ManifestStatus::Deleted)
175177
.snapshot_id(entry.snapshot_id().ok_or_else(|| {
@@ -197,7 +199,7 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result<ManifestEntry> {
197199
}
198200

199201
impl SnapshotValidator for RewriteFilesOperation {
200-
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
202+
async fn validate(&self, base: &Table, parent_snapshot_id: Option<i64>) -> Result<()> {
201203
// Validate replaced and added files
202204
if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() {
203205
return Err(Error::new(
@@ -218,9 +220,18 @@ impl SnapshotValidator for RewriteFilesOperation {
218220
));
219221
}
220222

221-
// todo add use_starting_seq_number to help the validation
222-
// todo validate no new deletes since the current base
223-
// if there are replaced data files, there cannot be any new row-level deletes for those data files
223+
// todo add use_starting_seq_number to determine if we want to use data_sequence_number
224+
// If there are replaced data files, there cannot be any new row-level deletes for those data files
225+
if !self.deleted_data_files.is_empty() {
226+
self.validate_no_new_delete_files_for_data_files(
227+
base,
228+
self.starting_snapshot_id,
229+
parent_snapshot_id,
230+
&self.deleted_data_files,
231+
self.data_sequence_number.is_some(),
232+
)
233+
.await?;
234+
}
224235

225236
Ok(())
226237
}

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,8 @@ impl<'a> SnapshotProducer<'a> {
476476
) -> Result<ActionCommit> {
477477
// Validate to avoid conflicts
478478
snapshot_produce_operation
479-
.validate(self.table, self.table.metadata().current_snapshot())?;
479+
.validate(self.table, self.table.metadata().current_snapshot_id)
480+
.await?;
480481

481482
let new_manifests = self
482483
.manifest_file(&snapshot_produce_operation, &process)

crates/iceberg/src/transaction/validate.rs

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@
1818
use std::collections::HashSet;
1919
use std::sync::Arc;
2020

21+
use futures::SinkExt;
2122
use futures::future::try_join_all;
22-
use futures::{Sink, SinkExt};
2323
use once_cell::sync::Lazy;
2424

2525
use crate::delete_file_index::DeleteFileIndex;
2626
use crate::error::Result;
2727
use crate::scan::DeleteFileContext;
28-
use crate::spec::{DataFile, ManifestContentType, ManifestFile, Operation, SnapshotRef};
28+
use crate::spec::{
29+
DataContentType, DataFile, FormatVersion, INITIAL_SEQUENCE_NUMBER, ManifestContentType,
30+
ManifestFile, Operation, SnapshotRef,
31+
};
2932
use crate::table::Table;
3033
use crate::util::snapshot::ancestors_between;
3134
use crate::{Error, ErrorKind};
@@ -39,7 +42,7 @@ pub(crate) trait SnapshotValidator {
3942
// parent_snapshot_id: id of the parent snapshot
4043
// usually this is the latest snapshot of the base table, unless writing to a non-main branch
4144
// but we don't support writing to branches as of now
42-
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
45+
async fn validate(&self, _base: &Table, _parent_snapshot_id: Option<i64>) -> Result<()> {
4346
// todo: add default implementation
4447
Ok(())
4548
}
@@ -48,8 +51,8 @@ pub(crate) trait SnapshotValidator {
4851
async fn validation_history(
4952
&self,
5053
base: &Table,
51-
to_snapshot: SnapshotRef, // todo maybe the naming/variable order can be better, or just snapshot id is better? this is parent
5254
from_snapshot_id: Option<i64>,
55+
to_snapshot_id: i64,
5356
matching_operations: &HashSet<Operation>,
5457
manifest_content_type: ManifestContentType,
5558
) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
@@ -59,7 +62,7 @@ pub(crate) trait SnapshotValidator {
5962

6063
let snapshots = ancestors_between(
6164
&Arc::new(base.metadata().clone()),
62-
to_snapshot.snapshot_id(),
65+
to_snapshot_id,
6366
from_snapshot_id.clone(),
6467
);
6568

@@ -101,49 +104,95 @@ pub(crate) trait SnapshotValidator {
101104
Ok((manifests, new_snapshots))
102105
}
103106

104-
#[allow(dead_code)]
105107
async fn validate_no_new_delete_files_for_data_files(
106108
&self,
107109
base: &Table,
108110
from_snapshot_id: Option<i64>,
109-
_data_files: &[DataFile],
110-
to_snapshot: SnapshotRef,
111+
to_snapshot_id: Option<i64>,
112+
data_files: &[DataFile],
113+
ignore_equality_deletes: bool,
111114
) -> Result<()> {
115+
// If there is no current table state, no files have been added
116+
if to_snapshot_id.is_none() || base.metadata().format_version() != FormatVersion::V1 {
117+
return Ok(());
118+
}
119+
let to_snapshot_id = to_snapshot_id.unwrap();
120+
112121
// Get matching delete files have been added since the from_snapshot_id
113-
let (delete_manifests, snapshot_ids) = self
122+
let (delete_manifests, _) = self
114123
.validation_history(
115124
base,
116-
to_snapshot,
117125
from_snapshot_id,
126+
to_snapshot_id,
118127
&VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
119128
ManifestContentType::Deletes,
120129
)
121130
.await?;
122131

123-
// Building delete file index
124-
let (_delete_file_index, mut delete_file_tx) = DeleteFileIndex::new();
132+
// Build delete file index
133+
let (delete_file_index, mut delete_file_tx) = DeleteFileIndex::new();
125134
let manifests = try_join_all(
126135
delete_manifests
127136
.iter()
128137
.map(|f| f.load_manifest(base.file_io()))
129138
.collect::<Vec<_>>(),
130139
)
131140
.await?;
132-
133-
let delete_files_ctx = manifests
134-
.iter()
135-
.flat_map(|manifest| manifest.entries())
136-
.map(|entry| DeleteFileContext {
141+
let manifest_entries = manifests.iter().flat_map(|manifest| manifest.entries());
142+
for entry in manifest_entries {
143+
let delete_file_ctx = DeleteFileContext {
137144
manifest_entry: entry.clone(),
138145
partition_spec_id: entry.data_file().partition_spec_id,
139-
})
140-
.collect::<Vec<_>>();
141-
142-
for ctx in delete_files_ctx {
143-
delete_file_tx.send(ctx).await?
146+
};
147+
delete_file_tx.send(delete_file_ctx).await?;
144148
}
145149

146-
// todo validate if there are deletes
150+
// Get starting seq num from starting snapshot if available
151+
let starting_sequence_number = if from_snapshot_id.is_some()
152+
&& base
153+
.metadata()
154+
.snapshots
155+
.get(&from_snapshot_id.unwrap())
156+
.is_some()
157+
{
158+
base.metadata()
159+
.snapshots
160+
.get(&from_snapshot_id.unwrap())
161+
.unwrap()
162+
.sequence_number()
163+
} else {
164+
INITIAL_SEQUENCE_NUMBER
165+
};
166+
167+
// Validate if there are deletes using delete file index
168+
for data_file in data_files {
169+
let delete_files = delete_file_index
170+
.get_deletes_for_data_file(data_file, Some(starting_sequence_number))
171+
.await;
172+
173+
if ignore_equality_deletes {
174+
if delete_files
175+
.iter()
176+
.any(|delete_file| delete_file.file_type == DataContentType::PositionDeletes)
177+
{
178+
return Err(Error::new(
179+
ErrorKind::DataInvalid,
180+
format!(
181+
"Cannot commit, found new positional delete for added data file: {}",
182+
data_file.file_path
183+
),
184+
));
185+
}
186+
} else if !delete_files.is_empty() {
187+
return Err(Error::new(
188+
ErrorKind::DataInvalid,
189+
format!(
190+
"Cannot commit, found new delete for added data file: {}",
191+
data_file.file_path
192+
),
193+
));
194+
}
195+
}
147196

148197
Ok(())
149198
}

0 commit comments

Comments
 (0)