|
21 | 21 | import com.mongodb.ReadPreference; |
22 | 22 | import com.mongodb.ServerAddress; |
23 | 23 | import com.mongodb.ServerCursor; |
24 | | -import com.mongodb.internal.async.AsyncBatchCursor; |
25 | | -import com.mongodb.internal.async.SingleResultCallback; |
26 | 24 | import com.mongodb.connection.ConnectionDescription; |
27 | 25 | import com.mongodb.connection.ServerDescription; |
| 26 | +import com.mongodb.internal.async.AsyncBatchCursor; |
| 27 | +import com.mongodb.internal.async.SingleResultCallback; |
28 | 28 | import com.mongodb.internal.binding.AsyncConnectionSource; |
29 | 29 | import com.mongodb.internal.binding.AsyncReadBinding; |
30 | 30 | import com.mongodb.internal.binding.ConnectionSource; |
|
56 | 56 | import static com.mongodb.connection.ServerType.SHARD_ROUTER; |
57 | 57 | import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; |
58 | 58 | import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; |
59 | | -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; |
| 59 | +import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsyncWithConnection; |
60 | 60 | import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandWithConnection; |
61 | 61 | import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError; |
62 | 62 | import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError; |
@@ -262,22 +262,22 @@ public void call(final AsyncConnectionSource source, final AsyncConnection conne |
262 | 262 | if (t != null) { |
263 | 263 | errHandlingCallback.onResult(null, t); |
264 | 264 | } else { |
265 | | - final SingleResultCallback<AsyncBatchCursor<T>> wrappedCallback = releasingCallback(errHandlingCallback, |
266 | | - source, connection); |
267 | 265 | if (serverIsAtLeastVersionThreeDotZero(connection.getDescription())) { |
268 | | - executeCommandAsync(binding, databaseName, getCommandCreator(), createCommandDecoder(), |
269 | | - asyncTransformer(), retryReads, |
| 266 | + executeCommandAsyncWithConnection(binding, source, databaseName, getCommandCreator(), createCommandDecoder(), |
| 267 | + asyncTransformer(), retryReads, connection, |
270 | 268 | new SingleResultCallback<AsyncBatchCursor<T>>() { |
271 | 269 | @Override |
272 | 270 | public void onResult(final AsyncBatchCursor<T> result, final Throwable t) { |
273 | 271 | if (t != null && !isNamespaceError(t)) { |
274 | | - wrappedCallback.onResult(null, t); |
| 272 | + errHandlingCallback.onResult(null, t); |
275 | 273 | } else { |
276 | | - wrappedCallback.onResult(result != null ? result : emptyAsyncCursor(source), null); |
| 274 | + errHandlingCallback.onResult(result != null ? result : emptyAsyncCursor(source), null); |
277 | 275 | } |
278 | 276 | } |
279 | 277 | }); |
280 | 278 | } else { |
| 279 | + final SingleResultCallback<AsyncBatchCursor<T>> wrappedCallback = releasingCallback(errHandlingCallback, |
| 280 | + source, connection); |
281 | 281 | connection.queryAsync(getNamespace(), asQueryDocument(connection.getDescription(), binding.getReadPreference()), |
282 | 282 | null, 0, 0, batchSize, binding.getReadPreference().isSlaveOk(), false, false, false, false, false, |
283 | 283 | new BsonDocumentCodec(), new SingleResultCallback<QueryResult<BsonDocument>>() { |
|
0 commit comments