@@ -152,8 +152,92 @@ class CommandMessageSpecification extends Specification {
152152 ]
153153 }
154154
155- def ' should respect the message settings limits ' () {
155+ def ' should respect the max message size ' () {
156156 given :
157+ def maxMessageSize = 1024
158+ def messageSettings = MessageSettings . builder(). maxMessageSize(maxMessageSize). serverVersion(new ServerVersion (3 , 6 )). build()
159+ def payload = new SplittablePayload (INSERT , [new BsonDocument (' a' , new BsonBinary (new byte [922 ])),
160+ new BsonDocument (' b' , new BsonBinary (new byte [450 ])),
161+ new BsonDocument (' c' , new BsonBinary (new byte [459 ])),
162+ new BsonDocument (' b' , new BsonBinary (new byte [450 ])),
163+ new BsonDocument (' c' , new BsonBinary (new byte [460 ]))])
164+ def message = new CommandMessage (namespace, command, fieldNameValidator, ReadPreference . primary(), messageSettings,
165+ false , payload, fieldNameValidator)
166+ def output = new BasicOutputBuffer ()
167+ def sessionContext = Stub (SessionContext )
168+
169+ when :
170+ message. encode(output, sessionContext)
171+ def byteBuf = new ByteBufNIO (ByteBuffer . wrap(output. toByteArray()))
172+ def messageHeader = new MessageHeader (byteBuf, maxMessageSize)
173+
174+ then :
175+ messageHeader. opCode == OpCode . OP_MSG . value
176+ messageHeader. requestId < RequestMessage . currentGlobalId
177+ messageHeader. responseTo == 0
178+ messageHeader. messageLength == 1024
179+ byteBuf. getInt() == 0
180+ payload. getPosition() == 1
181+ payload. hasAnotherSplit()
182+
183+ when :
184+ payload = payload. getNextSplit()
185+ message = new CommandMessage (namespace, command, fieldNameValidator, ReadPreference . primary(), messageSettings,
186+ false , payload, fieldNameValidator)
187+ output. truncateToPosition(0 )
188+ message. encode(output, sessionContext)
189+ byteBuf = new ByteBufNIO (ByteBuffer . wrap(output. toByteArray()))
190+ messageHeader = new MessageHeader (byteBuf, maxMessageSize)
191+
192+ then :
193+ messageHeader. opCode == OpCode . OP_MSG . value
194+ messageHeader. requestId < RequestMessage . currentGlobalId
195+ messageHeader. responseTo == 0
196+ messageHeader. messageLength == 1024
197+ byteBuf. getInt() == 0
198+ payload. getPosition() == 2
199+ payload. hasAnotherSplit()
200+
201+ when :
202+ payload = payload. getNextSplit()
203+ message = new CommandMessage (namespace, command, fieldNameValidator, ReadPreference . primary(), messageSettings,
204+ false , payload, fieldNameValidator)
205+ output. truncateToPosition(0 )
206+ message. encode(output, sessionContext)
207+ byteBuf = new ByteBufNIO (ByteBuffer . wrap(output. toByteArray()))
208+ messageHeader = new MessageHeader (byteBuf, maxMessageSize)
209+
210+ then :
211+ messageHeader. opCode == OpCode . OP_MSG . value
212+ messageHeader. requestId < RequestMessage . currentGlobalId
213+ messageHeader. responseTo == 0
214+ messageHeader. messageLength == 552
215+ byteBuf. getInt() == 0
216+ payload. getPosition() == 1
217+ payload. hasAnotherSplit()
218+
219+ when :
220+ payload = payload. getNextSplit()
221+ message = new CommandMessage (namespace, command, fieldNameValidator, ReadPreference . primary(), messageSettings,
222+ false , payload, fieldNameValidator)
223+ output. truncateToPosition(0 )
224+ message. encode(output, sessionContext)
225+ byteBuf = new ByteBufNIO (ByteBuffer . wrap(output. toByteArray()))
226+ messageHeader = new MessageHeader (byteBuf, maxMessageSize)
227+
228+ then :
229+ messageHeader. opCode == OpCode . OP_MSG . value
230+ messageHeader. requestId < RequestMessage . currentGlobalId
231+ messageHeader. responseTo == 0
232+ messageHeader. messageLength == 562
233+ byteBuf. getInt() == 1 << 1
234+ payload. getPosition() == 1
235+ ! payload. hasAnotherSplit()
236+ }
237+
238+ def ' should respect the max batch count' () {
239+ given :
240+ def messageSettings = MessageSettings . builder(). maxBatchCount(2 ). serverVersion(new ServerVersion (3 , 6 )). build()
157241 def payload = new SplittablePayload (INSERT , [new BsonDocument (' a' , new BsonBinary (new byte [900 ])),
158242 new BsonDocument (' b' , new BsonBinary (new byte [450 ])),
159243 new BsonDocument (' c' , new BsonBinary (new byte [450 ]))])
@@ -171,12 +255,12 @@ class CommandMessageSpecification extends Specification {
171255 messageHeader. opCode == OpCode . OP_MSG . value
172256 messageHeader. requestId < RequestMessage . currentGlobalId
173257 messageHeader. responseTo == 0
258+ messageHeader. messageLength == 1465
174259 byteBuf. getInt() == 0
175- payload. getPosition() == pos1
260+ payload. getPosition() == 2
176261 payload. hasAnotherSplit()
177262
178263 when :
179- def initialRequestId = messageHeader. requestId
180264 payload = payload. getNextSplit()
181265 message = new CommandMessage (namespace, command, fieldNameValidator, ReadPreference . primary(), messageSettings,
182266 false , payload, fieldNameValidator)
@@ -188,16 +272,10 @@ class CommandMessageSpecification extends Specification {
188272 then :
189273 messageHeader. opCode == OpCode . OP_MSG . value
190274 messageHeader. requestId < RequestMessage . currentGlobalId
191- messageHeader. requestId > initialRequestId
192275 messageHeader. responseTo == 0
193276 byteBuf. getInt() == 1 << 1
194- payload. getPosition() == pos2
277+ payload. getPosition() == 1
195278 ! payload. hasAnotherSplit()
196-
197- where :
198- pos1 | pos2 | messageSettings
199- 1 | 2 | MessageSettings . builder(). maxMessageSize(1024 ). serverVersion(new ServerVersion (3 , 6 )). build()
200- 2 | 1 | MessageSettings . builder(). maxBatchCount(2 ). serverVersion(new ServerVersion (3 , 6 )). build()
201279 }
202280
203281 def ' should throw if payload document bigger than max document size' () {
0 commit comments