Skip to content

Commit 2c86b1e

Browse files
goffrie (Convex, Inc.)
authored and committed
[2/2] Switch snapshot import to StorageZipReader (#40444)
This reads zip snapshots directly from object storage instead of copying to a temporary file first. GitOrigin-RevId: 80f843f1c27afc8ff4546958953b2a10d0113c33
1 parent ceab78c commit 2c86b1e

File tree

10 files changed

+145
-221
lines changed

10 files changed

+145
-221
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,6 @@ exclude = [ "crates/py_client", "crates/python_client_tests" ]
66
[workspace.dependencies]
77
application = { path = "crates/application" }
88
async_lru = { path = "crates/async_lru" }
9-
async_zip_reader = { path = "crates/async_zip_reader" }
109
authentication = { path = "crates/authentication" }
1110
aws_s3 = { path = "crates/aws_s3" }
1211
aws_utils = { path = "crates/aws_utils" }
@@ -55,6 +54,7 @@ shape_inference = { path = "crates/shape_inference" }
5554
sodium_secretbox = { path = "crates/sodium_secretbox" }
5655
sqlite = { path = "crates/sqlite" }
5756
storage = { path = "crates/storage" }
57+
storage_zip_reader = { path = "crates/storage_zip_reader" }
5858
sync = { path = "crates/sync" }
5959
sync_types = { package = "convex_sync_types", path = "crates/convex/sync_types" }
6060
text_search = { path = "crates/text_search" }

crates/application/Cargo.toml

Lines changed: 5 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,7 @@ testing = [
3232
"value/testing",
3333
"vector/testing",
3434
"exports/testing",
35+
"storage_zip_reader/testing",
3536
]
3637

3738
[dependencies]
@@ -41,7 +42,6 @@ async-recursion = { workspace = true }
4142
async-trait = { workspace = true }
4243
async_lru = { workspace = true }
4344
async_zip = { workspace = true }
44-
async_zip_reader = { workspace = true }
4545
authentication = { workspace = true }
4646
aws_s3 = { workspace = true }
4747
bytes = { workspace = true }
@@ -95,6 +95,7 @@ slugify = "0.1.0"
9595
smallvec = { workspace = true }
9696
sourcemap = { workspace = true }
9797
storage = { workspace = true }
98+
storage_zip_reader = { workspace = true }
9899
strum = { workspace = true }
99100
sync_types = { workspace = true }
100101
tempfile = { workspace = true }
@@ -115,9 +116,7 @@ vector = { workspace = true }
115116
libc = { workspace = true }
116117

117118
[dev-dependencies]
118-
authentication = { workspace = true, features = [
119-
"testing",
120-
] }
119+
authentication = { workspace = true, features = ["testing"] }
121120
common = { workspace = true, features = ["testing"] }
122121
database = { workspace = true, features = ["testing"] }
123122
errors = { workspace = true, features = ["testing"] }
@@ -140,10 +139,9 @@ runtime = { workspace = true, features = ["testing"] }
140139
search = { workspace = true, features = ["testing"] }
141140
shape_inference = { workspace = true, features = ["testing"] }
142141
storage = { workspace = true, features = ["testing"] }
142+
storage_zip_reader = { workspace = true, features = ["testing"] }
143143
udf = { workspace = true, features = ["testing"] }
144-
usage_tracking = { workspace = true, features = [
145-
"testing",
146-
] }
144+
usage_tracking = { workspace = true, features = ["testing"] }
147145
value = { workspace = true, features = ["testing"] }
148146
vector = { workspace = true, features = ["testing"] }
149147

crates/application/src/exports/tests.rs

Lines changed: 11 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -3,12 +3,9 @@ use std::{
33
BTreeMap,
44
BTreeSet,
55
},
6-
io::Cursor,
76
sync::Arc,
87
};
98

10-
use anyhow::Context as _;
11-
use async_zip_reader::ZipReader;
129
use common::components::ComponentId;
1310
use database::{
1411
BootstrapComponentsModel,
@@ -27,8 +24,8 @@ use serde_json::json;
2724
use storage::{
2825
LocalDirStorage,
2926
Storage,
30-
StorageExt as _,
3127
};
28+
use storage_zip_reader::StorageZipArchive;
3229
use tokio::io::AsyncReadExt as _;
3330
use value::{
3431
assert_obj,
@@ -136,22 +133,15 @@ async fn test_export_components(rt: TestRuntime) -> anyhow::Result<()> {
136133
.await?;
137134

138135
// Check we can get the stored zip.
139-
let storage_stream = storage
140-
.get(&zip_object_key)
141-
.await?
142-
.context("object missing from storage")?;
143-
let stored_bytes = storage_stream.collect_as_bytes().await?;
144-
let mut zip_reader = ZipReader::new(Cursor::new(stored_bytes)).await?;
136+
let zip_reader = StorageZipArchive::open(storage.clone(), &zip_object_key).await?;
145137
let mut zip_entries = BTreeMap::new();
146-
let filenames: Vec<_> = zip_reader.file_names().await?;
147-
for (i, filename) in filenames.into_iter().enumerate() {
148-
let entry_reader = zip_reader.by_index(i).await?;
138+
for entry in zip_reader.entries() {
149139
let mut entry_contents = String::new();
150-
entry_reader
151-
.read()
140+
zip_reader
141+
.read_entry(entry.clone())
152142
.read_to_string(&mut entry_contents)
153143
.await?;
154-
zip_entries.insert(filename, entry_contents);
144+
zip_entries.insert(entry.name.clone(), entry_contents);
155145
}
156146
assert_eq!(zip_entries, expected_export_entries);
157147

@@ -198,22 +188,15 @@ async fn test_export_unmounted_components(rt: TestRuntime) -> anyhow::Result<()>
198188
.await?;
199189

200190
// Check we can get the stored zip.
201-
let storage_stream = storage
202-
.get(&zip_object_key)
203-
.await?
204-
.context("object missing from storage")?;
205-
let stored_bytes = storage_stream.collect_as_bytes().await?;
206-
let mut zip_reader = ZipReader::new(Cursor::new(stored_bytes)).await?;
191+
let zip_reader = StorageZipArchive::open(storage.clone(), &zip_object_key).await?;
207192
let mut zip_entries = BTreeSet::new();
208-
let filenames: Vec<_> = zip_reader.file_names().await?;
209-
for (i, filename) in filenames.into_iter().enumerate() {
210-
let entry_reader = zip_reader.by_index(i).await?;
193+
for entry in zip_reader.entries() {
211194
let mut entry_contents = String::new();
212-
entry_reader
213-
.read()
195+
zip_reader
196+
.read_entry(entry.clone())
214197
.read_to_string(&mut entry_contents)
215198
.await?;
216-
zip_entries.insert(filename);
199+
zip_entries.insert(entry.name.clone());
217200
}
218201
assert_eq!(zip_entries, expected_export_entries);
219202

crates/application/src/snapshot_import/mod.rs

Lines changed: 11 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -92,10 +92,7 @@ use shape_inference::{
9292
export_context::GeneratedSchema,
9393
ProdConfigWithOptionalFields,
9494
};
95-
use storage::{
96-
Storage,
97-
StorageExt,
98-
};
95+
use storage::Storage;
9996
use sync_types::{
10097
backoff::Backoff,
10198
Timestamp,
@@ -412,17 +409,17 @@ impl<RT: Runtime> SnapshotImportExecutor<RT> {
412409
snapshot_import.component_path.clone(),
413410
)
414411
};
415-
let body_stream = move || {
416-
let object_key = object_key.clone();
417-
async move {
418-
let reader = match object_key.clone() {
419-
Ok(key) => self.snapshot_imports_storage.get_fq_object(&key).await?,
420-
Err(key) => self.snapshot_imports_storage.get(&key).await?,
421-
};
422-
reader.with_context(|| format!("Missing import object {object_key:?}"))
423-
}
412+
let fq_key = match &object_key {
413+
Ok(key) => key.clone(),
414+
Err(key) => self.snapshot_imports_storage.fully_qualified_key(key),
424415
};
425-
let objects = parse_objects(format.clone(), component_path.clone(), body_stream).boxed();
416+
let objects = parse_objects(
417+
format.clone(),
418+
component_path.clone(),
419+
self.snapshot_imports_storage.clone(),
420+
fq_key,
421+
)
422+
.boxed();
426423

427424
let component_id = prepare_component_for_import(&self.database, &component_path).await?;
428425
// Remapping could be more extensive here, it's just relatively simple to handle

0 commit comments

Comments (0)