Skip to content

Commit fc6149e

Browse files
committed
JAVA-2778: Release connection source for cursor as soon as the last batch has been retrieved
1 parent 4d90bec commit fc6149e

File tree

4 files changed

+52
-4
lines changed

4 files changed

+52
-4
lines changed

driver-core/src/main/com/mongodb/operation/QueryBatchCursor.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,12 @@
4949
class QueryBatchCursor<T> implements BatchCursor<T> {
5050
private static final FieldNameValidator NO_OP_FIELD_NAME_VALIDATOR = new NoOpFieldNameValidator();
5151
private final MongoNamespace namespace;
52+
private final ServerAddress serverAddress;
5253
private final int limit;
5354
private final Decoder<T> decoder;
54-
private final ConnectionSource connectionSource;
5555
private final long maxTimeMS;
5656
private int batchSize;
57+
private ConnectionSource connectionSource;
5758
private ServerCursor serverCursor;
5859
private List<T> nextBatch;
5960
private int count;
@@ -73,6 +74,7 @@ class QueryBatchCursor<T> implements BatchCursor<T> {
7374
isTrueArgument("maxTimeMS >= 0", maxTimeMS >= 0);
7475
this.maxTimeMS = maxTimeMS;
7576
this.namespace = firstQueryResult.getNamespace();
77+
this.serverAddress = firstQueryResult.getAddress();
7678
this.limit = limit;
7779
this.batchSize = batchSize;
7880
this.decoder = notNull("decoder", decoder);
@@ -89,6 +91,10 @@ class QueryBatchCursor<T> implements BatchCursor<T> {
8991
if (limitReached()) {
9092
killCursor(connection);
9193
}
94+
if (serverCursor == null && this.connectionSource != null) {
95+
this.connectionSource.release();
96+
this.connectionSource = null;
97+
}
9298
}
9399

94100
@Override
@@ -205,7 +211,7 @@ public ServerAddress getServerAddress() {
205211
throw new IllegalStateException("Iterator has been closed");
206212
}
207213

208-
return connectionSource.getServerDescription().getAddress();
214+
return serverAddress;
209215
}
210216

211217
private void getMore() {
@@ -229,6 +235,10 @@ private void getMore() {
229235
if (limitReached()) {
230236
killCursor(connection);
231237
}
238+
if (serverCursor == null) {
239+
this.connectionSource.release();
240+
this.connectionSource = null;
241+
}
232242
} finally {
233243
connection.release();
234244
}

driver-core/src/test/functional/com/mongodb/operation/QueryBatchCursorFunctionalSpecification.groovy

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import spock.lang.IgnoreIf
4040
import java.util.concurrent.CountDownLatch
4141
import java.util.concurrent.TimeUnit
4242

43+
import static com.mongodb.ClusterFixture.checkReferenceCountReachesTarget
4344
import static com.mongodb.ClusterFixture.getBinding
4445
import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet
4546
import static com.mongodb.ClusterFixture.isSharded
@@ -380,6 +381,35 @@ class QueryBatchCursorFunctionalSpecification extends OperationFunctionalSpecifi
380381
thrown(MongoCursorNotFoundException)
381382
}
382383

384+
def 'should release connection source if limit is reached on initial query'() throws InterruptedException {
385+
given:
386+
def firstBatch = executeQuery(5)
387+
def connection = connectionSource.getConnection()
388+
389+
when:
390+
cursor = new QueryBatchCursor<Document>(firstBatch, 5, 0, 0, new DocumentCodec(), connectionSource, connection)
391+
392+
then:
393+
checkReferenceCountReachesTarget(connectionSource, 1)
394+
395+
cleanup:
396+
connection?.release()
397+
}
398+
399+
def 'should release connection source if limit is reached on get more'() throws InterruptedException {
400+
given:
401+
def firstBatch = executeQuery(3)
402+
403+
cursor = new QueryBatchCursor<Document>(firstBatch, 5, 3, new DocumentCodec(), connectionSource)
404+
405+
when:
406+
cursor.next()
407+
cursor.next()
408+
409+
then:
410+
checkReferenceCountReachesTarget(connectionSource, 1)
411+
}
412+
383413
def 'test limit with get more'() {
384414
given:
385415
def firstBatch = executeQuery(2)

driver/src/main/com/mongodb/DBCursor.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -676,7 +676,11 @@ public int count() {
676676
*/
677677
public DBObject one() {
678678
DBCursor findOneCursor = copy().limit(-1);
679-
return findOneCursor.hasNext() ? findOneCursor.next() : null;
679+
try {
680+
return findOneCursor.hasNext() ? findOneCursor.next() : null;
681+
} finally {
682+
findOneCursor.close();
683+
}
680684
}
681685

682686
/**

driver/src/main/com/mongodb/FindIterableImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,11 @@ public FindIterable<TResult> snapshot(final boolean snapshot) {
191191
public TResult first() {
192192
FindOperation<TResult> findFirstOperation = createQueryOperation().batchSize(0).limit(-1);
193193
BatchCursor<TResult> batchCursor = getExecutor().execute(findFirstOperation, getReadPreference(), getClientSession());
194-
return batchCursor.hasNext() ? batchCursor.next().iterator().next() : null;
194+
try {
195+
return batchCursor.hasNext() ? batchCursor.next().iterator().next() : null;
196+
} finally {
197+
batchCursor.close();
198+
}
195199
}
196200

197201
protected ReadOperation<BatchCursor<TResult>> asReadOperation() {

0 commit comments

Comments
 (0)