|
16 | 16 |
|
17 | 17 | package com.mongodb.internal.connection |
18 | 18 |
|
19 | | - |
20 | 19 | import com.mongodb.MongoCommandException |
21 | 20 | import com.mongodb.MongoInternalException |
22 | 21 | import com.mongodb.MongoNamespace |
@@ -44,10 +43,13 @@ import com.mongodb.internal.session.SessionContext |
44 | 43 | import com.mongodb.internal.validator.NoOpFieldNameValidator |
45 | 44 | import org.bson.BsonDocument |
46 | 45 | import org.bson.BsonInt32 |
| 46 | +import org.bson.BsonReader |
47 | 47 | import org.bson.BsonString |
48 | 48 | import org.bson.ByteBuf |
49 | 49 | import org.bson.ByteBufNIO |
50 | 50 | import org.bson.codecs.BsonDocumentCodec |
| 51 | +import org.bson.codecs.DecoderContext |
| 52 | +import org.bson.codecs.configuration.CodecConfigurationException |
51 | 53 | import spock.lang.Specification |
52 | 54 |
|
53 | 55 | import java.nio.ByteBuffer |
@@ -504,6 +506,29 @@ class InternalStreamConnectionSpecification extends Specification { |
504 | 506 | new BsonDocument('ok', new BsonInt32(1)), 1000)]) |
505 | 507 | } |
506 | 508 |
|
| 509 | + def 'should send events for successful command with decoding error'() { |
| 510 | + given: |
| 511 | + def connection = getOpenedConnection() |
| 512 | + def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) |
| 513 | + def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings, null) |
| 514 | + stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } |
| 515 | + stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId()) |
| 516 | + stream.read(90, 0) >> helper.defaultReply() |
| 517 | + |
| 518 | + when: |
| 519 | + connection.sendAndReceive(commandMessage, { |
| 520 | + BsonReader reader, DecoderContext decoderContext -> throw new CodecConfigurationException('') |
| 521 | + }, NoOpSessionContext.INSTANCE, IgnorableRequestContext.INSTANCE) |
| 522 | + |
| 523 | + then: |
| 524 | + thrown(CodecConfigurationException) |
| 525 | + commandListener.eventsWereDelivered([ |
| 526 | + new CommandStartedEvent(1, connection.getDescription(), 'admin', 'ping', |
| 527 | + pingCommandDocument.append('$db', new BsonString('admin'))), |
| 528 | + new CommandSucceededEvent(1, connection.getDescription(), 'ping', |
| 529 | + new BsonDocument('ok', new BsonInt32(1)), 1000)]) |
| 530 | + } |
| 531 | + |
507 | 532 | def 'should extract cluster and operation time into session context'() { |
508 | 533 | given: |
509 | 534 | def connection = getOpenedConnection() |
@@ -747,6 +772,39 @@ class InternalStreamConnectionSpecification extends Specification { |
747 | 772 | new BsonDocument('ok', new BsonInt32(1)), 1000)]) |
748 | 773 | } |
749 | 774 |
|
| 775 | + def 'should send events for successful asynchronous command with decoding error'() { |
| 776 | + given: |
| 777 | + def connection = getOpenedConnection() |
| 778 | + def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) |
| 779 | + def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings, null) |
| 780 | + def callback = new FutureResultCallback() |
| 781 | + |
| 782 | + stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } |
| 783 | + stream.writeAsync(_, _) >> { buffers, handler -> |
| 784 | + handler.completed(null) |
| 785 | + } |
| 786 | + stream.readAsync(16, _) >> { numBytes, handler -> |
| 787 | + handler.completed(helper.defaultMessageHeader(commandMessage.getId())) |
| 788 | + } |
| 789 | + stream.readAsync(90, _) >> { numBytes, handler -> |
| 790 | + handler.completed(helper.defaultReply()) |
| 791 | + } |
| 792 | + |
| 793 | + when: |
| 794 | + connection.sendAndReceiveAsync(commandMessage, { |
| 795 | + BsonReader reader, DecoderContext decoderContext -> throw new CodecConfigurationException('') |
| 796 | + }, NoOpSessionContext.INSTANCE, IgnorableRequestContext.INSTANCE, callback) |
| 797 | + callback.get() |
| 798 | + |
| 799 | + then: |
| 800 | + thrown(CodecConfigurationException) |
| 801 | + commandListener.eventsWereDelivered([ |
| 802 | + new CommandStartedEvent(1, connection.getDescription(), 'admin', 'ping', |
| 803 | + pingCommandDocument.append('$db', new BsonString('admin'))), |
| 804 | + new CommandSucceededEvent(1, connection.getDescription(), 'ping', |
| 805 | + new BsonDocument('ok', new BsonInt32(1)), 1000)]) |
| 806 | + } |
| 807 | + |
750 | 808 |
|
751 | 809 | def 'should send events for asynchronous command failure with exception writing message'() { |
752 | 810 | given: |
|
0 commit comments