Skip to content

Commit bc48949

Browse files
committed
JAVA-2778: Release connection source for async cursor as soon as the last batch has been retrieved
1 parent fc6149e commit bc48949

File tree

3 files changed

+54
-43
lines changed

3 files changed

+54
-43
lines changed

driver-core/src/main/com/mongodb/operation/AsyncQueryBatchCursor.java

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
6262
private final AsyncConnectionSource connectionSource;
6363
private final AtomicBoolean isClosed = new AtomicBoolean();
6464
private final AtomicReference<ServerCursor> cursor;
65-
private final Object lock = new Object();
6665
private volatile QueryResult<T> firstBatch;
6766
private volatile int batchSize;
6867
private volatile int count;
@@ -80,10 +79,11 @@ class AsyncQueryBatchCursor<T> implements AsyncBatchCursor<T> {
8079
this.connectionSource = notNull("connectionSource", connectionSource);
8180
this.count += firstBatch.getResults().size();
8281

83-
connectionSource.retain();
84-
if (firstBatch.getCursor() != null && limitReached()) {
82+
if (firstBatch.getCursor() != null) {
8583
connectionSource.retain();
86-
killCursor(connection);
84+
if (limitReached()) {
85+
killCursor(connection);
86+
}
8787
}
8888
}
8989

@@ -134,8 +134,9 @@ private void next(final SingleResultCallback<List<T>> callback, final boolean tr
134134
firstBatch = null;
135135
callback.onResult(results, null);
136136
} else {
137-
ServerCursor localCursor = getCursorForNext();
137+
ServerCursor localCursor = getServerCursor();
138138
if (localCursor == null) {
139+
isClosed.set(true);
139140
callback.onResult(null, null);
140141
} else {
141142
getMore(localCursor, callback, tryNext);
@@ -152,7 +153,6 @@ private void getMore(final ServerCursor cursor, final SingleResultCallback<List<
152153
@Override
153154
public void onResult(final AsyncConnection connection, final Throwable t) {
154155
if (t != null) {
155-
connectionSource.release();
156156
callback.onResult(null, t);
157157
} else {
158158
getMore(connection, cursor, callback, tryNext);
@@ -170,13 +170,13 @@ private void getMore(final AsyncConnection connection, final ServerCursor cursor
170170

171171
} else {
172172
connection.getMoreAsync(namespace, cursor.getId(), getNumberToReturn(limit, batchSize, count),
173-
decoder, new QueryResultSingleResultCallback(connection, callback, tryNext));
173+
decoder, new QueryResultSingleResultCallback(connection, callback, tryNext));
174174
}
175175
}
176176

177177
private BsonDocument asGetMoreCommandDocument(final long cursorId) {
178178
BsonDocument document = new BsonDocument("getMore", new BsonInt64(cursorId))
179-
.append("collection", new BsonString(namespace.getCollectionName()));
179+
.append("collection", new BsonString(namespace.getCollectionName()));
180180

181181
int batchSizeForGetMoreCommand = Math.abs(getNumberToReturn(limit, this.batchSize, count));
182182
if (batchSizeForGetMoreCommand != 0) {
@@ -201,8 +201,6 @@ public void onResult(final AsyncConnection connection, final Throwable t) {
201201
}
202202
}
203203
});
204-
} else {
205-
connectionSource.release();
206204
}
207205
}
208206

@@ -239,15 +237,14 @@ public void onResult(final Void result, final Throwable t) {
239237

240238
private BsonDocument asKillCursorsCommandDocument(final ServerCursor localCursor) {
241239
return new BsonDocument("killCursors", new BsonString(namespace.getCollectionName()))
242-
.append("cursors", new BsonArray(singletonList(new BsonInt64(localCursor.getId()))));
240+
.append("cursors", new BsonArray(singletonList(new BsonInt64(localCursor.getId()))));
243241
}
244242

245243

246244
private void handleGetMoreQueryResult(final AsyncConnection connection, final SingleResultCallback<List<T>> callback,
247245
final QueryResult<T> result, final boolean tryNext) {
248246
if (isClosed()) {
249247
connection.release();
250-
connectionSource.release();
251248
callback.onResult(null, new MongoException(format("The cursor was closed before %s completed.",
252249
tryNext ? "tryNext()" : "next()")));
253250
return;
@@ -263,7 +260,9 @@ private void handleGetMoreQueryResult(final AsyncConnection connection, final Si
263260
connection.release();
264261
} else {
265262
connection.release();
266-
connectionSource.release();
263+
if (result.getCursor() == null) {
264+
connectionSource.release();
265+
}
267266
}
268267

269268
if (result.getResults().isEmpty()) {
@@ -292,10 +291,9 @@ private class CommandResultSingleResultCallback implements SingleResultCallback<
292291
public void onResult(final BsonDocument result, final Throwable t) {
293292
if (t != null) {
294293
Throwable translatedException = t instanceof MongoCommandException
295-
? translateCommandException((MongoCommandException) t, cursor)
296-
: t;
294+
? translateCommandException((MongoCommandException) t, cursor)
295+
: t;
297296
connection.release();
298-
connectionSource.release();
299297
callback.onResult(null, translatedException);
300298
} else {
301299
QueryResult<T> queryResult = getMoreCursorDocumentToQueryResult(result.getDocument("cursor"),
@@ -321,35 +319,14 @@ private class QueryResultSingleResultCallback implements SingleResultCallback<Qu
321319
public void onResult(final QueryResult<T> result, final Throwable t) {
322320
if (t != null) {
323321
connection.release();
324-
connectionSource.release();
325322
callback.onResult(null, t);
326323
} else {
327324
handleGetMoreQueryResult(connection, callback, result, tryNext);
328325
}
329326
}
330327
}
331328

332-
private ServerCursor getCursorForNext() {
333-
ServerCursor localCursor;
334-
synchronized (lock) {
335-
localCursor = cursor.get();
336-
if (localCursor == null) {
337-
if (!isClosed.getAndSet(true)) {
338-
connectionSource.release();
339-
}
340-
} else {
341-
connectionSource.retain();
342-
}
343-
}
344-
return localCursor;
345-
}
346-
347329
ServerCursor getServerCursor() {
348-
ServerCursor localCursor;
349-
synchronized (lock) {
350-
localCursor = cursor.get();
351-
}
352-
return localCursor;
330+
return cursor.get();
353331
}
354-
355332
}

driver-core/src/test/functional/com/mongodb/operation/AsyncQueryBatchCursorFunctionalSpecification.groovy

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,40 @@ class AsyncQueryBatchCursorFunctionalSpecification extends OperationFunctionalSp
102102
!nextBatch()
103103
}
104104

105+
def 'should not retain connection and source after cursor is exhausted on first batch'() {
106+
given:
107+
cursor = new AsyncQueryBatchCursor<Document>(executeQuery(), 0, 0, 0, new DocumentCodec(), connectionSource, connection)
108+
109+
when:
110+
nextBatch()
111+
112+
then:
113+
connection.count == 1
114+
connectionSource.count == 1
115+
}
116+
117+
def 'should not retain connection and source after cursor is exhausted on getMore'() {
118+
given:
119+
cursor = new AsyncQueryBatchCursor<Document>(executeQuery(1, 0), 1, 1, 0, new DocumentCodec(), connectionSource, connection)
120+
121+
when:
122+
nextBatch()
123+
124+
then:
125+
connection.count == 1
126+
connectionSource.count == 1
127+
}
128+
129+
def 'should not retain connection and source after cursor is exhausted after first batch'() {
130+
when:
131+
cursor = new AsyncQueryBatchCursor<Document>(executeQuery(10, 10), 10, 10, 0, new DocumentCodec(), connectionSource,
132+
connection)
133+
134+
then:
135+
connection.count == 1
136+
connectionSource.count == 1
137+
}
138+
105139
def 'should exhaust single batch with limit'() {
106140
given:
107141
cursor = new AsyncQueryBatchCursor<Document>(executeQuery(1, 0), 1, 0, 0, new DocumentCodec(), connectionSource, connection)

driver-core/src/test/unit/com/mongodb/operation/AsyncQueryBatchCursorSpecification.groovy

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ class AsyncQueryBatchCursorSpecification extends Specification {
134134
nextBatch(cursor) == FIRST_BATCH
135135

136136
then:
137-
connectionSource.getCount() == 1
137+
connectionSource.getCount() == 0
138138

139139
then:
140140
nextBatch(cursor) == null
@@ -203,7 +203,7 @@ class AsyncQueryBatchCursorSpecification extends Specification {
203203
then:
204204
batch == thirdBatch
205205
connectionB.getCount() == 0
206-
connectionSource.getCount() == 1
206+
connectionSource.getCount() == 0
207207

208208
when:
209209
batch = tryNextBatch(cursor)
@@ -272,7 +272,7 @@ class AsyncQueryBatchCursorSpecification extends Specification {
272272
then:
273273
batch == thirdBatch
274274
connectionB.getCount() == 0
275-
connectionSource.getCount() == 1
275+
connectionSource.getCount() == 0
276276

277277
when:
278278
batch = nextBatch(cursor)
@@ -364,7 +364,7 @@ class AsyncQueryBatchCursorSpecification extends Specification {
364364

365365
then:
366366
connection.getCount() == 0
367-
connectionSource.getCount() == 1
367+
connectionSource.getCount() == 0
368368

369369
when:
370370
cursor.close()
@@ -412,7 +412,7 @@ class AsyncQueryBatchCursorSpecification extends Specification {
412412

413413
then:
414414
connection.getCount() == 0
415-
connectionSource.getCount() == 1
415+
connectionSource.getCount() == 0
416416

417417
when:
418418
cursor.close()

0 commit comments

Comments
 (0)