Skip to content
Draft
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ jobs:
- name: Run object_store tests
run: cargo test --features=aws,azure,gcp,http

- name: Run crypto feature tests
run: |
# With ring
cargo test crypto --no-default-features --features=aws,azure,gcp,ring
# Without ring
cargo test crypto --no-default-features --features=aws,azure,gcp

# Don't rerun doc tests (some of them rely on features other than aws)
- name: Run object_store tests (AWS native conditional put)
run: cargo test --lib --tests --features=aws
Expand Down
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti
serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
serde_urlencoded = { version = "0.7", optional = true }
tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"] }
openssl = "0.10.73"

[target.'cfg(target_family="unix")'.dev-dependencies]
nix = { version = "0.30.0", features = ["fs"] }
Expand All @@ -69,8 +70,9 @@ web-time = { version = "1.1.0" }
wasm-bindgen-futures = "0.4.18"

[features]
default = ["fs"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded"]
default = ["fs", "ring"]
ring = ["dep:ring"]
cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "dep:ring", "http-body-util", "form_urlencoded", "serde_urlencoded"]
azure = ["cloud", "httparse"]
fs = ["walkdir"]
gcp = ["cloud", "rustls-pemfile"]
Expand Down Expand Up @@ -105,3 +107,7 @@ features = ["js"]
name = "get_range_file"
path = "tests/get_range_file.rs"
required-features = ["fs"]

[[test]]
name = "crypto"
path = "tests/crypto.rs"
93 changes: 89 additions & 4 deletions src/aws/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::aws::{
};
use crate::client::{http_connector, HttpConnector, TokenCredentialProvider};
use crate::config::ConfigValue;
use crate::crypto::CryptoProviderRef;
use crate::{ClientConfigKey, ClientOptions, Result, RetryConfig, StaticCredentialProvider};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
Expand Down Expand Up @@ -87,6 +88,9 @@ enum Error {
header: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// No crypto provider was set on the builder.
#[error("Missing crypto provider. Please enable the default crypto provider or configure one explicitly.")]
MissingCryptoProvider {},
}

impl From<Error> for crate::Error {
Expand Down Expand Up @@ -120,8 +124,10 @@ impl From<Error> for crate::Error {
/// .with_secret_access_key(SECRET_KEY)
/// .build();
/// ```
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub struct AmazonS3Builder {
/// Crypto provider
crypto_provider: Option<CryptoProviderRef>,
/// Access key id
access_key_id: Option<String>,
/// Secret access_key
Expand Down Expand Up @@ -486,10 +492,55 @@ impl FromStr for AmazonS3ConfigKey {
}
}

impl Default for AmazonS3Builder {
fn default() -> Self {
Self::new()
}
}

impl AmazonS3Builder {
/// Create a new [`AmazonS3Builder`] with default values.
///
/// Every field starts as `None` or as its type's `Default` value, except
/// `metadata_endpoint`, which is initialised from `DEFAULT_METADATA_ENDPOINT`.
/// When the crate is compiled with the `ring` feature, a `RingProvider` is
/// installed as the crypto provider; otherwise `crypto_provider` stays `None`.
pub fn new() -> Self {
Default::default()
// NOTE(review): the only reassignment of `builder` is inside the
// `#[cfg(feature = "ring")]` block below, so this `mut` looks unused when that
// feature is disabled and may trigger an `unused_mut` warning — confirm.
let mut builder = Self {
crypto_provider: None,
access_key_id: None,
secret_access_key: None,
region: None,
bucket_name: None,
endpoint: None,
token: None,
url: None,
retry_config: RetryConfig::default(),
imdsv1_fallback: ConfigValue::default(),
virtual_hosted_style_request: ConfigValue::default(),
s3_express: ConfigValue::default(),
unsigned_payload: ConfigValue::default(),
checksum_algorithm: None,
// The one field that is given a concrete value rather than `None`/`Default`.
metadata_endpoint: Some(DEFAULT_METADATA_ENDPOINT.to_string()),
container_credentials_relative_uri: None,
container_credentials_full_uri: None,
container_authorization_token_file: None,
client_options: ClientOptions::default(),
credentials: None,
skip_signature: ConfigValue::default(),
copy_if_not_exists: None,
conditional_put: ConfigValue::default(),
disable_tagging: ConfigValue::default(),
encryption_type: None,
encryption_kms_key_id: None,
encryption_bucket_key_enabled: None,
encryption_customer_key_base64: None,
request_payer: ConfigValue::default(),
http_connector: None,
};

// Only compiled with the `ring` feature: install `RingProvider` as the
// crypto provider so callers do not have to configure one themselves.
#[cfg(feature = "ring")]
{
use crate::crypto::ring_crypto::RingProvider;
builder = builder.with_crypto(Arc::new(RingProvider::default()));
};

builder
}

/// Fill the [`AmazonS3Builder`] with regular AWS environment variables
Expand Down Expand Up @@ -537,6 +588,12 @@ impl AmazonS3Builder {
builder
}

/// Set the crypto provider stored on this builder.
///
/// The supplied provider replaces any provider that was set before, including
/// the `RingProvider` that [`AmazonS3Builder::new`] installs when the `ring`
/// feature is enabled.
pub fn with_crypto(mut self, crypto_provider: CryptoProviderRef) -> Self {
self.crypto_provider = Some(crypto_provider);
self
}

/// Parse available connection info from a well-known storage URL.
///
/// The supported url schemes are:
Expand Down Expand Up @@ -1096,6 +1153,10 @@ impl AmazonS3Builder {
)) as _
};

let crypto_provider = self
.crypto_provider
.ok_or(Error::MissingCryptoProvider {})?;

let (session_provider, zonal_endpoint) = match self.s3_express.get()? {
true => {
let zone = parse_bucket_az(&bucket).ok_or_else(|| {
Expand All @@ -1109,6 +1170,7 @@ impl AmazonS3Builder {
let session = Arc::new(
TokenCredentialProvider::new(
SessionProvider {
crypto_provider: crypto_provider.clone(),
endpoint: endpoint.clone(),
region: region.clone(),
credentials: Arc::clone(&credentials),
Expand Down Expand Up @@ -1148,6 +1210,7 @@ impl AmazonS3Builder {
};

let config = S3Config {
crypto_provider: crypto_provider.clone(),
region,
bucket,
bucket_endpoint,
Expand All @@ -1166,9 +1229,12 @@ impl AmazonS3Builder {
};

let http_client = http.connect(&config.client_options)?;
let client = Arc::new(S3Client::new(config, http_client));
let client = Arc::new(S3Client::new(config, http_client, crypto_provider.clone()));

Ok(AmazonS3 { client })
Ok(AmazonS3 {
client,
crypto_provider,
})
}
}

Expand Down Expand Up @@ -1353,6 +1419,8 @@ impl From<S3EncryptionHeaders> for HeaderMap {

#[cfg(test)]
mod tests {
use crate::crypto;

use super::*;
use std::collections::HashMap;

Expand Down Expand Up @@ -1731,6 +1799,23 @@ mod tests {
assert!(
debug_str.contains("TokenCredentialProvider"),
"expected TokenCredentialProvider but got: {debug_str}"
)
}

/// Checks that a provider passed to `with_crypto` is the one stored on the builder.
///
/// The assertion requires the digest returned by the configured `NoopCrypto`
/// to equal the input bytes.
#[test]
fn aws_test_crypto_configuration() {
    let builder = AmazonS3Builder::default()
        .with_bucket_name("testbucket")
        .with_crypto(Arc::from(crypto::noop_crypto::NoopCrypto {}));

    let bytes = b"hello world";
    assert_eq!(
        builder
            .crypto_provider
            .unwrap()
            .digest_sha256(bytes)
            .unwrap()
            .as_ref(),
        bytes
    );
}
}
54 changes: 36 additions & 18 deletions src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use crate::client::s3::{
InitiateMultipartUploadResult, ListResponse, PartMetadata,
};
use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse};
use crate::crypto::{self, CryptoProvider, CryptoProviderRef};
use crate::list::{PaginatedListOptions, PaginatedListResult};
use crate::multipart::PartId;
use crate::{
Expand All @@ -52,8 +53,6 @@ use itertools::Itertools;
use md5::{Digest, Md5};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

Expand Down Expand Up @@ -192,6 +191,7 @@ impl From<DeleteError> for Error {

#[derive(Debug)]
pub(crate) struct S3Config {
pub crypto_provider: CryptoProviderRef,
pub region: String,
pub bucket: String,
pub bucket_endpoint: String,
Expand Down Expand Up @@ -224,6 +224,7 @@ impl S3Config {
};

Ok(SessionCredential {
crypto_provider: self.crypto_provider.as_ref(),
credential,
session_token: self.session_provider.is_some(),
config: self,
Expand All @@ -244,17 +245,22 @@ impl S3Config {
}

/// A credential bundled with the configuration and crypto provider that
/// `authorizer` needs to construct an `AwsAuthorizer`.
struct SessionCredential<'a> {
/// Crypto provider passed to `AwsAuthorizer::new` by `authorizer`.
crypto_provider: &'a dyn CryptoProvider,
/// Credential to authorize with; `authorizer` returns `None` when this is `None`.
credential: Option<Arc<AwsCredential>>,
/// When `true`, `authorizer` takes its session-token branch, which refers to
/// the `x-amz-s3session-token` header.
session_token: bool,
/// Configuration from which `authorizer` reads `region`, `sign_payload` and
/// `request_payer`.
config: &'a S3Config,
}

impl SessionCredential<'_> {
fn authorizer(&self) -> Option<AwsAuthorizer<'_>> {
let mut authorizer =
AwsAuthorizer::new(self.credential.as_deref()?, "s3", &self.config.region)
.with_sign_payload(self.config.sign_payload)
.with_request_payer(self.config.request_payer);
let mut authorizer = AwsAuthorizer::new(
self.credential.as_deref()?,
self.crypto_provider,
"s3",
&self.config.region,
)
.with_sign_payload(self.config.sign_payload)
.with_request_payer(self.config.request_payer);

if self.session_token {
let token = HeaderName::from_static("x-amz-s3session-token");
Expand Down Expand Up @@ -291,10 +297,11 @@ impl From<RequestError> for crate::Error {

/// A builder for a request allowing customisation of the headers and query string
pub(crate) struct Request<'a> {
crypto_provider: &'a dyn CryptoProvider,
path: &'a Path,
config: &'a S3Config,
builder: HttpRequestBuilder,
payload_sha256: Option<digest::Digest>,
payload_sha256: Option<crypto::Digest>,
payload: Option<PutPayload>,
use_session_creds: bool,
idempotent: bool,
Expand Down Expand Up @@ -395,32 +402,33 @@ impl Request<'_> {
Self { builder, ..self }
}

/// Attach `payload` to this request and set its `Content-Length` header.
///
/// A SHA-256 digest of the payload is computed when `sign_payload` is set and
/// `skip_signature` is not, or when a checksum is configured. The digest is
/// stored in `payload_sha256`, and is additionally sent base64-encoded in the
/// `SHA256_CHECKSUM` header when the configured checksum is `Checksum::SHA256`.
///
/// # Errors
///
/// Propagates any error returned by the crypto provider's `digest_all_sha256`.
pub(crate) fn with_payload(mut self, payload: PutPayload) -> Self {
pub(crate) fn with_payload(mut self, payload: PutPayload) -> Result<Self> {
// Hash only when the digest will actually be used: for signing the payload
// or for a configured checksum.
if (!self.config.skip_signature && self.config.sign_payload)
|| self.config.checksum.is_some()
{
let mut sha256 = Context::new(&digest::SHA256);
payload.iter().for_each(|x| sha256.update(x));
let payload_sha256 = sha256.finish();
// The digest is computed over every chunk yielded by `payload.iter()`.
let payload_sha256 = self
.crypto_provider
.digest_all_sha256(&mut payload.iter().map(|p| p.as_ref()))?;

if let Some(Checksum::SHA256) = self.config.checksum {
self.builder = self
.builder
.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256));
.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(&payload_sha256));
}
self.payload_sha256 = Some(payload_sha256);
}

let content_length = payload.content_length();
self.builder = self.builder.header(CONTENT_LENGTH, content_length);
self.payload = Some(payload);
self
Ok(self)
}

pub(crate) async fn send(self) -> Result<HttpResponse, RequestError> {
let credential = match self.use_session_creds {
true => self.config.get_session_credential().await?,
false => SessionCredential {
crypto_provider: self.crypto_provider,
credential: self.config.get_credential().await?,
session_token: false,
config: self.config,
Expand Down Expand Up @@ -456,16 +464,26 @@ impl Request<'_> {
/// Client state shared by S3 requests: configuration, HTTP client and crypto provider.
pub(crate) struct S3Client {
/// S3 configuration; `request` uses it to build the URL for a path.
pub config: S3Config,
/// HTTP client on which request builders are created.
pub client: HttpClient,
/// Crypto provider lent to each `Request` and used for SHA-256 digests.
pub crypto_provider: CryptoProviderRef,
}

impl S3Client {
/// Build an [`S3Client`] from its parts, storing each argument in the field of
/// the same name without further processing.
pub(crate) fn new(config: S3Config, client: HttpClient) -> Self {
Self { config, client }
pub(crate) fn new(
config: S3Config,
client: HttpClient,
crypto_provider: CryptoProviderRef,
) -> Self {
Self {
config,
client,
crypto_provider,
}
}

pub(crate) fn request<'a>(&'a self, method: Method, path: &'a Path) -> Request<'a> {
let url = self.config.path_url(path);
Request {
crypto_provider: self.crypto_provider.as_ref(),
path,
builder: self.client.request(method, url),
payload: None,
Expand Down Expand Up @@ -534,8 +552,8 @@ impl S3Client {

let mut builder = self.client.request(Method::POST, url);

let digest = digest::digest(&digest::SHA256, &body);
builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(digest));
let digest = self.crypto_provider.digest_sha256(&body)?;
builder = builder.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(digest.as_ref()));

// S3 *requires* DeleteObjects to include a Content-MD5 header:
// https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
Expand Down Expand Up @@ -685,7 +703,7 @@ impl S3Client {
.idempotent(true);

request = match data {
PutPartPayload::Part(payload) => request.with_payload(payload),
PutPartPayload::Part(payload) => request.with_payload(payload)?,
PutPartPayload::Copy(path) => request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, encode_path(path)),
Expand Down
Loading