@@ -916,50 +916,49 @@ def kill_cursors(cursor_ids):
 class _BulkWriteContext(object):
     """A wrapper around SocketInfo for use with write splitting functions."""

-    __slots__ = ('db_name', 'command', 'sock_info', 'op_id',
+    __slots__ = ('db_name', 'sock_info', 'op_id',
                  'name', 'field', 'publish', 'start_time', 'listeners',
-                 'session', 'compress', 'op_type', 'codec')
+                 'session', 'compress', 'op_type', 'codec', 'cmd_legacy')

-    def __init__(self, database_name, command, sock_info, operation_id,
-                 listeners, session, op_type, codec):
+    def __init__(self, database_name, cmd_name, sock_info, operation_id,
+                 listeners, session, op_type, codec, cmd_legacy=None):
         self.db_name = database_name
-        self.command = command
         self.sock_info = sock_info
         self.op_id = operation_id
         self.listeners = listeners
         self.publish = listeners.enabled_for_commands
-        self.name = next(iter(command))
+        self.name = cmd_name
         self.field = _FIELD_MAP[self.name]
         self.start_time = datetime.datetime.now() if self.publish else None
         self.session = session
         self.compress = True if sock_info.compression_context else False
         self.op_type = op_type
         self.codec = codec
-        sock_info.add_server_api(command)
+        self.cmd_legacy = cmd_legacy

-    def _batch_command(self, docs):
+    def _batch_command(self, cmd, docs):
         namespace = self.db_name + '.$cmd'
         request_id, msg, to_send = _do_bulk_write_command(
-            namespace, self.op_type, self.command, docs, self.check_keys,
+            namespace, self.op_type, cmd, docs, self.check_keys,
             self.codec, self)
         if not to_send:
             raise InvalidOperation("cannot do an empty bulk write")
         return request_id, msg, to_send

-    def execute(self, docs, client):
-        request_id, msg, to_send = self._batch_command(docs)
-        result = self.write_command(request_id, msg, to_send)
+    def execute(self, cmd, docs, client):
+        request_id, msg, to_send = self._batch_command(cmd, docs)
+        result = self.write_command(cmd, request_id, msg, to_send)
         client._process_response(result, self.session)
         return result, to_send

-    def execute_unack(self, docs, client):
-        request_id, msg, to_send = self._batch_command(docs)
+    def execute_unack(self, cmd, docs, client):
+        request_id, msg, to_send = self._batch_command(cmd, docs)
         # Though this isn't strictly a "legacy" write, the helper
         # handles publishing commands and sending our message
         # without receiving a result. Send 0 for max_doc_size
         # to disable size checking. Size checking is handled while
         # the documents are encoded to BSON.
-        self.legacy_write(request_id, msg, 0, False, to_send)
+        self.legacy_write(cmd, request_id, msg, 0, False, to_send)
         return to_send

     @property
@@ -996,14 +995,16 @@ def legacy_bulk_insert(
         request_id, msg = _compress(
             2002, msg, self.sock_info.compression_context)
         return self.legacy_write(
-            request_id, msg, max_doc_size, acknowledged, docs)
+            self.cmd_legacy.copy(), request_id, msg, max_doc_size,
+            acknowledged, docs)

-    def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
+    def legacy_write(self, cmd, request_id, msg, max_doc_size, acknowledged,
+                     docs):
         """A proxy for SocketInfo.legacy_write that handles event publishing.
         """
         if self.publish:
             duration = datetime.datetime.now() - self.start_time
-            cmd = self._start(request_id, docs)
+            cmd = self._start(cmd, request_id, docs)
             start = datetime.datetime.now()
         try:
             result = self.sock_info.legacy_write(
@@ -1032,12 +1033,12 @@ def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
             self.start_time = datetime.datetime.now()
         return result

-    def write_command(self, request_id, msg, docs):
+    def write_command(self, cmd, request_id, msg, docs):
         """A proxy for SocketInfo.write_command that handles event publishing.
         """
         if self.publish:
             duration = datetime.datetime.now() - self.start_time
-            self._start(request_id, docs)
+            self._start(cmd, request_id, docs)
             start = datetime.datetime.now()
         try:
             reply = self.sock_info.write_command(request_id, msg)
@@ -1057,9 +1058,8 @@ def write_command(self, request_id, msg, docs):
             self.start_time = datetime.datetime.now()
         return reply

-    def _start(self, request_id, docs):
+    def _start(self, cmd, request_id, docs):
         """Publish a CommandStartedEvent."""
-        cmd = self.command.copy()
         cmd[self.field] = docs
         self.listeners.publish_command_start(
             cmd, self.db_name,
@@ -1092,10 +1092,10 @@ def _fail(self, request_id, failure, duration):
 class _EncryptedBulkWriteContext(_BulkWriteContext):
     __slots__ = ()

-    def _batch_command(self, docs):
+    def _batch_command(self, cmd, docs):
         namespace = self.db_name + '.$cmd'
         msg, to_send = _encode_batched_write_command(
-            namespace, self.op_type, self.command, docs, self.check_keys,
+            namespace, self.op_type, cmd, docs, self.check_keys,
             self.codec, self)
         if not to_send:
             raise InvalidOperation("cannot do an empty bulk write")
@@ -1106,17 +1106,18 @@ def _batch_command(self, docs):
                             DEFAULT_RAW_BSON_OPTIONS)
         return cmd, to_send

-    def execute(self, docs, client):
-        cmd, to_send = self._batch_command(docs)
+    def execute(self, cmd, docs, client):
+        batched_cmd, to_send = self._batch_command(cmd, docs)
         result = self.sock_info.command(
-            self.db_name, cmd, codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
+            self.db_name, batched_cmd,
+            codec_options=_UNICODE_REPLACE_CODEC_OPTIONS,
             session=self.session, client=client)
         return result, to_send

-    def execute_unack(self, docs, client):
-        cmd, to_send = self._batch_command(docs)
+    def execute_unack(self, cmd, docs, client):
+        batched_cmd, to_send = self._batch_command(cmd, docs)
         self.sock_info.command(
-            self.db_name, cmd, write_concern=WriteConcern(w=0),
+            self.db_name, batched_cmd, write_concern=WriteConcern(w=0),
             session=self.session, client=client)
         return to_send
