@@ -264,23 +264,29 @@ public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decod
264264 }
265265 private void sendCommandMessage (final CommandMessage message ,
266266 final ByteBufferBsonOutput bsonOutput , final SessionContext sessionContext ) {
267- try {
268- if ( sendCompressor == null || SECURITY_SENSITIVE_COMMANDS . contains ( message . getCommandDocument ( bsonOutput ). getFirstKey ())) {
267+ if ( sendCompressor == null || SECURITY_SENSITIVE_COMMANDS . contains ( message . getCommandDocument ( bsonOutput ). getFirstKey ())) {
268+ try {
269269 sendMessage (bsonOutput .getByteBuffers (), message .getId ());
270- } else {
271- CompressedMessage compressedMessage = new CompressedMessage (message .getOpCode (), bsonOutput .getByteBuffers (),
272- sendCompressor ,
270+ } finally {
271+ bsonOutput .close ();
272+ }
273+ } else {
274+ List <ByteBuf > byteBuffers = bsonOutput .getByteBuffers ();
275+ ByteBufferBsonOutput compressedBsonOutput ;
276+ try {
277+ CompressedMessage compressedMessage = new CompressedMessage (message .getOpCode (), byteBuffers , sendCompressor ,
273278 getMessageSettings (description ));
274- ByteBufferBsonOutput compressedBsonOutput = new ByteBufferBsonOutput (this );
279+ compressedBsonOutput = new ByteBufferBsonOutput (this );
275280 compressedMessage .encode (compressedBsonOutput , sessionContext );
276- try {
277- sendMessage (compressedBsonOutput .getByteBuffers (), message .getId ());
278- } finally {
279- compressedBsonOutput .close ();
280- }
281+ } finally {
282+ releaseAllBuffers (byteBuffers );
283+ bsonOutput .close ();
284+ }
285+ try {
286+ sendMessage (compressedBsonOutput .getByteBuffers (), message .getId ());
287+ } finally {
288+ compressedBsonOutput .close ();
281289 }
282- } finally {
283- bsonOutput .close ();
284290 }
285291 }
286292
@@ -324,11 +330,15 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
324330 sendCommandMessageAsync (message .getId (), decoder , sessionContext , callback , bsonOutput , commandEventSender ,
325331 message .isResponseExpected ());
326332 } else {
327- CompressedMessage compressedMessage = new CompressedMessage (message .getOpCode (), bsonOutput .getByteBuffers (),
328- sendCompressor ,
329- getMessageSettings (description ));
330- compressedMessage .encode (compressedBsonOutput , sessionContext );
331- bsonOutput .close ();
333+ List <ByteBuf > byteBuffers = bsonOutput .getByteBuffers ();
334+ try {
335+ CompressedMessage compressedMessage = new CompressedMessage (message .getOpCode (), byteBuffers , sendCompressor ,
336+ getMessageSettings (description ));
337+ compressedMessage .encode (compressedBsonOutput , sessionContext );
338+ } finally {
339+ releaseAllBuffers (byteBuffers );
340+ bsonOutput .close ();
341+ }
332342 sendCommandMessageAsync (message .getId (), decoder , sessionContext , callback , compressedBsonOutput , commandEventSender ,
333343 message .isResponseExpected ());
334344 }
@@ -339,6 +349,12 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
339349 }
340350 }
341351
352+ private void releaseAllBuffers (final List <ByteBuf > byteBuffers ) {
353+ for (ByteBuf cur : byteBuffers ) {
354+ cur .release ();
355+ }
356+ }
357+
342358 private <T > void sendCommandMessageAsync (final int messageId , final Decoder <T > decoder , final SessionContext sessionContext ,
343359 final SingleResultCallback <T > callback , final ByteBufferBsonOutput bsonOutput ,
344360 final CommandEventSender commandEventSender , final boolean responseExpected ) {
0 commit comments