Skip to content

Commit e85bcfb

Browse files
committed
continue on validation, added validate_no_new_delete_files_for_data_files
1 parent 67c36e6 commit e85bcfb

File tree

5 files changed

+144
-10
lines changed

5 files changed

+144
-10
lines changed

crates/iceberg/src/spec/snapshot.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub const UNASSIGNED_SNAPSHOT_ID: i64 = -1;
3838

3939
/// Reference to [`Snapshot`].
4040
pub type SnapshotRef = Arc<Snapshot>;
41-
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
41+
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)]
4242
#[serde(rename_all = "lowercase")]
4343
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
4444
pub enum Operation {

crates/iceberg/src/transaction/append.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ impl TransactionAction for FastAppendAction {
9494
vec![],
9595
vec![],
9696
vec![],
97+
None,
9798
);
9899

99100
// validate added files

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,16 @@ pub struct RewriteFilesAction {
4040
added_delete_files: Vec<DataFile>,
4141
deleted_data_files: Vec<DataFile>,
4242
deleted_delete_files: Vec<DataFile>,
43+
starting_sequence_number: Option<i64>,
44+
starting_snapshot_id: Option<i64>,
4345
}
4446

4547
pub struct RewriteFilesOperation {
4648
added_data_files: Vec<DataFile>,
4749
added_delete_files: Vec<DataFile>,
4850
deleted_data_files: Vec<DataFile>,
4951
deleted_delete_files: Vec<DataFile>,
52+
starting_snapshot_id: Option<i64>,
5053
}
5154

5255
impl RewriteFilesAction {
@@ -59,6 +62,8 @@ impl RewriteFilesAction {
5962
added_delete_files: vec![],
6063
deleted_data_files: vec![],
6164
deleted_delete_files: vec![],
65+
starting_sequence_number: None,
66+
starting_snapshot_id: None,
6267
}
6368
}
6469

@@ -112,6 +117,19 @@ impl RewriteFilesAction {
112117
self.snapshot_properties = snapshot_properties;
113118
self
114119
}
120+
121+
/// Set the data sequence number for this rewrite operation.
122+
/// The number will be used for all new data files that are added in this rewrite.
123+
pub fn set_starting_sequence_number(mut self, sequence_number: i64) -> Self {
124+
self.starting_sequence_number = Some(sequence_number);
125+
self
126+
}
127+
128+
/// Set the snapshot ID used in any reads for this operation.
129+
pub fn set_starting_snapshot_id(mut self, snapshot_id: i64) -> Self {
130+
self.starting_snapshot_id = Some(snapshot_id);
131+
self
132+
}
115133
}
116134

117135
impl Default for RewriteFilesAction {
@@ -132,13 +150,15 @@ impl TransactionAction for RewriteFilesAction {
132150
self.added_delete_files.clone(),
133151
self.deleted_data_files.clone(),
134152
self.deleted_delete_files.clone(),
153+
self.starting_sequence_number.clone(),
135154
);
136155

137156
let rewrite_operation = RewriteFilesOperation {
138157
added_data_files: self.added_data_files.clone(),
139158
added_delete_files: self.added_delete_files.clone(),
140159
deleted_data_files: self.deleted_data_files.clone(),
141160
deleted_delete_files: self.deleted_delete_files.clone(),
161+
starting_snapshot_id: self.starting_snapshot_id.clone(),
142162
};
143163

144164
// todo should be able to configure merge manifest process

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,11 @@ pub(crate) struct SnapshotProducer<'a> {
8282
// It starts from 0 and increments for each new manifest file.
8383
// Note: This counter is limited to the range of (0..u64::MAX).
8484
manifest_counter: RangeFrom<u64>,
85+
starting_sequence_number: Option<i64>,
8586
}
8687

8788
impl<'a> SnapshotProducer<'a> {
89+
// todo add a builder for this to fix the clippy
8890
#[allow(clippy::too_many_arguments)]
8991
pub(crate) fn new(
9092
table: &'a Table,
@@ -95,6 +97,7 @@ impl<'a> SnapshotProducer<'a> {
9597
added_delete_files: Vec<DataFile>,
9698
deleted_data_files: Vec<DataFile>,
9799
deleted_delete_files: Vec<DataFile>,
100+
starting_sequence_number: Option<i64>,
98101
) -> Self {
99102
Self {
100103
table,
@@ -107,6 +110,7 @@ impl<'a> SnapshotProducer<'a> {
107110
deleted_data_files,
108111
deleted_delete_files,
109112
manifest_counter: (0..),
113+
starting_sequence_number,
110114
}
111115
}
112116

crates/iceberg/src/transaction/validate.rs

Lines changed: 118 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,136 @@
1616
// under the License.
1717

1818
use std::collections::HashSet;
19+
use std::sync::Arc;
1920

21+
use futures::{Sink, SinkExt};
22+
use futures::future::try_join_all;
23+
use once_cell::sync::Lazy;
24+
25+
use crate::delete_file_index::DeleteFileIndex;
2026
use crate::error::Result;
21-
use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef};
27+
use crate::scan::DeleteFileContext;
28+
use crate::spec::{
29+
DataFile, ManifestContentType, ManifestFile, Operation, SnapshotRef,
30+
};
2231
use crate::table::Table;
32+
use crate::util::snapshot::ancestors_between;
33+
use crate::{Error, ErrorKind};
34+
35+
static VALIDATE_ADDED_DELETE_FILES_OPERATIONS: Lazy<HashSet<Operation>> =
36+
Lazy::new(|| HashSet::from([Operation::Overwrite, Operation::Delete]));
2337

2438
pub(crate) trait SnapshotValidator {
39+
// todo doc
40+
// table: base table
41+
// snapshot: parent snapshot
42+
// usually snapshot is the latest snapshot of base table, unless it's non-main branch
43+
// but we don't support writing to branches as of now
2544
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
2645
// todo: add default implementation
2746
Ok(())
2847
}
2948

30-
#[allow(dead_code)]
49+
// todo doc
3150
async fn validation_history(
3251
&self,
33-
_base: &Table,
34-
_from_snapshot: Option<&SnapshotRef>,
35-
_to_snapshot: &SnapshotRef,
36-
_matching_operations: HashSet<Operation>,
37-
_manifest_content_type: ManifestContentType,
52+
base: &Table,
53+
to_snapshot: SnapshotRef, // todo maybe the naming/variable order can be better, or just snapshot id is better? this is parent
54+
from_snapshot_id: Option<i64>,
55+
matching_operations: &HashSet<Operation>,
56+
manifest_content_type: ManifestContentType,
3857
) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
39-
// todo: add default implementation
40-
Ok((vec![], HashSet::new()))
58+
let mut manifests: Vec<ManifestFile> = vec![];
59+
let mut new_snapshots = HashSet::new();
60+
let mut last_snapshot: Option<SnapshotRef> = None;
61+
62+
let snapshots = ancestors_between(
63+
&Arc::new(base.metadata().clone()),
64+
to_snapshot.snapshot_id(),
65+
from_snapshot_id.clone(),
66+
);
67+
68+
for current_snapshot in snapshots {
69+
last_snapshot = Some(current_snapshot.clone());
70+
71+
// Find all snapshots with the matching operations
72+
// and their manifest files with the matching content type
73+
if matching_operations.contains(&current_snapshot.summary().operation) {
74+
new_snapshots.insert(current_snapshot.snapshot_id());
75+
current_snapshot
76+
.load_manifest_list(base.file_io(), base.metadata())
77+
.await?
78+
.entries()
79+
.iter()
80+
.for_each(|manifest| {
81+
if manifest.content == manifest_content_type
82+
&& manifest.added_snapshot_id == current_snapshot.snapshot_id()
83+
{
84+
manifests.push(manifest.clone());
85+
}
86+
});
87+
}
88+
}
89+
90+
if last_snapshot.is_some()
91+
&& last_snapshot.clone().unwrap().parent_snapshot_id() != from_snapshot_id
92+
{
93+
return Err(Error::new(
94+
ErrorKind::DataInvalid,
95+
format!(
96+
"Cannot determine history between starting snapshot {} and the last known ancestor {}",
97+
from_snapshot_id.unwrap_or(-1),
98+
last_snapshot.unwrap().snapshot_id()
99+
),
100+
));
101+
}
102+
103+
Ok((manifests, new_snapshots))
104+
}
105+
106+
#[allow(dead_code)]
107+
async fn validate_no_new_delete_files_for_data_files(
108+
&self,
109+
base: &Table,
110+
from_snapshot_id: Option<i64>,
111+
_data_files: &[DataFile],
112+
to_snapshot: SnapshotRef,
113+
) -> Result<()> {
114+
// Get matching delete files have been added since the from_snapshot_id
115+
let (delete_manifests, snapshot_ids) = self
116+
.validation_history(
117+
base,
118+
to_snapshot,
119+
from_snapshot_id,
120+
&VALIDATE_ADDED_DELETE_FILES_OPERATIONS,
121+
ManifestContentType::Deletes,
122+
)
123+
.await?;
124+
125+
// Building delete file index
126+
let (_delete_file_index, mut delete_file_tx) = DeleteFileIndex::new();
127+
let manifests = try_join_all(
128+
delete_manifests
129+
.iter()
130+
.map(|f| f.load_manifest(base.file_io()))
131+
.collect::<Vec<_>>(),
132+
)
133+
.await?;
134+
135+
let delete_files_ctx = manifests
136+
.iter()
137+
.flat_map(|manifest| manifest.entries())
138+
.map(|entry| DeleteFileContext {
139+
manifest_entry: entry.clone(),
140+
partition_spec_id: entry.data_file().partition_spec_id
141+
}).collect::<Vec<_>>();
142+
143+
for ctx in delete_files_ctx {
144+
delete_file_tx.send(ctx).await?
145+
}
146+
147+
// todo validate if there are deletes
148+
149+
Ok(())
41150
}
42151
}

0 commit comments

Comments
 (0)