2020import com .mongodb .client .MongoDriverInformation ;
2121import com .mongodb .client .gridfs .codecs .GridFSFileCodecProvider ;
2222import com .mongodb .client .model .geojson .codecs .GeoJsonCodecProvider ;
23- import com .mongodb .connection .AsynchronousSocketChannelStreamFactoryFactory ;
24- import com .mongodb .connection .Cluster ;
23+ import com .mongodb .connection .AsynchronousSocketChannelStreamFactory ;
2524import com .mongodb .connection .ClusterSettings ;
2625import com .mongodb .connection .ConnectionPoolSettings ;
2726import com .mongodb .connection .DefaultClusterFactory ;
2827import com .mongodb .connection .ServerSettings ;
2928import com .mongodb .connection .SocketSettings ;
3029import com .mongodb .connection .SslSettings ;
3130import com .mongodb .connection .StreamFactory ;
32- import com .mongodb .connection .netty .NettyStreamFactoryFactory ;
31+ import com .mongodb .connection .StreamFactoryFactory ;
32+ import com .mongodb .connection .netty .NettyStreamFactory ;
3333import com .mongodb .event .CommandEventMulticaster ;
3434import com .mongodb .event .CommandListener ;
3535import com .mongodb .management .JMXConnectionPoolListener ;
36+ import io .netty .channel .EventLoopGroup ;
37+ import io .netty .channel .nio .NioEventLoopGroup ;
3638import org .bson .codecs .BsonValueCodecProvider ;
3739import org .bson .codecs .DocumentCodecProvider ;
3840import org .bson .codecs .IterableCodecProvider ;
3941import org .bson .codecs .ValueCodecProvider ;
4042import org .bson .codecs .configuration .CodecRegistry ;
4143
44+ import java .io .Closeable ;
45+ import java .io .IOException ;
4246import java .util .List ;
4347
44- import static java .lang .String .format ;
4548import static java .util .Arrays .asList ;
4649import static org .bson .codecs .configuration .CodecRegistries .fromProviders ;
4750
@@ -91,7 +94,7 @@ public static MongoClient create(final String connectionString) {
9194 * </p>
9295 * <p>
9396 * The connection string's stream type is then applied by setting the
94- * {@link com.mongodb.connection.StreamFactoryFactory } to an instance of {@link NettyStreamFactoryFactory },
97+ * {@link com.mongodb.connection.StreamFactory } to an instance of {@link NettyStreamFactory },
9598 * </p>
9699 *
97100 * @param connectionString the settings
@@ -115,21 +118,21 @@ public static MongoClient create(final ConnectionString connectionString) {
115118 *
116119 * <p>Note: Intended for driver and library authors to associate extra driver metadata with the connections.</p>
117120 *
118- * @param settings the settings
121+ * @param settings the settings
119122 * @param mongoDriverInformation any driver information to associate with the MongoClient
120123 * @return the client
121124 * @since 3.4
122125 */
123126 public static MongoClient create (final MongoClientSettings settings , final MongoDriverInformation mongoDriverInformation ) {
124- return new MongoClientImpl (settings , createCluster ( settings , mongoDriverInformation ) );
127+ return create (settings , mongoDriverInformation , null );
125128 }
126129
127130 /**
128131 * Create a new client with the given connection string.
129132 *
130133 * <p>Note: Intended for driver and library authors to associate extra driver metadata with the connections.</p>
131134 *
132- * @param connectionString the settings
135+ * @param connectionString the settings
133136 * @param mongoDriverInformation any driver information to associate with the MongoClient
134137 * @return the client
135138 * @throws IllegalArgumentException if the connection string's stream type is not one of "netty" or "nio2"
@@ -153,15 +156,7 @@ public static MongoClient create(final ConnectionString connectionString, final
153156 .socketSettings (SocketSettings .builder ()
154157 .applyConnectionString (connectionString )
155158 .build ());
156- if (connectionString .getStreamType () != null ) {
157- if (connectionString .getStreamType ().toLowerCase ().equals ("netty" )) {
158- builder .streamFactoryFactory (NettyStreamFactoryFactory .builder ().build ());
159- } else if (connectionString .getStreamType ().toLowerCase ().equals ("nio2" )) {
160- builder .streamFactoryFactory (new AsynchronousSocketChannelStreamFactoryFactory ());
161- } else if (!connectionString .getStreamType ().toLowerCase ().equals ("nio2" )) {
162- throw new IllegalArgumentException (format ("Unsupported stream type %s" , connectionString .getStreamType ()));
163- }
164- }
159+
165160 if (connectionString .getReadPreference () != null ) {
166161 builder .readPreference (connectionString .getReadPreference ());
167162 }
@@ -174,9 +169,27 @@ public static MongoClient create(final ConnectionString connectionString, final
174169 if (connectionString .getApplicationName () != null ) {
175170 builder .applicationName (connectionString .getApplicationName ());
176171 }
177- return create (builder .build (), mongoDriverInformation );
172+ return create (builder .build (), mongoDriverInformation , connectionString . getStreamType () );
178173 }
179174
175+ private static MongoClient create (final MongoClientSettings settings , final MongoDriverInformation mongoDriverInformation ,
176+ final String requestedStreamType ) {
177+ String streamType = getStreamType (requestedStreamType );
178+ EventLoopGroup eventLoopGroup = getEventLoopGroupIfNecessary (settings .getStreamFactoryFactory (), streamType );
179+ StreamFactory streamFactory = getStreamFactory (settings .getStreamFactoryFactory (), settings .getSocketSettings (),
180+ settings .getSslSettings (), streamType , eventLoopGroup );
181+ StreamFactory heartbeatStreamFactory = getStreamFactory (settings .getStreamFactoryFactory (), settings .getHeartbeatSocketSettings (),
182+ settings .getSslSettings (), streamType , eventLoopGroup );
183+ return new MongoClientImpl (settings , new DefaultClusterFactory ().create (settings .getClusterSettings (), settings .getServerSettings (),
184+ settings .getConnectionPoolSettings (), streamFactory ,
185+ heartbeatStreamFactory ,
186+ settings .getCredentialList (), null , new JMXConnectionPoolListener (), null ,
187+ createCommandListener (settings .getCommandListeners ()),
188+ settings .getApplicationName (), mongoDriverInformation ),
189+ getEventLoopGroupCloser (eventLoopGroup ));
190+ }
191+
192+
180193 /**
181194 * Gets the default codec registry. It includes the following providers:
182195 *
@@ -203,23 +216,55 @@ public static CodecRegistry getDefaultCodecRegistry() {
203216 new GeoJsonCodecProvider (),
204217 new GridFSFileCodecProvider ()));
205218
206- private static Cluster createCluster (final MongoClientSettings settings , final MongoDriverInformation mongoDriverInformation ) {
207- StreamFactory streamFactory = getStreamFactory (settings );
208- StreamFactory heartbeatStreamFactory = getHeartbeatStreamFactory (settings );
209- return new DefaultClusterFactory ().create (settings .getClusterSettings (), settings .getServerSettings (),
210- settings .getConnectionPoolSettings (), streamFactory ,
211- heartbeatStreamFactory ,
212- settings .getCredentialList (), null , new JMXConnectionPoolListener (), null ,
213- createCommandListener (settings .getCommandListeners ()),
214- settings .getApplicationName (), mongoDriverInformation );
219+ private static StreamFactory getStreamFactory (final StreamFactoryFactory streamFactoryFactory ,
220+ final SocketSettings socketSettings , final SslSettings sslSettings ,
221+ final String streamType , final EventLoopGroup eventLoopGroup ) {
222+ if (streamFactoryFactory != null ) {
223+ return streamFactoryFactory .create (socketSettings , sslSettings );
224+ } else if (isNetty (streamType )) {
225+ return new NettyStreamFactory (socketSettings , sslSettings , eventLoopGroup );
226+ } else if (isNio2 (streamType )) {
227+ return new AsynchronousSocketChannelStreamFactory (socketSettings , sslSettings );
228+ } else {
229+ throw new IllegalArgumentException ("Unsupported stream type: " + streamType );
230+ }
231+ }
232+
233+ private static boolean isNetty (final String streamType ) {
234+ return streamType .toLowerCase ().equals ("netty" );
235+ }
236+
237+ private static boolean isNio2 (final String streamType ) {
238+ return streamType .toLowerCase ().equals ("nio2" );
215239 }
216240
217- private static StreamFactory getHeartbeatStreamFactory (final MongoClientSettings settings ) {
218- return settings .getStreamFactoryFactory ().create (settings .getHeartbeatSocketSettings (), settings .getSslSettings ());
241+ private static String getStreamType (final String requestedStreamType ) {
242+ if (requestedStreamType != null ) {
243+ return requestedStreamType ;
244+ } else {
245+ return System .getProperty ("org.mongodb.async.type" , "nio2" );
246+ }
219247 }
220248
221- private static StreamFactory getStreamFactory (final MongoClientSettings settings ) {
222- return settings .getStreamFactoryFactory ().create (settings .getSocketSettings (), settings .getSslSettings ());
249+ private static Closeable getEventLoopGroupCloser (final EventLoopGroup eventLoopGroup ) {
250+ if (eventLoopGroup == null ) {
251+ return null ;
252+ } else {
253+ return new Closeable () {
254+ @ Override
255+ public void close () throws IOException {
256+ eventLoopGroup .shutdownGracefully ().awaitUninterruptibly ();
257+ }
258+ };
259+ }
260+ }
261+ private static EventLoopGroup getEventLoopGroupIfNecessary (final StreamFactoryFactory streamFactoryFactory ,
262+ final String streamType ) {
263+ if (isNetty (streamType ) && streamFactoryFactory == null ) {
264+ return new NioEventLoopGroup ();
265+ } else {
266+ return null ;
267+ }
223268 }
224269
225270 private static CommandListener createCommandListener (final List <CommandListener > commandListeners ) {
0 commit comments