Skip to content

Commit e31edf6

Browse files
authored
Fix gcs integration tests and use default multipart size (#5765)
* Fix gcs tests with dynamic bucket * Comment multipart upload test with link to issue * Increase the default multipart target size * Fix typo
1 parent 53ca587 commit e31edf6

File tree

7 files changed

+124
-49
lines changed

7 files changed

+124
-49
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ services:
157157
- all
158158
- fake-gcs-server
159159
volumes:
160-
- fake_gcs_server_data:/data/sample-bucket
160+
- fake_gcs_server_data:/data
161161
command: -scheme http
162162

163163
grafana:

quickwit/quickwit-storage/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ pub use self::object_storage::{
7474
#[cfg(feature = "gcs")]
7575
pub use self::opendal_storage::GoogleCloudStorageFactory;
7676
#[cfg(all(feature = "gcs", feature = "integration-testsuite"))]
77-
pub use self::opendal_storage::new_emulated_google_cloud_storage;
77+
pub use self::opendal_storage::test_config_helpers;
7878
pub use self::ram_storage::{RamStorage, RamStorageBuilder};
7979
pub use self::split::{SplitPayload, SplitPayloadBuilder};
8080
#[cfg(any(test, feature = "testsuite"))]

quickwit/quickwit-storage/src/object_storage/policy.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,7 @@ impl MultiPartPolicy {
6363
}
6464
}
6565

66-
// Default values from https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
67-
// The best default value may however differ depending on vendors.
66+
// The best default value may differ depending on vendors.
6867
impl Default for MultiPartPolicy {
6968
fn default() -> Self {
7069
MultiPartPolicy {

quickwit/quickwit-storage/src/opendal_storage/base.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,19 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};
2626
use crate::metrics::object_storage_get_slice_in_flight_guards;
2727
use crate::storage::SendableAsync;
2828
use crate::{
29-
BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind,
30-
StorageResolverError, StorageResult,
29+
BulkDeleteError, MultiPartPolicy, OwnedBytes, PutPayload, Storage, StorageError,
30+
StorageErrorKind, StorageResolverError, StorageResult,
3131
};
3232

3333
/// OpenDAL based storage implementation.
3434
/// # TODO
3535
///
3636
/// - Implement REQUEST_SEMAPHORE to control the concurrency.
3737
/// - Implement STORAGE_METRICS for metrics.
38-
/// - Add multipart_policy to control write at once or via multiple.
39-
#[derive(Clone)]
4038
pub struct OpendalStorage {
4139
uri: Uri,
4240
op: Operator,
41+
multipart_policy: MultiPartPolicy,
4342
}
4443

4544
impl fmt::Debug for OpendalStorage {
@@ -58,7 +57,16 @@ impl OpendalStorage {
5857
cfg: opendal::services::Gcs,
5958
) -> Result<Self, StorageResolverError> {
6059
let op = Operator::new(cfg)?.finish();
61-
Ok(Self { uri, op })
60+
Ok(Self {
61+
uri,
62+
op,
63+
// limits are the same as on S3
64+
multipart_policy: MultiPartPolicy::default(),
65+
})
66+
}
67+
68+
pub fn set_policy(&mut self, multipart_policy: MultiPartPolicy) {
69+
self.multipart_policy = multipart_policy;
6270
}
6371
}
6472

@@ -69,17 +77,14 @@ impl Storage for OpendalStorage {
6977
Ok(())
7078
}
7179

72-
/// # TODO
73-
///
74-
/// We can implement something like `multipart_policy` determine whether to use copy.
75-
/// If the payload is small enough, we can call `op.write()` at once.
7680
async fn put(&self, path: &Path, payload: Box<dyn PutPayload>) -> StorageResult<()> {
7781
let path = path.as_os_str().to_string_lossy();
7882
let mut payload_reader = payload.byte_stream().await?.into_async_read();
7983

8084
let mut storage_writer = self
8185
.op
8286
.writer_with(&path)
87+
.chunk(self.multipart_policy.part_num_bytes(payload.len()) as usize)
8388
.await?
8489
.into_futures_async_write()
8590
.compat_write();
@@ -148,7 +153,7 @@ impl Storage for OpendalStorage {
148153
#[cfg(feature = "integration-testsuite")]
149154
{
150155
let storage_info = self.op.info();
151-
if storage_info.name() == "sample-bucket"
156+
if storage_info.name().starts_with("sample-bucket")
152157
&& storage_info.scheme() == opendal::Scheme::Gcs
153158
{
154159
let mut bulk_error = BulkDeleteError::default();

quickwit/quickwit-storage/src/opendal_storage/google_cloud_storage.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,18 @@ impl StorageFactory for GoogleCloudStorageFactory {
5050
}
5151
}
5252

53-
/// Creates an emulated storage for testing.
53+
/// Helpers to configure the GCP local test setup.
5454
#[cfg(feature = "integration-testsuite")]
55-
pub fn new_emulated_google_cloud_storage(
56-
uri: &Uri,
57-
) -> Result<OpendalStorage, StorageResolverError> {
55+
pub mod test_config_helpers {
56+
use super::*;
57+
58+
/// URL of the local GCP emulator.
59+
pub const LOCAL_GCP_EMULATOR_ENDPOINT: &str = "http://127.0.0.1:4443";
60+
61+
/// reqsign::GoogleTokenLoad implementation for testing.
5862
#[derive(Debug)]
59-
struct DummyTokenLoader;
63+
pub struct DummyTokenLoader;
64+
6065
#[async_trait]
6166
impl reqsign::GoogleTokenLoad for DummyTokenLoader {
6267
async fn load(&self, _: reqwest::Client) -> anyhow::Result<Option<reqsign::GoogleToken>> {
@@ -67,16 +72,21 @@ pub fn new_emulated_google_cloud_storage(
6772
)))
6873
}
6974
}
70-
let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri");
7175

72-
let cfg = opendal::services::Gcs::default()
73-
.bucket(&bucket)
74-
.root(&root.to_string_lossy())
75-
.endpoint("http://127.0.0.1:4443")
76-
.customized_token_loader(Box::new(DummyTokenLoader));
77-
78-
let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?;
79-
Ok(store)
76+
/// Creates a storage connecting to a local emulated google cloud storage.
77+
pub fn new_emulated_google_cloud_storage(
78+
uri: &Uri,
79+
) -> Result<OpendalStorage, StorageResolverError> {
80+
let (bucket, root) = parse_google_uri(uri).expect("must be valid google uri");
81+
82+
let cfg = opendal::services::Gcs::default()
83+
.bucket(&bucket)
84+
.root(&root.to_string_lossy())
85+
.endpoint(LOCAL_GCP_EMULATOR_ENDPOINT)
86+
.customized_token_loader(Box::new(DummyTokenLoader));
87+
let store = OpendalStorage::new_google_cloud_storage(uri.clone(), cfg)?;
88+
Ok(store)
89+
}
8090
}
8191

8292
fn from_uri(

quickwit/quickwit-storage/src/opendal_storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,4 @@ mod google_cloud_storage;
1919

2020
pub use google_cloud_storage::GoogleCloudStorageFactory;
2121
#[cfg(feature = "integration-testsuite")]
22-
pub use google_cloud_storage::new_emulated_google_cloud_storage;
22+
pub use google_cloud_storage::test_config_helpers;

quickwit/quickwit-storage/tests/google_cloud_storage.rs

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,90 @@
1616
// to Fake GCS Server (the emulated google cloud storage environment)
1717

1818
#[cfg(all(feature = "integration-testsuite", feature = "gcs"))]
19-
#[tokio::test]
2019
#[cfg_attr(not(feature = "ci-test"), ignore)]
21-
async fn google_cloud_storage_test_suite() -> anyhow::Result<()> {
20+
mod gcp_storage_test_suite {
2221
use std::str::FromStr;
2322

2423
use anyhow::Context;
24+
use quickwit_common::rand::append_random_suffix;
25+
use quickwit_common::setup_logging_for_tests;
2526
use quickwit_common::uri::Uri;
26-
use quickwit_storage::new_emulated_google_cloud_storage;
27-
let _ = tracing_subscriber::fmt::try_init();
28-
29-
let mut object_storage =
30-
new_emulated_google_cloud_storage(&Uri::from_str("gs://sample-bucket")?)?;
31-
quickwit_storage::storage_test_suite(&mut object_storage).await?;
32-
33-
let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str(
34-
"gs://sample-bucket/integration-tests/test-azure-compatible-storage",
35-
)?)?;
36-
quickwit_storage::storage_test_single_part_upload(&mut object_storage)
37-
.await
38-
.context("test single-part upload failed")?;
39-
40-
quickwit_storage::storage_test_multi_part_upload(&mut object_storage)
41-
.await
42-
.context("test multipart upload failed")?;
43-
Ok(())
27+
use quickwit_storage::test_config_helpers::{
28+
DummyTokenLoader, LOCAL_GCP_EMULATOR_ENDPOINT, new_emulated_google_cloud_storage,
29+
};
30+
use reqsign::GoogleTokenLoad;
31+
32+
pub async fn sign_gcs_request(req: &mut reqwest::Request) -> anyhow::Result<()> {
33+
let client = reqwest::Client::new();
34+
let token = DummyTokenLoader
35+
.load(client.clone())
36+
.await?
37+
.ok_or_else(|| anyhow::anyhow!("Failed to obtain authentication token"))?;
38+
39+
let signer = reqsign::GoogleSigner::new("storage");
40+
signer.sign(req, &token)?;
41+
42+
Ok(())
43+
}
44+
45+
async fn create_gcs_bucket(bucket_name: &str) -> anyhow::Result<()> {
46+
let client = reqwest::Client::new();
47+
let url = format!("{LOCAL_GCP_EMULATOR_ENDPOINT}/storage/v1/b");
48+
let mut request = client
49+
.post(url)
50+
.body(serde_json::to_vec(&serde_json::json!({
51+
"name": bucket_name,
52+
}))?)
53+
.header(reqwest::header::CONTENT_TYPE, "application/json")
54+
.build()?;
55+
56+
sign_gcs_request(&mut request).await?;
57+
58+
let response = client.execute(request).await?;
59+
60+
if !response.status().is_success() {
61+
let error_text = response.text().await?;
62+
anyhow::bail!("Failed to create bucket: {}", error_text);
63+
};
64+
Ok(())
65+
}
66+
67+
#[tokio::test]
68+
async fn google_cloud_storage_test_suite() -> anyhow::Result<()> {
69+
setup_logging_for_tests();
70+
71+
let bucket_name = append_random_suffix("sample-bucket").to_lowercase();
72+
create_gcs_bucket(bucket_name.as_str())
73+
.await
74+
.context("Failed to create test GCS bucket")?;
75+
76+
let mut object_storage =
77+
new_emulated_google_cloud_storage(&Uri::from_str(&format!("gs://{bucket_name}"))?)?;
78+
79+
quickwit_storage::storage_test_suite(&mut object_storage).await?;
80+
81+
let mut object_storage = new_emulated_google_cloud_storage(&Uri::from_str(&format!(
82+
"gs://{bucket_name}/integration-tests/test-gcs-storage"
83+
))?)?;
84+
85+
quickwit_storage::storage_test_single_part_upload(&mut object_storage)
86+
.await
87+
.context("test single-part upload failed")?;
88+
89+
// TODO: Uncomment storage_test_multi_part_upload when the XML API is
90+
// supported in the emulated GCS server
91+
// (https://github.com/fsouza/fake-gcs-server/pull/1164)
92+
93+
// object_storage.set_policy(MultiPartPolicy {
94+
// target_part_num_bytes: 5 * 1_024 * 1_024,
95+
// max_num_parts: 10_000,
96+
// multipart_threshold_num_bytes: 10_000_000,
97+
// max_object_num_bytes: 5_000_000_000_000,
98+
// max_concurrent_uploads: 100,
99+
// });
100+
// quickwit_storage::storage_test_multi_part_upload(&mut object_storage)
101+
// .await
102+
// .context("test multipart upload failed")?;
103+
Ok(())
104+
}
44105
}

0 commit comments

Comments
 (0)