Skip to content

Commit f0bd611

Browse files
committed
use separate vec to store data and delete files in snapshot producer
1 parent e4269fe commit f0bd611

File tree

3 files changed

+90
-20
lines changed

3 files changed

+90
-20
lines changed

crates/iceberg/src/transaction/append.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ impl TransactionAction for FastAppendAction {
9191
self.snapshot_properties.clone(),
9292
self.added_data_files.clone(),
9393
vec![],
94+
vec![],
95+
vec![],
9496
);
9597

9698
// validate added files

crates/iceberg/src/transaction/rewrite_files.rs

Lines changed: 82 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ use uuid::Uuid;
2424
use super::snapshot::{DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer};
2525
use super::{ActionCommit, TransactionAction};
2626
use crate::error::{Error, ErrorKind, Result};
27-
use crate::spec::{DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile, ManifestStatus, Operation};
27+
use crate::spec::{
28+
DataContentType, DataFile, ManifestContentType, ManifestEntry, ManifestEntryRef, ManifestFile,
29+
ManifestStatus, Operation,
30+
};
2831
use crate::table::Table;
2932

3033
/// Transaction action for rewriting files.
@@ -33,7 +36,9 @@ pub struct RewriteFilesAction {
3336
key_metadata: Option<Vec<u8>>,
3437
snapshot_properties: HashMap<String, String>,
3538
added_data_files: Vec<DataFile>,
39+
added_delete_files: Vec<DataFile>,
3640
deleted_data_files: Vec<DataFile>,
41+
deleted_delete_files: Vec<DataFile>,
3742
}
3843

3944
pub struct RewriteFilesOperation;
@@ -45,7 +50,9 @@ impl RewriteFilesAction {
4550
key_metadata: None,
4651
snapshot_properties: Default::default(),
4752
added_data_files: vec![],
53+
added_delete_files: vec![],
4854
deleted_data_files: vec![],
55+
deleted_delete_files: vec![],
4956
}
5057
}
5158

@@ -54,7 +61,14 @@ impl RewriteFilesAction {
5461
mut self,
5562
data_files: impl IntoIterator<Item = DataFile>,
5663
) -> Result<Self> {
57-
self.added_data_files.extend(data_files);
64+
for data_file in data_files {
65+
match data_file.content {
66+
DataContentType::Data => self.added_data_files.push(data_file),
67+
DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
68+
self.added_delete_files.push(data_file)
69+
}
70+
}
71+
}
5872
Ok(self)
5973
}
6074

@@ -63,7 +77,15 @@ impl RewriteFilesAction {
6377
mut self,
6478
data_files: impl IntoIterator<Item = DataFile>,
6579
) -> Result<Self> {
66-
self.deleted_data_files.extend(data_files);
80+
for data_file in data_files {
81+
match data_file.content {
82+
DataContentType::Data => self.deleted_data_files.push(data_file),
83+
DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
84+
self.deleted_delete_files.push(data_file)
85+
}
86+
}
87+
}
88+
6789
Ok(self)
6890
}
6991

@@ -95,9 +117,15 @@ impl TransactionAction for RewriteFilesAction {
95117
self.key_metadata.clone(),
96118
self.snapshot_properties.clone(),
97119
self.added_data_files.clone(),
120+
self.added_delete_files.clone(),
98121
self.deleted_data_files.clone(),
122+
self.deleted_delete_files.clone(),
99123
);
100124

125+
// TODO: figure out validation
126+
// 1. validate replaced and added files
127+
// 2. validate no new deletes using the starting snapshot id
128+
101129
// TODO: make the manifest merge process configurable
102130
snapshot_producer
103131
.commit(RewriteFilesOperation, DefaultManifestProcess)
@@ -111,19 +139,28 @@ fn copy_with_deleted_status(entry: &ManifestEntryRef) -> Result<ManifestEntry> {
111139
.snapshot_id(entry.snapshot_id().ok_or_else(|| {
112140
Error::new(
113141
ErrorKind::DataInvalid,
114-
format!("Missing snapshot_id for entry with file path: {}", entry.file_path()),
142+
format!(
143+
"Missing snapshot_id for entry with file path: {}",
144+
entry.file_path()
145+
),
115146
)
116147
})?)
117148
.sequence_number(entry.sequence_number().ok_or_else(|| {
118149
Error::new(
119150
ErrorKind::DataInvalid,
120-
format!("Missing sequence_number for entry with file path: {}", entry.file_path()),
151+
format!(
152+
"Missing sequence_number for entry with file path: {}",
153+
entry.file_path()
154+
),
121155
)
122156
})?)
123157
.file_sequence_number(entry.file_sequence_number().ok_or_else(|| {
124158
Error::new(
125159
ErrorKind::DataInvalid,
126-
format!("Missing file_sequence_number for entry with file path: {}", entry.file_path()),
160+
format!(
161+
"Missing file_sequence_number for entry with file path: {}",
162+
entry.file_path()
163+
),
127164
)
128165
})?)
129166
.data_file(entry.data_file().clone());
@@ -159,12 +196,25 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
159196
.await?;
160197

161198
for entry in manifest.entries() {
162-
if snapshot_producer
163-
.deleted_data_files
164-
.iter()
165-
.any(|f| f.file_path == entry.data_file().file_path)
166-
{
167-
delete_entries.push(copy_with_deleted_status(entry)?);
199+
match entry.content_type() {
200+
DataContentType::Data => {
201+
if snapshot_producer
202+
.deleted_data_files
203+
.iter()
204+
.any(|f| f.file_path == entry.data_file().file_path)
205+
{
206+
delete_entries.push(copy_with_deleted_status(entry)?)
207+
}
208+
}
209+
DataContentType::PositionDeletes | DataContentType::EqualityDeletes => {
210+
if snapshot_producer
211+
.deleted_delete_files
212+
.iter()
213+
.any(|f| f.file_path == entry.data_file().file_path)
214+
{
215+
delete_entries.push(copy_with_deleted_status(entry)?)
216+
}
217+
}
168218
}
169219
}
170220
}
@@ -202,15 +252,27 @@ impl SnapshotProduceOperation for RewriteFilesOperation {
202252
.entries()
203253
.iter()
204254
.filter_map(|entry| {
205-
if snapshot_producer
206-
.deleted_data_files
207-
.iter()
208-
.any(|f| f.file_path == entry.data_file().file_path)
209-
{
210-
Some(entry.data_file().file_path().to_string())
211-
} else {
212-
None
255+
match entry.content_type() {
256+
DataContentType::Data => {
257+
if snapshot_producer
258+
.deleted_data_files
259+
.iter()
260+
.any(|f| f.file_path == entry.data_file().file_path)
261+
{
262+
return Some(entry.data_file().file_path().to_string());
263+
}
264+
}
265+
DataContentType::EqualityDeletes | DataContentType::PositionDeletes => {
266+
if snapshot_producer
267+
.deleted_delete_files
268+
.iter()
269+
.any(|f| f.file_path == entry.data_file().file_path)
270+
{
271+
return Some(entry.data_file().file_path().to_string());
272+
}
273+
}
213274
}
275+
None
214276
})
215277
.collect();
216278

crates/iceberg/src/transaction/snapshot.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ pub(crate) struct SnapshotProducer<'a> {
7474
key_metadata: Option<Vec<u8>>,
7575
snapshot_properties: HashMap<String, String>,
7676
added_data_files: Vec<DataFile>,
77+
added_delete_files: Vec<DataFile>,
7778
pub deleted_data_files: Vec<DataFile>,
79+
pub deleted_delete_files: Vec<DataFile>,
7880
// A counter used to generate unique manifest file names.
7981
// It starts from 0 and increments for each new manifest file.
8082
// Note: This counter is limited to the range of (0..u64::MAX).
@@ -88,7 +90,9 @@ impl<'a> SnapshotProducer<'a> {
8890
key_metadata: Option<Vec<u8>>,
8991
snapshot_properties: HashMap<String, String>,
9092
added_data_files: Vec<DataFile>,
93+
added_delete_files: Vec<DataFile>,
9194
deleted_data_files: Vec<DataFile>,
95+
deleted_delete_files: Vec<DataFile>,
9296
) -> Self {
9397
Self {
9498
table,
@@ -97,7 +101,9 @@ impl<'a> SnapshotProducer<'a> {
97101
key_metadata,
98102
snapshot_properties,
99103
added_data_files,
104+
added_delete_files,
100105
deleted_data_files,
106+
deleted_delete_files,
101107
manifest_counter: (0..),
102108
}
103109
}

0 commit comments

Comments
 (0)