Skip to content

Commit a7e1008

Browse files
committed
add validator, write added delete files to manifest
1 parent 3563b34 commit a7e1008

File tree

5 files changed

+109
-11
lines changed

5 files changed

+109
-11
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::table::Table;
2727
use crate::transaction::snapshot::{
2828
DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
2929
};
30+
use crate::transaction::validate::SnapshotValidator;
3031
use crate::transaction::{ActionCommit, TransactionAction};
3132

3233
/// FastAppendAction is a transaction action that fast-appends data files to the table.
@@ -113,6 +114,8 @@ impl TransactionAction for FastAppendAction {
113114

114115
struct FastAppendOperation;
115116

117+
// Empty impl: FastAppendOperation overrides nothing, so it uses the provided
// default methods of SnapshotValidator.
impl SnapshotValidator for FastAppendOperation {}
118+
116119
impl SnapshotProduceOperation for FastAppendOperation {
117120
fn operation(&self) -> Operation {
118121
Operation::Append

crates/iceberg/src/transaction/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ mod update_location;
6363
mod update_properties;
6464
mod update_statistics;
6565
mod upgrade_format_version;
66+
mod validate;
6667

6768
use std::sync::Arc;
6869
use std::time::Duration;

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,10 @@ use super::{ActionCommit, TransactionAction};
2626
use crate::error::{Error, ErrorKind, Result};
2727
use crate::spec::{
2828
DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile,
29-
ManifestStatus, Operation,
29+
ManifestStatus, Operation, SnapshotRef,
3030
};
3131
use crate::table::Table;
32+
use crate::transaction::validate::SnapshotValidator;
3233

3334
/// Transaction action for rewriting files.
3435
pub struct RewriteFilesAction {
@@ -41,7 +42,12 @@ pub struct RewriteFilesAction {
4142
deleted_delete_files: Vec<DataFile>,
4243
}
4344

44-
pub struct RewriteFilesOperation;
45+
/// Operation passed to the snapshot producer when a `RewriteFilesAction` commits.
///
/// It holds its own copies of the action's four file lists so that its
/// `SnapshotValidator::validate` implementation can check them for consistency.
pub struct RewriteFilesOperation {
46+
/// Data files to add as replacements.
added_data_files: Vec<DataFile>,
47+
/// Delete files to add as replacements.
added_delete_files: Vec<DataFile>,
48+
/// Data files being rewritten (to be deleted).
deleted_data_files: Vec<DataFile>,
49+
/// Delete files being rewritten (to be deleted).
deleted_delete_files: Vec<DataFile>,
50+
}
4551

4652
impl RewriteFilesAction {
4753
pub fn new() -> Self {
@@ -122,13 +128,16 @@ impl TransactionAction for RewriteFilesAction {
122128
self.deleted_delete_files.clone(),
123129
);
124130

125-
// todo need to figure out validation
126-
// 1. validate replace and added files
127-
// 2. validate no new deletes using the starting snapshot id
131+
let rewrite_operation = RewriteFilesOperation {
132+
added_data_files: self.added_data_files.clone(),
133+
added_delete_files: self.added_delete_files.clone(),
134+
deleted_data_files: self.deleted_data_files.clone(),
135+
deleted_delete_files: self.deleted_delete_files.clone(),
136+
};
128137

129138
// todo should be able to configure merge manifest process
130139
snapshot_producer
131-
.commit(RewriteFilesOperation, DefaultManifestProcess)
140+
.commit(rewrite_operation, DefaultManifestProcess)
132141
.await
133142
}
134143
}
@@ -168,6 +177,36 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result<ManifestEntry> {
168177
Ok(builder.build())
169178
}
170179

180+
/// Validation for rewrite operations: checks that the added/deleted file lists
/// held by the operation are consistent with each other.
impl SnapshotValidator for RewriteFilesOperation {
181+
/// Validates the replaced and added file lists of this rewrite.
///
/// `_table` and `_snapshot` are accepted to satisfy the trait signature but
/// are not read by the current implementation.
///
/// # Errors
///
/// Returns `ErrorKind::DataInvalid` when:
/// - there is nothing to delete (both deleted lists are empty);
/// - data files are added although no data file is being rewritten;
/// - delete files are added although no delete file is being rewritten.
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
182+
// Validate replaced and added files
183+
// A rewrite must replace at least one file of either kind.
if self.deleted_data_files.is_empty() && self.deleted_delete_files.is_empty() {
184+
return Err(Error::new(
185+
ErrorKind::DataInvalid,
186+
"Files to delete cannot be empty",
187+
));
188+
}
189+
// Added data files are only allowed when some data file is being replaced.
if self.deleted_data_files.is_empty() && !self.added_data_files.is_empty() {
190+
return Err(Error::new(
191+
ErrorKind::DataInvalid,
192+
"Data files to add must be empty because there's no data file to be rewritten",
193+
));
194+
}
195+
// Added delete files are only allowed when some delete file is being replaced.
if self.deleted_delete_files.is_empty() && !self.added_delete_files.is_empty() {
196+
return Err(Error::new(
197+
ErrorKind::DataInvalid,
198+
"Delete files to add must be empty because there's no delete file to be rewritten",
199+
));
200+
}
201+
202+
// TODO: add use_starting_seq_number to help the validation.
203+
// TODO: validate that there are no new deletes since the current base:
204+
// if there are replaced data files, there cannot be any new row-level deletes for those data files.
205+
206+
Ok(())
207+
}
208+
}
209+
171210
impl SnapshotProduceOperation for RewriteFilesOperation {
172211
fn operation(&self) -> Operation {
173212
Operation::Replace

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ use crate::spec::{
3131
};
3232
use crate::table::Table;
3333
use crate::transaction::ActionCommit;
34+
use crate::transaction::validate::SnapshotValidator;
3435
use crate::{Error, ErrorKind, TableRequirement, TableUpdate};
3536

3637
const META_ROOT_PATH: &str = "metadata";
3738

38-
pub(crate) trait SnapshotProduceOperation: Send + Sync {
39+
pub(crate) trait SnapshotProduceOperation: Send + Sync + SnapshotValidator {
3940
fn operation(&self) -> Operation;
4041
fn delete_entries(
4142
&self,
@@ -265,8 +266,12 @@ impl<'a> SnapshotProducer<'a> {
265266
}
266267

267268
// Write a manifest file for the added data files or the added delete files (selected by `content_type`) and return the ManifestFile for ManifestList.
268-
async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
269-
let added_data_files = std::mem::take(&mut self.added_data_files);
269+
async fn write_added_manifest(&mut self, content_type: ManifestContentType) -> Result<ManifestFile> {
270+
let added_data_files = match content_type {
271+
ManifestContentType::Data => std::mem::take(&mut self.added_data_files),
272+
ManifestContentType::Deletes => std::mem::take(&mut self.added_delete_files),
273+
};
274+
270275
if added_data_files.is_empty() {
271276
return Err(Error::new(
272277
ErrorKind::PreconditionFailed,
@@ -289,7 +294,7 @@ impl<'a> SnapshotProducer<'a> {
289294
}
290295
});
291296
let mut writer = self.new_manifest_writer(
292-
ManifestContentType::Data,
297+
content_type,
293298
self.table.metadata().default_partition_spec_id(),
294299
)?;
295300
for entry in manifest_entries {
@@ -373,7 +378,11 @@ impl<'a> SnapshotProducer<'a> {
373378

374379
// Process added entries.
375380
if !self.added_data_files.is_empty() {
376-
let added_manifest = self.write_added_manifest().await?;
381+
let added_manifest = self.write_added_manifest(ManifestContentType::Data).await?;
382+
manifest_files.push(added_manifest);
383+
}
384+
if !self.added_delete_files.is_empty() {
385+
let added_manifest = self.write_added_manifest(ManifestContentType::Deletes).await?;
377386
manifest_files.push(added_manifest);
378387
}
379388

@@ -455,6 +464,10 @@ impl<'a> SnapshotProducer<'a> {
455464
snapshot_produce_operation: OP,
456465
process: MP,
457466
) -> Result<ActionCommit> {
467+
// Validate to avoid conflicts
468+
snapshot_produce_operation
469+
.validate(self.table, self.table.metadata().current_snapshot())?;
470+
458471
let new_manifests = self
459472
.manifest_file(&snapshot_produce_operation, &process)
460473
.await?;
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::HashSet;
19+
20+
use crate::error::Result;
21+
use crate::spec::{ManifestContentType, ManifestFile, Operation, SnapshotRef};
22+
use crate::table::Table;
23+
24+
/// Validation hook for snapshot-producing operations.
///
/// Both methods have provided defaults, so an implementor may use an empty
/// `impl` block and override only what it needs.
pub(crate) trait SnapshotValidator {
25+
/// Validates the operation before it is committed.
///
/// The default implementation ignores both arguments and always returns
/// `Ok(())`.
fn validate(&self, _table: &Table, _snapshot: Option<&SnapshotRef>) -> Result<()> {
26+
// todo: add default implementation
27+
Ok(())
28+
}
29+
30+
/// Placeholder for collecting validation history between two snapshots.
///
/// The default implementation ignores every argument and returns an empty
/// `Vec<ManifestFile>` together with an empty `HashSet<i64>`.
// NOTE(review): presumably the tuple is (matching manifests, snapshot ids
// visited) — the stub does not establish this; confirm once implemented.
// `dead_code` is allowed because nothing visible here calls this method yet.
#[allow(dead_code)]
31+
async fn validation_history(
32+
&self,
33+
_base: &Table,
34+
_from_snapshot: Option<&SnapshotRef>,
35+
_to_snapshot: &SnapshotRef,
36+
_matching_operations: HashSet<Operation>,
37+
_manifest_content_type: ManifestContentType,
38+
) -> Result<(Vec<ManifestFile>, HashSet<i64>)> {
39+
// todo: add default implementation
40+
Ok((vec![], HashSet::new()))
41+
}
42+
}

0 commit comments

Comments
 (0)