3838import java .util .Map ;
3939
4040import static com .mongodb .ReadPreference .primary ;
41+ import static com .mongodb .ReadPreference .primaryPreferred ;
4142import static com .mongodb .assertions .Assertions .isTrue ;
4243import static com .mongodb .connection .BsonWriterHelper .writePayload ;
44+ import static com .mongodb .connection .ClusterConnectionMode .MULTIPLE ;
45+ import static com .mongodb .connection .ClusterConnectionMode .SINGLE ;
46+ import static com .mongodb .connection .ServerType .SHARD_ROUTER ;
4347
4448/**
4549 * A command message that uses OP_MSG or OP_QUERY to send the command.
@@ -52,15 +56,18 @@ final class CommandMessage extends RequestMessage {
5256 private final SplittablePayload payload ;
5357 private final FieldNameValidator payloadFieldNameValidator ;
5458 private final boolean responseExpected ;
59+ private final ClusterConnectionMode clusterConnectionMode ;
5560
5661 CommandMessage (final MongoNamespace namespace , final BsonDocument command , final FieldNameValidator commandFieldNameValidator ,
5762 final ReadPreference readPreference , final MessageSettings settings ) {
58- this (namespace , command , commandFieldNameValidator , readPreference , settings , true , null , null );
63+ this (namespace , command , commandFieldNameValidator , readPreference , settings , true , null , null ,
64+ MULTIPLE );
5965 }
6066
6167 CommandMessage (final MongoNamespace namespace , final BsonDocument command , final FieldNameValidator commandFieldNameValidator ,
6268 final ReadPreference readPreference , final MessageSettings settings , final boolean responseExpected ,
63- final SplittablePayload payload , final FieldNameValidator payloadFieldNameValidator ) {
69+ final SplittablePayload payload , final FieldNameValidator payloadFieldNameValidator ,
70+ final ClusterConnectionMode clusterConnectionMode ) {
6471 super (namespace .getFullName (), getOpCode (settings ), settings );
6572 this .namespace = namespace ;
6673 this .command = command ;
@@ -69,6 +76,7 @@ final class CommandMessage extends RequestMessage {
6976 this .responseExpected = responseExpected ;
7077 this .payload = payload ;
7178 this .payloadFieldNameValidator = payloadFieldNameValidator ;
79+ this .clusterConnectionMode = clusterConnectionMode ;
7280 }
7381
7482 BsonDocument getCommandDocument (final ByteBufferBsonOutput bsonOutput ) {
@@ -102,7 +110,7 @@ boolean containsPayload() {
102110
103111 boolean isResponseExpected () {
104112 isTrue ("The message must be encoded before determining if a response is expected" , getEncodingMetadata () != null );
105- return calculateIsResponseExpected ();
113+ return ! useOpMsg () || requireOpMsgResponse ();
106114 }
107115
108116 ReadPreference getReadPreference () {
@@ -134,9 +142,9 @@ protected EncodingMetadata encodeMessageBodyWithMetadata(final BsonOutput bsonOu
134142 }
135143
136144 // Write the flag bits
137- bsonOutput .writeInt32 (flagPosition , getFlagBits ());
145+ bsonOutput .writeInt32 (flagPosition , getOpMsgFlagBits ());
138146 } else {
139- bsonOutput .writeInt32 (0 );
147+ bsonOutput .writeInt32 (getOpQueryFlagBits () );
140148 bsonOutput .writeCString (namespace .getFullName ());
141149 bsonOutput .writeInt32 (0 );
142150 bsonOutput .writeInt32 (-1 );
@@ -167,20 +175,44 @@ private void addDocumentWithPayload(final BsonOutput bsonOutput, final int messa
167175 getCodec (commandToEncode ).encode (bsonWriter , commandToEncode , EncoderContext .builder ().build ());
168176 }
169177
170- private int getFlagBits () {
171- if (calculateIsResponseExpected ()) {
178+ private int getOpMsgFlagBits () {
179+ return getOpMsgResponseExpectedFlagBit ();
180+ }
181+
182+ private int getOpMsgResponseExpectedFlagBit () {
183+ if (requireOpMsgResponse ()) {
172184 return 0 ;
173185 } else {
174186 return 1 << 1 ;
175187 }
176188 }
177189
178- private boolean calculateIsResponseExpected () {
179- // If there is another message in the payload require that the response is acknowledged
180- if (!responseExpected && useOpMsg () && payload != null && payload .hasAnotherSplit ()) {
190+ private boolean requireOpMsgResponse () {
191+ if (responseExpected ) {
181192 return true ;
193+ } else {
194+ return payload != null && payload .hasAnotherSplit ();
182195 }
183- return responseExpected ;
196+ }
197+
198+ private int getOpQueryFlagBits () {
199+ return getOpQuerySlaveOkFlagBit ();
200+ }
201+
202+ private int getOpQuerySlaveOkFlagBit () {
203+ if (isSlaveOk ()) {
204+ return 1 << 2 ;
205+ } else {
206+ return 0 ;
207+ }
208+ }
209+
210+ private boolean isSlaveOk () {
211+ return readPreference .isSlaveOk () || isDirectConnectionToNonShardRouter ();
212+ }
213+
214+ private boolean isDirectConnectionToNonShardRouter () {
215+ return clusterConnectionMode == SINGLE && getSettings ().getServerType () != SHARD_ROUTER ;
184216 }
185217
186218 private boolean useOpMsg () {
@@ -206,10 +238,13 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
206238 }
207239 if (!isDefaultReadPreference (getReadPreference ())) {
208240 extraElements .add (new BsonElement ("$readPreference" , getReadPreference ().toDocument ()));
241+ } else if (isDirectConnectionToNonShardRouter ()) {
242+ extraElements .add (new BsonElement ("$readPreference" , primaryPreferred ().toDocument ()));
209243 }
210244 return extraElements ;
211245 }
212246
247+ @ SuppressWarnings ("BooleanMethodIsAlwaysInverted" )
213248 private boolean isDefaultReadPreference (final ReadPreference readPreference ) {
214249 return readPreference .equals (primary ());
215250 }
0 commit comments