1616
1717package com.mongodb.connection
1818
19+ import category.Slow
20+ import com.mongodb.ClusterFixture
1921import com.mongodb.MongoClientException
2022import com.mongodb.MongoException
2123import com.mongodb.MongoInternalException
24+ import com.mongodb.MongoInterruptedException
2225import com.mongodb.MongoTimeoutException
2326import com.mongodb.MongoWaitQueueFullException
2427import com.mongodb.ReadPreference
2528import com.mongodb.ServerAddress
2629import com.mongodb.selector.ReadPreferenceServerSelector
2730import com.mongodb.selector.ServerAddressSelector
2831import com.mongodb.selector.WritableServerSelector
32+ import org.junit.experimental.categories.Category
2933import spock.lang.Specification
3034
3135import java.util.concurrent.CountDownLatch
@@ -83,7 +87,7 @@ class BaseClusterSpecification extends Specification {
8387 def cluster = new MultiServerCluster (new ClusterId (),
8488 builder(). mode(MULTIPLE )
8589 .hosts([firstServer, secondServer])
86- .serverSelectionTimeout(1 , MILLISECONDS )
90+ .serverSelectionTimeout(serverSelectionTimeoutMS , MILLISECONDS )
8791 .build(),
8892 factory)
8993
@@ -98,7 +102,8 @@ class BaseClusterSpecification extends Specification {
98102
99103 then :
100104 def e = thrown(MongoTimeoutException )
101- e. getMessage(). startsWith(' Timed out after 1 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN' )
105+ e. getMessage(). startsWith(" Timed out after ${ serverSelectionTimeoutMS} ms while waiting to connect. " +
106+ ' Client view of cluster state is {type=UNKNOWN' )
102107 e. getMessage(). contains(' {address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
103108 ' exception={com.mongodb.MongoInternalException: oops}}' );
104109 e. getMessage(). contains(' {address=localhost:27018, type=UNKNOWN, state=CONNECTING}' );
@@ -108,17 +113,105 @@ class BaseClusterSpecification extends Specification {
108113
109114 then :
110115 e = thrown(MongoTimeoutException )
111- e. getMessage(). startsWith(' Timed out after 1 ms while waiting for a server that matches WritableServerSelector. ' +
112- ' Client view of cluster state is {type=UNKNOWN' )
116+ e. getMessage(). startsWith(" Timed out after ${ serverSelectionTimeoutMS } ms while waiting for a server " +
117+ ' that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN' )
113118 e. getMessage(). contains(' {address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' +
114119 ' exception={com.mongodb.MongoInternalException: oops}}' );
115120 e. getMessage(). contains(' {address=localhost:27018, type=UNKNOWN, state=CONNECTING}' );
121+
122+ where :
123+ serverSelectionTimeoutMS << [1 , 0 ]
116124 }
117125
118- def ' should select server asynchronously ' () {
126+ def ' should select server' () {
119127 given :
120128 def cluster = new MultiServerCluster (new ClusterId (),
121129 builder(). mode(MULTIPLE )
130+ .hosts([firstServer, secondServer, thirdServer])
131+ .serverSelectionTimeout(serverSelectionTimeoutMS, SECONDS )
132+ .build(),
133+ factory)
134+ factory. sendNotification(firstServer, REPLICA_SET_SECONDARY , allServers)
135+ factory. sendNotification(secondServer, REPLICA_SET_SECONDARY , allServers)
136+ factory. sendNotification(thirdServer, REPLICA_SET_PRIMARY , allServers)
137+
138+ expect :
139+ cluster. selectServer(new ReadPreferenceServerSelector (ReadPreference . primary())). description. address == thirdServer
140+
141+ cleanup :
142+ cluster?. close()
143+
144+ where :
145+ serverSelectionTimeoutMS << [30 , 0 , -1 ]
146+ }
147+
148+ @Category (Slow )
149+ def ' should wait indefinitely for a server until interrupted' () {
150+ given :
151+ def cluster = new MultiServerCluster (new ClusterId (),
152+ builder(). mode(MULTIPLE )
153+ .hosts([firstServer, secondServer, thirdServer])
154+ .serverSelectionTimeout(-1 , SECONDS )
155+ .build(),
156+ factory)
157+
158+ when :
159+ def latch = new CountDownLatch (1 )
160+ def thread = new Thread ({
161+ try {
162+ cluster. selectServer(new ReadPreferenceServerSelector (ReadPreference . primary()))
163+ } catch (MongoInterruptedException e) {
164+ latch. countDown()
165+ }
166+ })
167+ thread. start()
168+ sleep(1000 )
169+ thread. interrupt()
170+ latch. await(ClusterFixture . TIMEOUT , SECONDS )
171+
172+ then :
173+ true
174+
175+ cleanup :
176+ cluster?. close()
177+ }
178+
179+ @Category (Slow )
180+ def ' should wait indefinitely for a cluster description until interrupted' () {
181+ given :
182+ def cluster = new MultiServerCluster (new ClusterId (),
183+ builder(). mode(MULTIPLE )
184+ .hosts([firstServer, secondServer, thirdServer])
185+ .serverSelectionTimeout(-1 , SECONDS )
186+ .build(),
187+ factory)
188+
189+ when :
190+ def latch = new CountDownLatch (1 )
191+ def thread = new Thread ({
192+ try {
193+ cluster. getDescription()
194+ } catch (MongoInterruptedException e) {
195+ latch. countDown()
196+ }
197+ })
198+ thread. start()
199+ sleep(1000 )
200+ thread. interrupt()
201+ latch. await(ClusterFixture . TIMEOUT , SECONDS )
202+
203+ then :
204+ true
205+
206+ cleanup :
207+ cluster?. close()
208+ }
209+
210+ def ' should select server asynchronously when server is already available' () {
211+ given :
212+ def cluster = new MultiServerCluster (new ClusterId (),
213+ builder(). mode(MULTIPLE )
214+ .serverSelectionTimeout(serverSelectionTimeoutMS, MILLISECONDS )
122215 .hosts([firstServer, secondServer, thirdServer])
123216 .build(),
124217 factory)
@@ -130,6 +223,22 @@ class BaseClusterSpecification extends Specification {
130223 then :
131224 server. description. address == firstServer
132225
226+ cleanup :
227+ cluster?. close()
228+
229+ where :
230+ serverSelectionTimeoutMS << [30 , 0 , -1 ]
231+ }
232+
233+ def ' should select server asynchronously when server is not yet available' () {
234+ given :
235+ def cluster = new MultiServerCluster (new ClusterId (),
236+ builder(). mode(MULTIPLE )
237+ .serverSelectionTimeout(serverSelectionTimeoutMS, MILLISECONDS )
238+ .hosts([firstServer, secondServer, thirdServer])
239+ .build(),
240+ factory)
241+
133242 when :
134243 def secondServerLatch = selectServerAsync(cluster, secondServer)
135244 def thirdServerLatch = selectServerAsync(cluster, thirdServer)
@@ -144,6 +253,9 @@ class BaseClusterSpecification extends Specification {
144253
145254 cleanup :
146255 cluster?. close()
256+
257+ where :
258+ serverSelectionTimeoutMS << [30 , -1 ]
147259 }
148260
149261 def ' when selecting server asynchronously should send MongoClientException to callback if cluster is closed before success' () {
@@ -171,7 +283,7 @@ class BaseClusterSpecification extends Specification {
171283 def cluster = new MultiServerCluster (new ClusterId (),
172284 builder(). mode(MULTIPLE )
173285 .hosts([firstServer, secondServer, thirdServer])
174- .serverSelectionTimeout(100 , MILLISECONDS )
286+ .serverSelectionTimeout(serverSelectionTimeoutMS , MILLISECONDS )
175287 .build(),
176288 factory)
177289
@@ -183,6 +295,10 @@ class BaseClusterSpecification extends Specification {
183295
184296 cleanup :
185297 cluster?. close()
298+
299+
300+ where :
301+ serverSelectionTimeoutMS << [100 , 0 ]
186302 }
187303
188304 def ' when selecting server asynchronously should send MongoWaitQueueFullException to callback if there are too many waiters' () {
0 commit comments