Skip to content

Commit 4bb6e2a

Browse files
authored
Fix azure multipart upload data corruption (#5919)
Blocks were added to BlockList in completion order rather than sequential order, causing data corruption. Zero pad block IDs and sort before committing to ensure correct blob reconstruction. Also improve the existing test that was only checking file, the test would fail on main if integrity check was turned on. This PR passes the unit test, also has been runnin in for last two days in production without data corruption, prior to this PR - split was getting corrupted every 45 minutes.
1 parent a58b6de commit 4bb6e2a

File tree

3 files changed

+41
-15
lines changed

3 files changed

+41
-15
lines changed

quickwit/quickwit-storage/src/lib.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,9 +383,28 @@ pub(crate) mod test_suite {
383383
#[cfg(feature = "integration-testsuite")]
384384
pub async fn storage_test_multi_part_upload(storage: &mut dyn Storage) -> anyhow::Result<()> {
385385
let test_path = Path::new("hello_large.txt");
386-
let test_buffer = vec![0u8; 15_000_000];
387-
storage.put(test_path, Box::new(test_buffer)).await?;
386+
387+
let mut test_buffer = Vec::with_capacity(15_000_000);
388+
for i in 0..15_000_000u32 {
389+
test_buffer.push((i % 256) as u8);
390+
}
391+
392+
storage
393+
.put(test_path, Box::new(test_buffer.clone()))
394+
.await?;
395+
388396
assert_eq!(storage.file_num_bytes(test_path).await?, 15_000_000);
397+
398+
let downloaded_data = storage.get_all(test_path).await?;
399+
400+
assert_eq!(test_buffer.len(), downloaded_data.len(), "Length mismatch");
401+
// dont use assert_eq since we dont want large buffers to be printed
402+
// if assert fails
403+
assert!(
404+
test_buffer.as_slice() == downloaded_data.as_slice(),
405+
"Content mismatch - data corruption detected!"
406+
);
407+
389408
Ok(())
390409
}
391410
}

quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ impl AzureBlobStorage {
274274
chunk_range(0..total_len as usize, part_len as usize).map(into_u64_range);
275275

276276
let blob_client = self.container_client.blob_client(name);
277-
let mut upload_blocks_stream_result = tokio_stream::iter(multipart_ranges.enumerate())
277+
let upload_blocks_stream = tokio_stream::iter(multipart_ranges.enumerate())
278278
.map(|(num, range)| {
279279
let moved_blob_client = blob_client.clone();
280280
let moved_payload = payload.clone();
@@ -284,7 +284,8 @@ impl AzureBlobStorage {
284284
.inc_by(range.end - range.start);
285285
async move {
286286
retry(&self.retry_params, || async {
287-
let block_id = format!("block:{num}");
287+
// zero pad block ids to make them sortable as strings
288+
let block_id = format!("block:{:05}", num);
288289
let (data, hash_digest) =
289290
extract_range_data_and_hash(moved_payload.box_clone(), range.clone())
290291
.await?;
@@ -301,16 +302,22 @@ impl AzureBlobStorage {
301302
})
302303
.buffer_unordered(self.multipart_policy.max_concurrent_uploads());
303304

304-
// Concurrently upload block with limit.
305-
let mut block_list = BlockList::default();
306-
while let Some(put_block_result) = upload_blocks_stream_result.next().await {
307-
match put_block_result {
308-
Ok(block_id) => block_list
309-
.blocks
310-
.push(BlobBlockType::new_uncommitted(block_id)),
311-
Err(error) => return Err(error.into()),
312-
}
313-
}
305+
// Collect and sort block ids to preserve part order for put_block_list.
306+
// Azure docs: "The put block list operation enforces the order in which blocks
307+
// are to be combined to create a blob".
308+
// https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-list
309+
let mut block_ids: Vec<String> = upload_blocks_stream
310+
.try_collect()
311+
.await
312+
.map_err(StorageError::from)?;
313+
block_ids.sort_unstable();
314+
315+
let block_list = BlockList {
316+
blocks: block_ids
317+
.into_iter()
318+
.map(BlobBlockType::new_uncommitted)
319+
.collect(),
320+
};
314321

315322
// Commit all uploaded blocks.
316323
blob_client

quickwit/quickwit-storage/tests/azure_storage.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async fn azure_storage_test_suite() -> anyhow::Result<()> {
4545

4646
object_storage.set_policy(MultiPartPolicy {
4747
// On azure, block size is limited between 64KB and 100MB.
48-
target_part_num_bytes: 5 * 1_024 * 1_024, // 5MB
48+
target_part_num_bytes: 5 * 1_024 * 1_024, // 5MiB
4949
max_num_parts: 10_000,
5050
multipart_threshold_num_bytes: 10_000_000,
5151
max_object_num_bytes: 5_000_000_000_000,

0 commit comments

Comments
 (0)