@@ -419,31 +419,39 @@ private BulkWriteResult updateWithCommandProtocol(final List<ModifyRequest> upda
419419 return writeWithCommandProtocol (port , UPDATE , message , writeConcern );
420420 }
421421
422- private BulkWriteResult writeWithCommandProtocol (final DBPort port , final WriteRequest .Type type , BaseWriteCommandMessage message ,
422+ private BulkWriteResult writeWithCommandProtocol (final DBPort port , final WriteRequest .Type type , final BaseWriteCommandMessage message ,
423423 final WriteConcern writeConcern ) {
424- int batchNum = 0 ;
425- int currentRangeStartIndex = 0 ;
426- BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner (port .getAddress (), writeConcern );
427- do {
428- batchNum ++;
429- BaseWriteCommandMessage nextMessage = sendWriteCommandMessage (message , batchNum , port );
430- int itemCount = nextMessage != null ? message .getItemCount () - nextMessage .getItemCount () : message .getItemCount ();
431- IndexMap indexMap = IndexMap .create (currentRangeStartIndex , itemCount );
432- CommandResult commandResult = receiveWriteCommandMessage (port );
433- if (willTrace () && nextMessage != null || batchNum > 1 ) {
434- getLogger ().fine (format ("Received response for batch %d" , batchNum ));
435- }
424+ return db .getConnector ().doOperation (db , port , new DBPort .Operation <BulkWriteResult >() {
425+ @ Override
426+ public BulkWriteResult execute () throws IOException {
427+ BaseWriteCommandMessage curMessage = message ;
428+ int batchNum = 0 ;
429+ int currentRangeStartIndex = 0 ;
430+ BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner (port .getAddress (), writeConcern );
431+ do {
432+ batchNum ++;
433+ BaseWriteCommandMessage nextMessage = sendWriteCommandMessage (curMessage , batchNum , port );
434+ int itemCount = nextMessage != null ? curMessage .getItemCount () - nextMessage .getItemCount ()
435+ : curMessage .getItemCount ();
436+ IndexMap indexMap = IndexMap .create (currentRangeStartIndex , itemCount );
437+ CommandResult commandResult = receiveWriteCommandMessage (port );
438+ if (willTrace () && nextMessage != null || batchNum > 1 ) {
439+ getLogger ().fine (format ("Received response for batch %d" , batchNum ));
440+ }
436441
437- if (hasError (commandResult )) {
438- bulkWriteBatchCombiner .addErrorResult (getBulkWriteException (type , commandResult ), indexMap );
439- } else {
440- bulkWriteBatchCombiner .addResult (getBulkWriteResult (type , commandResult ), indexMap );
442+ if (hasError (commandResult )) {
443+ bulkWriteBatchCombiner .addErrorResult (getBulkWriteException (type , commandResult ), indexMap );
444+ } else {
445+ bulkWriteBatchCombiner .addResult (getBulkWriteResult (type , commandResult ), indexMap );
446+ }
447+ currentRangeStartIndex += itemCount ;
448+ curMessage = nextMessage ;
449+ } while (curMessage != null && !bulkWriteBatchCombiner .shouldStopSendingMoreBatches ());
450+
451+ return bulkWriteBatchCombiner .getResult ();
441452 }
442- currentRangeStartIndex += itemCount ;
443- message = nextMessage ;
444- } while (message != null && !bulkWriteBatchCombiner .shouldStopSendingMoreBatches ());
453+ });
445454
446- return bulkWriteBatchCombiner .getResult ();
447455 }
448456
449457 private boolean useWriteCommands (final WriteConcern concern , final DBPort port ) {
@@ -469,38 +477,26 @@ private MongoNamespace getNamespace() {
469477 }
470478
471479 private BaseWriteCommandMessage sendWriteCommandMessage (final BaseWriteCommandMessage message , final int batchNum ,
472- final DBPort port ) {
480+ final DBPort port ) throws IOException {
473481 final PoolOutputBuffer buffer = new PoolOutputBuffer ();
474482 try {
475483 BaseWriteCommandMessage nextMessage = message .encode (buffer );
476484 if (nextMessage != null || batchNum > 1 ) {
477485 getLogger ().fine (format ("Sending batch %d" , batchNum ));
478486 }
479- db .getConnector ().doOperation (getDB (), port , new DBPort .Operation <Void >() {
480- @ Override
481- public Void execute () throws IOException {
482- buffer .pipe (port .getOutputStream ());
483- return null ;
484- }
485- });
487+ buffer .pipe (port .getOutputStream ());
486488 return nextMessage ;
487489 } finally {
488490 buffer .reset ();
489491 }
490492 }
491493
492- private CommandResult receiveWriteCommandMessage (final DBPort port ) {
493- return db .getConnector ().doOperation (getDB (), port , new DBPort .Operation <CommandResult >() {
494- @ Override
495- public CommandResult execute () throws IOException {
496- Response response = new Response (port .getAddress (), null , port .getInputStream (),
497- DefaultDBDecoder .FACTORY .create ());
498- CommandResult writeCommandResult = new CommandResult (port .getAddress ());
499- writeCommandResult .putAll (response .get (0 ));
500- writeCommandResult .throwOnError ();
501- return writeCommandResult ;
502- }
503- });
494+ private CommandResult receiveWriteCommandMessage (final DBPort port ) throws IOException {
495+ Response response = new Response (port .getAddress (), null , port .getInputStream (), DefaultDBDecoder .FACTORY .create ());
496+ CommandResult writeCommandResult = new CommandResult (port .getAddress ());
497+ writeCommandResult .putAll (response .get (0 ));
498+ writeCommandResult .throwOnError ();
499+ return writeCommandResult ;
504500 }
505501
506502
0 commit comments