3838import org .bson .codecs .Decoder ;
3939
4040import java .util .List ;
41- import java .util .concurrent .atomic .AtomicBoolean ;
4241import java .util .concurrent .atomic .AtomicInteger ;
4342import java .util .concurrent .atomic .AtomicReference ;
4443
@@ -65,7 +64,6 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
6564 private final Decoder <T > decoder ;
6665 private final long maxTimeMS ;
6766 private final AsyncConnectionSource connectionSource ;
68- private final AtomicBoolean isClosed = new AtomicBoolean ();
6967 private final AtomicReference <ServerCursor > cursor ;
7068 private volatile QueryResult <T > firstBatch ;
7169 private volatile int batchSize ;
@@ -74,6 +72,12 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
7472 private volatile BsonTimestamp operationTime ;
7573 private volatile boolean firstBatchEmpty ;
7674
75+ /* protected by `this` */
76+ private boolean isOperationInProgress = false ;
77+ private boolean isClosed = false ;
78+ private boolean isClosePending = false ;
79+ /* protected by `this` */
80+
7781 AsyncQueryBatchCursor (final QueryResult <T > firstBatch , final int limit , final int batchSize , final long maxTimeMS ,
7882 final Decoder <T > decoder , final AsyncConnectionSource connectionSource , final AsyncConnection connection ) {
7983 this (firstBatch , limit , batchSize , maxTimeMS , decoder , connectionSource , connection , null );
@@ -108,7 +112,19 @@ class AsyncQueryBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
108112
109113 @ Override
110114 public void close () {
111- if (!isClosed .getAndSet (true )) {
115+ boolean killCursor = false ;
116+
117+ synchronized (this ) {
118+ if (isOperationInProgress ) {
119+ isClosePending = true ;
120+ } else {
121+ killCursor = !isClosed ;
122+ isClosed = true ;
123+ isClosePending = false ;
124+ }
125+ }
126+
127+ if (killCursor ) {
112128 killCursorOnClose ();
113129 }
114130 }
@@ -125,19 +141,25 @@ public void tryNext(final SingleResultCallback<List<T>> callback) {
125141
126142 @ Override
127143 public void setBatchSize (final int batchSize ) {
128- isTrue ("open" , !isClosed .get ());
144+ synchronized (this ) {
145+ isTrue ("open" , !isClosed );
146+ }
129147 this .batchSize = batchSize ;
130148 }
131149
132150 @ Override
133151 public int getBatchSize () {
134- isTrue ("open" , !isClosed .get ());
152+ synchronized (this ) {
153+ isTrue ("open" , !isClosed );
154+ }
135155 return batchSize ;
136156 }
137157
138158 @ Override
139159 public boolean isClosed () {
140- return isClosed .get ();
160+ synchronized (this ) {
161+ return isClosed ;
162+ }
141163 }
142164
143165 @ Override
@@ -170,9 +192,19 @@ private void next(final SingleResultCallback<List<T>> callback, final boolean tr
170192 } else {
171193 ServerCursor localCursor = getServerCursor ();
172194 if (localCursor == null ) {
173- isClosed .set (true );
195+ synchronized (this ) {
196+ isClosed = true ;
197+ }
174198 callback .onResult (null , null );
175199 } else {
200+ synchronized (this ) {
201+ if (isClosed ) {
202+ callback .onResult (null , new MongoException (format ("%s called after the cursor was closed." ,
203+ tryNext ? "tryNext()" : "next()" )));
204+ return ;
205+ }
206+ isOperationInProgress = true ;
207+ }
176208 getMore (localCursor , callback , tryNext );
177209 }
178210 }
@@ -187,6 +219,7 @@ private void getMore(final ServerCursor cursor, final SingleResultCallback<List<
187219 @ Override
188220 public void onResult (final AsyncConnection connection , final Throwable t ) {
189221 if (t != null ) {
222+ endOperationInProgress ();
190223 callback .onResult (null , t );
191224 } else {
192225 getMore (connection , cursor , callback , tryNext );
@@ -274,17 +307,20 @@ private BsonDocument asKillCursorsCommandDocument(final ServerCursor localCursor
274307 .append ("cursors" , new BsonArray (singletonList (new BsonInt64 (localCursor .getId ()))));
275308 }
276309
310+ private void endOperationInProgress () {
311+ boolean closePending = false ;
312+ synchronized (this ) {
313+ isOperationInProgress = false ;
314+ closePending = this .isClosePending ;
315+ }
316+ if (closePending ) {
317+ close ();
318+ }
319+ }
277320
278321 private void handleGetMoreQueryResult (final AsyncConnection connection , final SingleResultCallback <List <T >> callback ,
279322 final QueryResult <T > result , final boolean tryNext ) {
280- if (isClosed ()) {
281- connection .release ();
282- callback .onResult (null , new MongoException (format ("The cursor was closed before %s completed." ,
283- tryNext ? "tryNext()" : "next()" )));
284- return ;
285- }
286-
287- cursor .getAndSet (result .getCursor ());
323+ cursor .set (result .getCursor ());
288324 if (!tryNext && result .getResults ().isEmpty () && result .getCursor () != null ) {
289325 getMore (connection , result .getCursor (), callback , false );
290326 } else {
@@ -298,6 +334,7 @@ private void handleGetMoreQueryResult(final AsyncConnection connection, final Si
298334 connectionSource .release ();
299335 }
300336 }
337+ endOperationInProgress ();
301338
302339 if (result .getResults ().isEmpty ()) {
303340 callback .onResult (null , null );
@@ -328,6 +365,7 @@ public void onResult(final BsonDocument result, final Throwable t) {
328365 ? translateCommandException ((MongoCommandException ) t , cursor )
329366 : t ;
330367 connection .release ();
368+ endOperationInProgress ();
331369 callback .onResult (null , translatedException );
332370 } else {
333371 QueryResult <T > queryResult = getMoreCursorDocumentToQueryResult (result .getDocument (CURSOR ),
@@ -354,6 +392,7 @@ private class QueryResultSingleResultCallback implements SingleResultCallback<Qu
354392 public void onResult (final QueryResult <T > result , final Throwable t ) {
355393 if (t != null ) {
356394 connection .release ();
395+ endOperationInProgress ();
357396 callback .onResult (null , t );
358397 } else {
359398 handleGetMoreQueryResult (connection , callback , result , tryNext );
0 commit comments