@@ -463,31 +463,39 @@ private BulkWriteResult updateWithCommandProtocol(final List<ModifyRequest> upda
463463 return writeWithCommandProtocol (port , UPDATE , message , writeConcern );
464464 }
465465
466- private BulkWriteResult writeWithCommandProtocol (final DBPort port , final WriteRequest .Type type , BaseWriteCommandMessage message ,
466+ private BulkWriteResult writeWithCommandProtocol (final DBPort port , final WriteRequest .Type type , final BaseWriteCommandMessage message ,
467467 final WriteConcern writeConcern ) {
468- int batchNum = 0 ;
469- int currentRangeStartIndex = 0 ;
470- BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner (port .getAddress (), writeConcern );
471- do {
472- batchNum ++;
473- BaseWriteCommandMessage nextMessage = sendWriteCommandMessage (message , batchNum , port );
474- int itemCount = nextMessage != null ? message .getItemCount () - nextMessage .getItemCount () : message .getItemCount ();
475- IndexMap indexMap = IndexMap .create (currentRangeStartIndex , itemCount );
476- CommandResult commandResult = receiveWriteCommandMessage (port );
477- if (willTrace () && nextMessage != null || batchNum > 1 ) {
478- getLogger ().fine (format ("Received response for batch %d" , batchNum ));
479- }
468+ return db .getConnector ().doOperation (db , port , new DBPort .Operation <BulkWriteResult >() {
469+ @ Override
470+ public BulkWriteResult execute () throws IOException {
471+ BaseWriteCommandMessage curMessage = message ;
472+ int batchNum = 0 ;
473+ int currentRangeStartIndex = 0 ;
474+ BulkWriteBatchCombiner bulkWriteBatchCombiner = new BulkWriteBatchCombiner (port .getAddress (), writeConcern );
475+ do {
476+ batchNum ++;
477+ BaseWriteCommandMessage nextMessage = sendWriteCommandMessage (curMessage , batchNum , port );
478+ int itemCount = nextMessage != null ? curMessage .getItemCount () - nextMessage .getItemCount ()
479+ : curMessage .getItemCount ();
480+ IndexMap indexMap = IndexMap .create (currentRangeStartIndex , itemCount );
481+ CommandResult commandResult = receiveWriteCommandMessage (port );
482+ if (willTrace () && nextMessage != null || batchNum > 1 ) {
483+ getLogger ().fine (format ("Received response for batch %d" , batchNum ));
484+ }
480485
481- if (hasError (commandResult )) {
482- bulkWriteBatchCombiner .addErrorResult (getBulkWriteException (type , commandResult ), indexMap );
483- } else {
484- bulkWriteBatchCombiner .addResult (getBulkWriteResult (type , commandResult ), indexMap );
486+ if (hasError (commandResult )) {
487+ bulkWriteBatchCombiner .addErrorResult (getBulkWriteException (type , commandResult ), indexMap );
488+ } else {
489+ bulkWriteBatchCombiner .addResult (getBulkWriteResult (type , commandResult ), indexMap );
490+ }
491+ currentRangeStartIndex += itemCount ;
492+ curMessage = nextMessage ;
493+ } while (curMessage != null && !bulkWriteBatchCombiner .shouldStopSendingMoreBatches ());
494+
495+ return bulkWriteBatchCombiner .getResult ();
485496 }
486- currentRangeStartIndex += itemCount ;
487- message = nextMessage ;
488- } while (message != null && !bulkWriteBatchCombiner .shouldStopSendingMoreBatches ());
497+ });
489498
490- return bulkWriteBatchCombiner .getResult ();
491499 }
492500
493501 private boolean useWriteCommands (final WriteConcern concern , final DBPort port ) {
@@ -513,38 +521,26 @@ private MongoNamespace getNamespace() {
513521 }
514522
515523 private BaseWriteCommandMessage sendWriteCommandMessage (final BaseWriteCommandMessage message , final int batchNum ,
516- final DBPort port ) {
524+ final DBPort port ) throws IOException {
517525 final PoolOutputBuffer buffer = new PoolOutputBuffer ();
518526 try {
519527 BaseWriteCommandMessage nextMessage = message .encode (buffer );
520528 if (nextMessage != null || batchNum > 1 ) {
521529 getLogger ().fine (format ("Sending batch %d" , batchNum ));
522530 }
523- db .getConnector ().doOperation (getDB (), port , new DBPort .Operation <Void >() {
524- @ Override
525- public Void execute () throws IOException {
526- buffer .pipe (port .getOutputStream ());
527- return null ;
528- }
529- });
531+ buffer .pipe (port .getOutputStream ());
530532 return nextMessage ;
531533 } finally {
532534 buffer .reset ();
533535 }
534536 }
535537
536- private CommandResult receiveWriteCommandMessage (final DBPort port ) {
537- return db .getConnector ().doOperation (getDB (), port , new DBPort .Operation <CommandResult >() {
538- @ Override
539- public CommandResult execute () throws IOException {
540- Response response = new Response (port .getAddress (), null , port .getInputStream (),
541- DefaultDBDecoder .FACTORY .create ());
542- CommandResult writeCommandResult = new CommandResult (port .getAddress ());
543- writeCommandResult .putAll (response .get (0 ));
544- writeCommandResult .throwOnError ();
545- return writeCommandResult ;
546- }
547- });
538+ private CommandResult receiveWriteCommandMessage (final DBPort port ) throws IOException {
539+ Response response = new Response (port .getAddress (), null , port .getInputStream (), DefaultDBDecoder .FACTORY .create ());
540+ CommandResult writeCommandResult = new CommandResult (port .getAddress ());
541+ writeCommandResult .putAll (response .get (0 ));
542+ writeCommandResult .throwOnError ();
543+ return writeCommandResult ;
548544 }
549545
550546
0 commit comments