2424import com .mongodb .internal .binding .AsyncConnectionSource ;
2525import com .mongodb .internal .binding .AsyncReadBinding ;
2626import com .mongodb .internal .operation .OperationHelper .AsyncCallableWithSource ;
27+ import com .mongodb .lang .NonNull ;
2728import org .bson .BsonDocument ;
2829import org .bson .BsonTimestamp ;
2930import org .bson .RawBsonDocument ;
3031
3132import java .util .ArrayList ;
3233import java .util .List ;
3334import java .util .concurrent .atomic .AtomicBoolean ;
35+ import java .util .concurrent .atomic .AtomicReference ;
3436
37+ import static com .mongodb .assertions .Assertions .assertNotNull ;
38+ import static com .mongodb .assertions .Assertions .assertNull ;
3539import static com .mongodb .internal .async .ErrorHandlingResultCallback .errorHandlingCallback ;
3640import static com .mongodb .internal .operation .ChangeStreamBatchCursorHelper .isRetryableError ;
3741import static com .mongodb .internal .operation .OperationHelper .LOGGER ;
@@ -44,7 +48,12 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
4448 private final int maxWireVersion ;
4549
4650 private volatile BsonDocument resumeToken ;
47- private volatile AsyncAggregateResponseBatchCursor <RawBsonDocument > wrapped ;
51+ /**
52+ * {@linkplain ChangeStreamBatchCursorHelper#isRetryableError(Throwable, int) Retryable errors} can result in
53+ * {@code wrapped} containing {@code null} and {@link #isClosed} being {@code false}.
54+ * This represents a situation in which the wrapped object was closed by {@code this} but {@code this} remained open.
55+ */
56+ private final AtomicReference <AsyncAggregateResponseBatchCursor <RawBsonDocument >> wrapped ;
4857 private final AtomicBoolean isClosed ;
4958
5059 AsyncChangeStreamBatchCursor (final ChangeStreamOperation <T > changeStreamOperation ,
@@ -53,16 +62,17 @@ final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBat
5362 final BsonDocument resumeToken ,
5463 final int maxWireVersion ) {
5564 this .changeStreamOperation = changeStreamOperation ;
56- this .wrapped = wrapped ;
65+ this .wrapped = new AtomicReference <>( assertNotNull ( wrapped )) ;
5766 this .binding = binding ;
5867 binding .retain ();
5968 this .resumeToken = resumeToken ;
6069 this .maxWireVersion = maxWireVersion ;
6170 isClosed = new AtomicBoolean ();
6271 }
6372
73+ @ NonNull
6474 AsyncAggregateResponseBatchCursor <RawBsonDocument > getWrapped () {
65- return wrapped ;
75+ return assertNotNull ( wrapped . get ()) ;
6676 }
6777
6878 @ Override
@@ -93,7 +103,7 @@ public void apply(final AsyncAggregateResponseBatchCursor<RawBsonDocument> curso
93103 public void close () {
94104 if (isClosed .compareAndSet (false , true )) {
95105 try {
96- wrapped . close ();
106+ nullifyAndCloseWrapped ();
97107 } finally {
98108 binding .release ();
99109 }
@@ -102,22 +112,63 @@ public void close() {
102112
103113 @ Override
104114 public void setBatchSize (final int batchSize ) {
105- wrapped .setBatchSize (batchSize );
115+ getWrapped () .setBatchSize (batchSize );
106116 }
107117
108118 @ Override
109119 public int getBatchSize () {
110- return wrapped .getBatchSize ();
120+ return getWrapped () .getBatchSize ();
111121 }
112122
113123 @ Override
114124 public boolean isClosed () {
115- return isClosed .get ();
125+ if (isClosed .get ()) {
126+ return true ;
127+ } else if (wrappedClosedItself ()) {
128+ close ();
129+ return true ;
130+ } else {
131+ return false ;
132+ }
133+ }
134+
135+ private boolean wrappedClosedItself () {
136+ AsyncAggregateResponseBatchCursor <RawBsonDocument > observedWrapped = wrapped .get ();
137+ return observedWrapped != null && observedWrapped .isClosed ();
138+ }
139+
140+ /**
141+ * {@code null} is written to {@link #wrapped} before closing the wrapped object to maintain the following guarantee:
142+ * if {@link #wrappedClosedItself()} observes a {@linkplain AsyncAggregateResponseBatchCursor#isClosed() closed} wrapped object,
143+ * then it closed itself as opposed to being closed by {@code this}.
144+ */
145+ private void nullifyAndCloseWrapped () {
146+ AsyncAggregateResponseBatchCursor <RawBsonDocument > observedWrapped = wrapped .getAndSet (null );
147+ if (observedWrapped != null ) {
148+ observedWrapped .close ();
149+ }
150+ }
151+
152+ /**
153+ * This method guarantees that the {@code newValue} argument is closed even if
154+ * {@link #setWrappedOrCloseIt(AsyncAggregateResponseBatchCursor)} is called concurrently with or after (in the happens-before order)
155+ * the method {@link #close()}.
156+ */
157+ private void setWrappedOrCloseIt (final AsyncAggregateResponseBatchCursor <RawBsonDocument > newValue ) {
158+ if (isClosed ()) {
159+ assertNull (this .wrapped .get ());
160+ newValue .close ();
161+ } else {
162+ assertNull (this .wrapped .getAndSet (newValue ));
163+ if (isClosed ()) {
164+ nullifyAndCloseWrapped ();
165+ }
166+ }
116167 }
117168
118169 @ Override
119170 public BsonDocument getPostBatchResumeToken () {
120- return wrapped .getPostBatchResumeToken ();
171+ return getWrapped () .getPostBatchResumeToken ();
121172 }
122173
123174 @ Override
@@ -127,7 +178,7 @@ public BsonTimestamp getOperationTime() {
127178
128179 @ Override
129180 public boolean isFirstBatchEmpty () {
130- return wrapped .isFirstBatchEmpty ();
181+ return getWrapped () .isFirstBatchEmpty ();
131182 }
132183
133184 @ Override
@@ -178,18 +229,18 @@ private interface AsyncBlock {
178229
179230 private void resumeableOperation (final AsyncBlock asyncBlock , final SingleResultCallback <List <RawBsonDocument >> callback ,
180231 final boolean tryNext ) {
181- if (isClosed . get ()) {
232+ if (isClosed ()) {
182233 callback .onResult (null , new MongoException (format ("%s called after the cursor was closed." ,
183234 tryNext ? "tryNext()" : "next()" )));
184235 return ;
185236 }
186- asyncBlock .apply (wrapped , new SingleResultCallback <List <RawBsonDocument >>() {
237+ asyncBlock .apply (getWrapped () , new SingleResultCallback <List <RawBsonDocument >>() {
187238 @ Override
188239 public void onResult (final List <RawBsonDocument > result , final Throwable t ) {
189240 if (t == null ) {
190241 callback .onResult (result , null );
191242 } else if (isRetryableError (t , maxWireVersion )) {
192- wrapped . close ();
243+ nullifyAndCloseWrapped ();
193244 retryOperation (asyncBlock , callback , tryNext );
194245 } else {
195246 callback .onResult (null , t );
@@ -214,9 +265,15 @@ public void onResult(final AsyncBatchCursor<T> result, final Throwable t) {
214265 if (t != null ) {
215266 callback .onResult (null , t );
216267 } else {
217- wrapped = ((AsyncChangeStreamBatchCursor <T >) result ).getWrapped ();
218- binding .release (); // release the new change stream batch cursor's reference to the binding
219- resumeableOperation (asyncBlock , callback , tryNext );
268+ try {
269+ setWrappedOrCloseIt (((AsyncChangeStreamBatchCursor <T >) result ).getWrapped ());
270+ } finally {
271+ try {
272+ binding .release (); // release the new change stream batch cursor's reference to the binding
273+ } finally {
274+ resumeableOperation (asyncBlock , callback , tryNext );
275+ }
276+ }
220277 }
221278 }
222279 });
0 commit comments