@@ -4,33 +4,33 @@ use std::{
44 path:: { Path , PathBuf } ,
55} ;
66
7- use bson:: { Document , RawDocument , RawDocumentBuf } ;
7+ use bson:: { rawdoc , Document , RawDocument , RawDocumentBuf } ;
88use futures_util:: { stream, TryStreamExt } ;
9- use mongocrypt:: ctx:: { Ctx , State } ;
9+ use mongocrypt:: ctx:: { Ctx , KmsProvider , State } ;
1010use rayon:: ThreadPool ;
1111use tokio:: {
1212 io:: { AsyncReadExt , AsyncWriteExt } ,
1313 sync:: { oneshot, Mutex } ,
1414} ;
1515
1616use crate :: {
17- client:: { options:: ServerAddress , WeakClient } ,
17+ client:: { auth :: Credential , options:: ServerAddress , WeakClient } ,
1818 coll:: options:: FindOptions ,
1919 error:: { Error , Result } ,
2020 operation:: { RawOutput , RunCommand } ,
2121 options:: ReadConcern ,
22- runtime:: { AsyncStream , Process , TlsConfig } ,
22+ runtime:: { AsyncStream , HttpClient , Process , TlsConfig } ,
2323 Client ,
2424 Namespace ,
2525} ;
2626
27- use super :: options:: KmsProvidersTlsOptions ;
27+ use super :: options:: KmsProviders ;
2828
2929#[ derive( Debug ) ]
3030pub ( crate ) struct CryptExecutor {
3131 key_vault_client : WeakClient ,
3232 key_vault_namespace : Namespace ,
33- tls_options : Option < KmsProvidersTlsOptions > ,
33+ kms_providers : KmsProviders ,
3434 crypto_threads : ThreadPool ,
3535 mongocryptd : Option < Mongocryptd > ,
3636 mongocryptd_client : Option < Client > ,
@@ -41,7 +41,7 @@ impl CryptExecutor {
4141 pub ( crate ) fn new_explicit (
4242 key_vault_client : WeakClient ,
4343 key_vault_namespace : Namespace ,
44- tls_options : Option < KmsProvidersTlsOptions > ,
44+ kms_providers : KmsProviders ,
4545 ) -> Result < Self > {
4646 // TODO RUST-1492: Replace num_cpus with std::thread::available_parallelism.
4747 let crypto_threads = rayon:: ThreadPoolBuilder :: new ( )
@@ -51,7 +51,7 @@ impl CryptExecutor {
5151 Ok ( Self {
5252 key_vault_client,
5353 key_vault_namespace,
54- tls_options ,
54+ kms_providers ,
5555 crypto_threads,
5656 mongocryptd : None ,
5757 mongocryptd_client : None ,
@@ -62,7 +62,7 @@ impl CryptExecutor {
6262 pub ( crate ) async fn new_implicit (
6363 key_vault_client : WeakClient ,
6464 key_vault_namespace : Namespace ,
65- tls_options : Option < KmsProvidersTlsOptions > ,
65+ kms_providers : KmsProviders ,
6666 mongocryptd_opts : Option < MongocryptdOptions > ,
6767 mongocryptd_client : Option < Client > ,
6868 metadata_client : Option < WeakClient > ,
@@ -71,7 +71,7 @@ impl CryptExecutor {
7171 Some ( opts) => Some ( Mongocryptd :: new ( opts) . await ?) ,
7272 None => None ,
7373 } ;
74- let mut exec = Self :: new_explicit ( key_vault_client, key_vault_namespace, tls_options ) ?;
74+ let mut exec = Self :: new_explicit ( key_vault_client, key_vault_namespace, kms_providers ) ?;
7575 exec. mongocryptd = mongocryptd;
7676 exec. mongocryptd_client = mongocryptd_client;
7777 exec. metadata_client = metadata_client;
@@ -185,8 +185,8 @@ impl CryptExecutor {
185185 let addr = ServerAddress :: parse ( endpoint) ?;
186186 let provider = kms_ctx. kms_provider ( ) ?;
187187 let tls_options = self
188- . tls_options
189- . as_ref ( )
188+ . kms_providers
189+ . tls_options ( )
190190 . and_then ( |tls| tls. get ( & provider) )
191191 . cloned ( )
192192 . unwrap_or_default ( ) ;
@@ -208,8 +208,38 @@ impl CryptExecutor {
208208 . await ?;
209209 }
210210 State :: NeedKmsCredentials => {
211- // TODO(RUST-1314, RUST-1417): support fetching KMS credentials.
212- return Err ( Error :: internal ( "KMS credentials are not yet supported" ) ) ;
211+ let ctx = result_mut ( & mut ctx) ?;
212+ let mut out = rawdoc ! { } ;
213+ if self
214+ . kms_providers
215+ . credentials ( )
216+ . get ( & KmsProvider :: Aws )
217+ . map_or ( false , |d| d. is_empty ( ) )
218+ {
219+ #[ cfg( feature = "aws-auth" ) ]
220+ {
221+ let aws_creds = crate :: client:: auth:: aws:: AwsCredential :: get (
222+ & Credential :: default ( ) ,
223+ & HttpClient :: default ( ) ,
224+ )
225+ . await ?;
226+ let mut creds = rawdoc ! {
227+ "accessKeyId" : aws_creds. access_key( ) ,
228+ "secretAccessKey" : aws_creds. secret_key( ) ,
229+ } ;
230+ if let Some ( token) = aws_creds. session_token ( ) {
231+ creds. append ( "sessionToken" , token) ;
232+ }
233+ out. append ( "aws" , creds) ;
234+ }
235+ #[ cfg( not( feature = "aws-auth" ) ) ]
236+ {
237+ return Err ( Error :: invalid_argument (
238+ "On-demand AWS KMS credentials require the `aws-auth` feature." ,
239+ ) ) ;
240+ }
241+ }
242+ ctx. provide_kms_providers ( & out) ?;
213243 }
214244 State :: Ready => {
215245 let ( tx, rx) = oneshot:: channel ( ) ;
0 commit comments