Skip to content

Commit d5ac764

Browse files
committed
add validator, write added delete files to manifest
1 parent f0bd611 commit d5ac764

File tree

5 files changed

+109
-11
lines changed

5 files changed

+109
-11
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::table::Table;
2727
use crate::transaction::snapshot::{
2828
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
2929
};
30+
use crate::transaction::validate::SnapshotValidator;
3031
use crate::transaction::{ActionCommit, TransactionAction};
3132

3233
/// FastAppendAction is a transaction action for fast append data files to the table.
@@ -113,6 +114,8 @@ impl TransactionAction for FastAppendAction {
113114

114115
struct FastAppendOperation;
115116

117+
impl SnapshotValidator for FastAppendOperation {}
118+
116119
impl SnapshotProduceOperation for FastAppendOperation {
117120
fn operation(&self) -> Operation {
118121
Operation::Append

crates/iceberg/src/transaction/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ mod update_location;
6161
mod update_properties;
6262
mod update_statistics;
6363
mod upgrade_format_version;
64+
mod validate;
6465

6566
use std::sync::Arc;
6667
use std::time::Duration;

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ use super::{ActionCommit, TransactionAction};
2626
use crate::error::{Error, ErrorKind, Result};
2727
use crate::spec::{
2828
DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile,
29-
ManifestStatus, Operation,
29+
ManifestStatus, Operation, SnapshotRef,
3030
};
3131
use crate::table::Table;
32+
use crate::transaction::validate::SnapshotValidator;
3233

3334
/// Transaction action for rewriting files.
3435
pub struct RewriteFilesAction {
@@ -41,7 +42,12 @@ pub struct RewriteFilesAction {
4142
deleted_delete_files: Vec<DataFile>,
4243
}
4344

44-
pub struct RewriteFilesOperation;
45+
pub struct RewriteFilesOperation {
46+
added_data_files: Vec<DataFile>,
47+
added_delete_files: Vec<DataFile>,
48+
deleted_data_files: Vec<DataFile>,
49+
deleted_delete_files: Vec<DataFile>,
50+
}
4551

4652
impl RewriteFilesAction {
4753
pub fn new() -> Self {
@@ -122,13 +128,16 @@ impl TransactionAction for RewriteFilesAction {
122128
self.deleted_delete_files.clone(),
123129
);
124130

125-
// todo need to figure out validation
126-
// 1. validate replace and added files
127-
// 2. validate no new deletes using the starting snapshot id
131+
let rewrite_operation = RewriteFilesOperation {
132+
added_data_files: self.added_data_files.clone(),
133+
added_delete_files: self.added_delete_files.clone(),
134+
deleted_data_files: self.deleted_data_files.clone(),
135+
deleted_delete_files: self.deleted_delete_files.clone(),
136+
};
128137

129138
// todo should be able to configure merge manifest process
130139
snapshot_producer
131-
.commit(RewriteFilesOperation, DefaultManifestProcess)
140+
.commit(rewrite_operation, DefaultManifestProcess)
132141
.await
133142
}
134143
}
@@ -168,6 +177,36 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result<ManifestEntry> {
168177
Ok(builder.build())
169178
}
170179

180+
impl SnapshotValidator for RewriteFilesOperation {
181+
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
182+
// Validate replaced and added files
183+
if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() {
184+
return Err(Error::new(
185+
ErrorKind::DataInvalid,
186+
"Files to delete cannot be empty",
187+
));
188+
}
189+
if self.deleted_data_files.is_empty() && !self.added_data_files.is_empty() {
190+
return Err(Error::new(
191+
ErrorKind::DataInvalid,
192+
"Data files to add must be empty because there's no data file to be rewritten",
193+
));
194+
}
195+
if self.deleted_delete_files.is_empty() && !self.added_delete_files.is_empty() {
196+
return Err(Error::new(
197+
ErrorKind::DataInvalid,
198+
"Delete files to add must be empty because there's no delete file to be rewritten",
199+
));
200+
}
201+
202+
// todo add use_starting_seq_number to help the validation
203+
// todo validate no new deletes since the current base
204+
// if there are replaced data files, there cannot be any new row-level deletes for those data files
205+
206+
Ok(())
207+
}
208+
}
209+
171210
impl SnapshotProduceOperation for RewriteFilesOperation {
172211
fn operation(&self) -> Operation {
173212
Operation::Replace

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ use crate::spec::{
3131
};
3232
use crate::table::Table;
3333
use crate::transaction::ActionCommit;
34+
use crate::transaction::validate::SnapshotValidator;
3435
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
3536

3637
const META_ROOT_PATH: &str = "metadata";
3738

38-
pub(crate) trait SnapshotProduceOperation: Send + Sync {
39+
pub(crate) trait SnapshotProduceOperation: Send + Sync + SnapshotValidator {
3940
fn operation(&self) -> Operation;
4041
fn delete_entries(
4142
&self,
@@ -268,8 +269,12 @@ impl<'a> SnapshotProducer<'a> {
268269
}
269270

270271
// Write manifest file for added data files and return the ManifestFile for ManifestList.
271-
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
272-
let added_data_files = std::mem::take(&mut self.added_data_files);
272+
async fn write_added_manifest(&mut self, content_type: ManifestContentType) -> Result<ManifestFile> {
273+
let added_data_files = match content_type {
274+
ManifestContentType::Data => std::mem::take(&mut self.added_data_files),
275+
ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files),
276+
};
277+
273278
if added_data_files.is_empty() {
274279
return Err(Error::new(
275280
ErrorKind::PreconditionFailed,
@@ -292,7 +297,7 @@ impl<'a> SnapshotProducer<'a> {
292297
}
293298
});
294299
let mut writer = self.new_manifest_writer(
295-
ManifestContentType::Data,
300+
content_type,
296301
self.table.metadata().default_partition_spec_id(),
297302
)?;
298303
for entry in manifest_entries {
@@ -376,7 +381,11 @@ impl<'a> SnapshotProducer<'a> {
376381

377382
// Process added entries.
378383
if !self.added_data_files.is_empty() {
379-
let added_manifest = self.write_added_manifest().await?;
384+
let added_manifest = self.write_added_manifest(ManifestContentType::Data).await?;
385+
manifest_files.push(added_manifest);
386+
}
387+
if !self.added_delete_files.is_empty() {
388+
let added_manifest = self.write_added_manifest(ManifestContentType::Deletes).await?;
380389
manifest_files.push(added_manifest);
381390
}
382391

@@ -458,6 +467,10 @@ impl<'a> SnapshotProducer<'a> {
458467
snapshot_produce_operation: OP,
459468
process: MP,
460469
) -> Result<ActionCommit> {
470+
// Validate to avoid conflicts
471+
snapshot_produce_operation
472+
.validate(self.table, self.table.metadata().current_snapshot())?;
473+
461474
let manifest_list_path = self.generate_manifest_list_file_path(0);
462475
let next_seq_num = self.table.metadata().next_sequence_number();
463476
let first_row_id = self.table.metadata().next_row_id();
crates/iceberg/src/transaction/validate.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
20+
use crate::error::Result;
21+
use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef};
22+
use crate::table::Table;
23+
24+
pub(crate) trait SnapshotValidator {
25+
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
26+
// todo: add default implementation
27+
Ok(())
28+
}
29+
30+
#[allow(dead_code)]
31+
async fn validation_history(
32+
&self,
33+
_base: &Table,
34+
_from_snapshot: Option<&SnapshotRef>,
35+
_to_snapshot: &SnapshotRef,
36+
_matching_operations: HashSet<Operation>,
37+
_manifest_content_type: ManifestContentType,
38+
) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
39+
// todo: add default implementation
40+
Ok((vec![], HashSet::new()))
41+
}
42+
}

0 commit comments

Comments
 (0)