Skip to content

Commit 7f6a09a

Browse files
Sergey Polovkorozza
authored andcommitted
JAVA-2149: add ability to configure channel type of NettyStream in async driver
1 parent d2c2cd7 commit 7f6a09a

File tree

7 files changed

+275
-16
lines changed

7 files changed

+275
-16
lines changed

driver-async/src/main/com/mongodb/async/client/MongoClientSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ private static StreamFactoryFactory createDefaultStreamFactoryFactory() {
305305
String streamType = System.getProperty("org.mongodb.async.type", "nio2");
306306

307307
if (streamType.equals("netty")) {
308-
return new NettyStreamFactoryFactory();
308+
return NettyStreamFactoryFactory.builder().build();
309309
} else if (streamType.equals("nio2")) {
310310
return new AsynchronousSocketChannelStreamFactoryFactory();
311311
} else {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Copyright 2016 MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.async.client
18+
19+
import com.mongodb.async.FutureResultCallback
20+
import com.mongodb.connection.netty.NettyStreamFactoryFactory
21+
import io.netty.channel.oio.OioEventLoopGroup
22+
import io.netty.channel.socket.oio.OioSocketChannel
23+
import org.bson.Document
24+
25+
import static com.mongodb.async.client.Fixture.getMongoClientBuilderFromConnectionString
26+
import static java.util.concurrent.TimeUnit.SECONDS
27+
28+
class NettyStreamFactoryFactorySmokeTestSpecification extends FunctionalSpecification {
29+
30+
private MongoClient mongoClient
31+
32+
def 'should allow a custom Event Loop Group and Socket Channel'() {
33+
given:
34+
def eventLoopGroup = new OioEventLoopGroup()
35+
def streamFactoryFactory = NettyStreamFactoryFactory.builder()
36+
.eventLoopGroup(eventLoopGroup).socketChannelClass(OioSocketChannel).build()
37+
MongoClientSettings.Builder builder = getMongoClientBuilderFromConnectionString().streamFactoryFactory(streamFactoryFactory)
38+
def document = new Document('a', 1)
39+
40+
when:
41+
mongoClient = MongoClients.create(builder.build())
42+
def collection = mongoClient.getDatabase(databaseName).getCollection(collectionName)
43+
44+
45+
then:
46+
run(collection.&insertOne, document) == null
47+
48+
then: 'The count is one'
49+
run(collection.&count) == 1
50+
51+
cleanup:
52+
mongoClient?.close()
53+
54+
}
55+
56+
def run(operation, ... args) {
57+
def futureResultCallback = new FutureResultCallback()
58+
def opArgs = (args != null) ? args : []
59+
operation.call(*opArgs + futureResultCallback)
60+
futureResultCallback.get(60, SECONDS)
61+
}
62+
}

driver-async/src/test/unit/com/mongodb/async/client/MongoClientSettingsSpecification.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class MongoClientSettingsSpecification extends Specification {
124124

125125
def 'should build with set options'() {
126126
given:
127-
def streamFactoryFactory = new NettyStreamFactoryFactory()
127+
def streamFactoryFactory = NettyStreamFactoryFactory.builder().build()
128128
def sslSettings = Stub(SslSettings)
129129
def socketSettings = Stub(SocketSettings)
130130
def serverSettings = Stub(ServerSettings)

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import io.netty.channel.EventLoopGroup;
4141
import io.netty.channel.SimpleChannelInboundHandler;
4242
import io.netty.channel.socket.SocketChannel;
43-
import io.netty.channel.socket.nio.NioSocketChannel;
4443
import io.netty.handler.ssl.SslHandler;
4544
import io.netty.handler.timeout.ReadTimeoutException;
4645
import io.netty.util.concurrent.EventExecutor;
@@ -66,6 +65,7 @@ final class NettyStream implements Stream {
6665
private final SocketSettings settings;
6766
private final SslSettings sslSettings;
6867
private final EventLoopGroup workerGroup;
68+
private final Class<? extends SocketChannel> socketChannelClass;
6969
private final ByteBufAllocator allocator;
7070

7171
private volatile boolean isClosed;
@@ -76,11 +76,13 @@ final class NettyStream implements Stream {
7676
private volatile Throwable pendingException;
7777

7878
public NettyStream(final ServerAddress address, final SocketSettings settings, final SslSettings sslSettings,
79-
final EventLoopGroup workerGroup, final ByteBufAllocator allocator) {
79+
final EventLoopGroup workerGroup, final Class<? extends SocketChannel> socketChannelClass,
80+
final ByteBufAllocator allocator) {
8081
this.address = address;
8182
this.settings = settings;
8283
this.sslSettings = sslSettings;
8384
this.workerGroup = workerGroup;
85+
this.socketChannelClass = socketChannelClass;
8486
this.allocator = allocator;
8587
}
8688

@@ -100,7 +102,7 @@ public void open() throws IOException {
100102
public void openAsync(final AsyncCompletionHandler<Void> handler) {
101103
Bootstrap bootstrap = new Bootstrap();
102104
bootstrap.group(workerGroup);
103-
bootstrap.channel(NioSocketChannel.class);
105+
bootstrap.channel(socketChannelClass);
104106

105107
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeout(MILLISECONDS));
106108
bootstrap.option(ChannelOption.TCP_NODELAY, true);
@@ -278,6 +280,26 @@ public boolean isClosed() {
278280
return isClosed;
279281
}
280282

283+
public SocketSettings getSettings() {
284+
return settings;
285+
}
286+
287+
public SslSettings getSslSettings() {
288+
return sslSettings;
289+
}
290+
291+
public EventLoopGroup getWorkerGroup() {
292+
return workerGroup;
293+
}
294+
295+
public Class<? extends SocketChannel> getSocketChannelClass() {
296+
return socketChannelClass;
297+
}
298+
299+
public ByteBufAllocator getAllocator() {
300+
return allocator;
301+
}
302+
281303
private class InboundBufferHandler extends SimpleChannelInboundHandler<io.netty.buffer.ByteBuf> {
282304
@Override
283305
protected void channelRead0(final ChannelHandlerContext ctx, final io.netty.buffer.ByteBuf buffer) throws Exception {
@@ -344,7 +366,6 @@ public T get() throws IOException {
344366
}
345367
}
346368

347-
348369
private void scheduleReadTimeout() {
349370
adjustTimeout(false);
350371
}
@@ -384,6 +405,5 @@ public void run() {
384405
}
385406
}
386407
}
387-
388408
}
389409
}

driver-core/src/main/com/mongodb/connection/netty/NettyStreamFactory.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import io.netty.buffer.PooledByteBufAllocator;
2626
import io.netty.channel.EventLoopGroup;
2727
import io.netty.channel.nio.NioEventLoopGroup;
28+
import io.netty.channel.socket.SocketChannel;
29+
import io.netty.channel.socket.nio.NioSocketChannel;
2830

2931
import static com.mongodb.assertions.Assertions.notNull;
3032

@@ -37,6 +39,7 @@ public class NettyStreamFactory implements StreamFactory {
3739
private final SocketSettings settings;
3840
private final SslSettings sslSettings;
3941
private final EventLoopGroup eventLoopGroup;
42+
private final Class<? extends SocketChannel> socketChannelClass;
4043
private final ByteBufAllocator allocator;
4144

4245
/**
@@ -45,18 +48,36 @@ public class NettyStreamFactory implements StreamFactory {
4548
* @param settings the socket settings
4649
* @param sslSettings the SSL settings
4750
* @param eventLoopGroup the event loop group that all channels created by this factory will be a part of
51+
* @param socketChannelClass the socket channel class
4852
* @param allocator the allocator to use for ByteBuf instances
53+
*
54+
* @since 3.3
4955
*/
50-
public NettyStreamFactory(final SocketSettings settings, final SslSettings sslSettings, final EventLoopGroup eventLoopGroup,
56+
public NettyStreamFactory(final SocketSettings settings, final SslSettings sslSettings,
57+
final EventLoopGroup eventLoopGroup, final Class<? extends SocketChannel> socketChannelClass,
5158
final ByteBufAllocator allocator) {
5259
this.settings = notNull("settings", settings);
5360
this.sslSettings = notNull("sslSettings", sslSettings);
5461
this.eventLoopGroup = notNull("eventLoopGroup", eventLoopGroup);
62+
this.socketChannelClass = notNull("socketChannelClass", socketChannelClass);
5563
this.allocator = notNull("allocator", allocator);
5664
}
5765

5866
/**
59-
* Construct a new instance of the factory with a default allocator and event loop group.
67+
* Construct a new instance of the factory.
68+
*
69+
* @param settings the socket settings
70+
* @param sslSettings the SSL settings
71+
* @param eventLoopGroup the event loop group that all channels created by this factory will be a part of
72+
* @param allocator the allocator to use for ByteBuf instances
73+
*/
74+
public NettyStreamFactory(final SocketSettings settings, final SslSettings sslSettings, final EventLoopGroup eventLoopGroup,
75+
final ByteBufAllocator allocator) {
76+
this(settings, sslSettings, eventLoopGroup, NioSocketChannel.class, allocator);
77+
}
78+
79+
/**
80+
* Construct a new instance of the factory with a default allocator, nio event loop group and nio socket channel.
6081
*
6182
* @param settings the socket settings
6283
* @param sslSettings the SSL settings
@@ -67,7 +88,7 @@ public NettyStreamFactory(final SocketSettings settings, final SslSettings sslSe
6788

6889
@Override
6990
public Stream create(final ServerAddress serverAddress) {
70-
return new NettyStream(serverAddress, settings, sslSettings, eventLoopGroup, allocator);
91+
return new NettyStream(serverAddress, settings, sslSettings, eventLoopGroup, socketChannelClass, allocator);
7192
}
7293

7394
}

driver-core/src/main/com/mongodb/connection/netty/NettyStreamFactoryFactory.java

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import io.netty.buffer.ByteBufAllocator;
2424
import io.netty.channel.EventLoopGroup;
2525
import io.netty.channel.nio.NioEventLoopGroup;
26+
import io.netty.channel.socket.SocketChannel;
27+
import io.netty.channel.socket.nio.NioSocketChannel;
2628

2729
import static com.mongodb.assertions.Assertions.notNull;
2830

@@ -34,28 +36,118 @@
3436
public class NettyStreamFactoryFactory implements StreamFactoryFactory {
3537

3638
private final EventLoopGroup eventLoopGroup;
39+
private final Class<? extends SocketChannel> socketChannelClass;
3740
private final ByteBufAllocator allocator;
3841

42+
/**
43+
* Construct an instance with the default {@code EventLoopGroup} and {@code ByteBufAllocator}.
44+
*
45+
* @deprecated Use {@link NettyStreamFactoryFactory#builder()} instead to construct the {@code NettyStreamFactoryFactory}.
46+
*/
47+
@Deprecated
48+
public NettyStreamFactoryFactory() {
49+
this(builder());
50+
}
51+
3952
/**
4053
* Construct an instance with the given {@code EventLoopGroup} and {@code ByteBufAllocator}.
4154
*
4255
* @param eventLoopGroup the non-null event loop group
4356
* @param allocator the non-null byte buf allocator
57+
* @deprecated Use {@link NettyStreamFactoryFactory#builder()} instead to construct the {@code NettyStreamFactoryFactory}.
4458
*/
59+
@Deprecated
4560
public NettyStreamFactoryFactory(final EventLoopGroup eventLoopGroup, final ByteBufAllocator allocator) {
46-
this.eventLoopGroup = notNull("eventLoopGroup", eventLoopGroup);
47-
this.allocator = notNull("allocator", allocator);
61+
this(builder().eventLoopGroup(eventLoopGroup).allocator(allocator));
4862
}
4963

5064
/**
51-
* Construct an instance with the default {@code EventLoopGroup} and {@code ByteBufAllocator}.
65+
* Gets a builder for an instance of {@code NettyStreamFactoryFactory}.
66+
* @return the builder
67+
* @since 3.3
5268
*/
53-
public NettyStreamFactoryFactory() {
54-
this(new NioEventLoopGroup(), ByteBufAllocator.DEFAULT);
69+
public static Builder builder() {
70+
return new Builder();
71+
}
72+
73+
/**
74+
* A builder for an instance of {@code NettyStreamFactoryFactory}.
75+
*
76+
* @since 3.3
77+
*/
78+
public static final class Builder {
79+
private ByteBufAllocator allocator;
80+
private Class<? extends SocketChannel> socketChannelClass;
81+
private EventLoopGroup eventLoopGroup;
82+
83+
private Builder() {
84+
allocator(ByteBufAllocator.DEFAULT);
85+
socketChannelClass(NioSocketChannel.class);
86+
}
87+
88+
/**
89+
* Sets the allocator.
90+
*
91+
* @param allocator the allocator to use for ByteBuf instances
92+
* @return this
93+
*/
94+
public Builder allocator(final ByteBufAllocator allocator) {
95+
this.allocator = notNull("allocator", allocator);
96+
return this;
97+
}
98+
99+
/**
100+
* Sets the socket channel class
101+
*
102+
* @param socketChannelClass the socket channel class
103+
* @return this
104+
*/
105+
public Builder socketChannelClass(final Class<? extends SocketChannel> socketChannelClass) {
106+
this.socketChannelClass = notNull("socketChannelClass", socketChannelClass);
107+
return this;
108+
}
109+
110+
/**
111+
* Sets the event loop group.
112+
*
113+
* @param eventLoopGroup the event loop group that all channels created by this factory will be a part of
114+
* @return this
115+
*/
116+
public Builder eventLoopGroup(final EventLoopGroup eventLoopGroup) {
117+
this.eventLoopGroup = notNull("eventLoopGroup", eventLoopGroup);
118+
return this;
119+
}
120+
121+
/**
122+
* Build an instance of {@code NettyStreamFactoryFactory}.
123+
* @return factory of the netty stream factory
124+
*/
125+
public NettyStreamFactoryFactory build() {
126+
return new NettyStreamFactoryFactory(this);
127+
}
55128
}
56129

57130
@Override
58131
public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) {
59-
return new NettyStreamFactory(socketSettings, sslSettings, eventLoopGroup, allocator);
132+
return new NettyStreamFactory(socketSettings, sslSettings, eventLoopGroup, socketChannelClass, allocator);
133+
}
134+
135+
@Override
136+
public String toString() {
137+
return "NettyStreamFactoryFactory{"
138+
+ "eventLoopGroup=" + eventLoopGroup
139+
+ ", socketChannelClass=" + socketChannelClass
140+
+ ", allocator=" + allocator
141+
+ '}';
142+
}
143+
144+
private NettyStreamFactoryFactory(final Builder builder) {
145+
allocator = builder.allocator;
146+
socketChannelClass = builder.socketChannelClass;
147+
if (builder.eventLoopGroup != null) {
148+
eventLoopGroup = builder.eventLoopGroup;
149+
} else {
150+
eventLoopGroup = new NioEventLoopGroup();
151+
}
60152
}
61153
}

0 commit comments

Comments
 (0)