diff --git a/src/sdam/server_selection.ts b/src/sdam/server_selection.ts index 03b6e95938..ed8ad73dbe 100644 --- a/src/sdam/server_selection.ts +++ b/src/sdam/server_selection.ts @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec * server potentially for a write on a secondary. */ export function secondaryWritableServerSelector( - wireVersion?: number, + wireVersion: number, readPreference?: ReadPreference ): ServerSelector { // If server version < 5.0, read preference always primary. diff --git a/src/sdam/topology.ts b/src/sdam/topology.ts index eba356b020..f052cbd6c3 100644 --- a/src/sdam/topology.ts +++ b/src/sdam/topology.ts @@ -771,7 +771,7 @@ export class Topology extends TypedEventEmitter { return result; } - get commonWireVersion(): number | undefined { + get commonWireVersion(): number { return this.description.commonWireVersion; } diff --git a/src/sdam/topology_description.ts b/src/sdam/topology_description.ts index 55eb48bb35..a1c301dbc2 100644 --- a/src/sdam/topology_description.ts +++ b/src/sdam/topology_description.ts @@ -192,7 +192,7 @@ export class TopologyDescription { // update common wire version if (serverDescription.maxWireVersion !== 0) { - if (commonWireVersion == null) { + if (commonWireVersion === 0) { commonWireVersion = serverDescription.maxWireVersion; } else { commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion); diff --git a/test/integration/crud/aggregation.test.ts b/test/integration/crud/aggregation.test.ts index 92602f7b87..c8310e8bdd 100644 --- a/test/integration/crud/aggregation.test.ts +++ b/test/integration/crud/aggregation.test.ts @@ -870,4 +870,73 @@ describe('Aggregation', function () { .finally(() => client.close()); } }); + + it( + 'should perform aggregations with a write stage on secondary when readPreference is secondary', + { + metadata: { requires: { topology: 'replicaset' } }, + async test() { + const databaseName = this.configuration.db; + const client = this.configuration.newClient(this.configuration.writeConcernMax(), { + maxPoolSize: 1, + monitorCommands: true + }); + + const events = []; + client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events)); + + // Discover primary to be able to check the actual server address + await client.db('admin').command({ hello: 1 }); + const [helloEvent] = events; + const primaryAddress = helloEvent.address; + + // Clear events + events.length = 0; + + const src = client.db(databaseName).collection('read_pref_src'); + const outMerge = client.db(databaseName).collection('read_pref_merge_out'); + const outOut = client.db(databaseName).collection('read_pref_out_out'); + + await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]); + await src.insertMany([{ a: 1 }, { a: 2 }]); + await Promise.all([ + src + .aggregate( + [ + { + $merge: { + into: 'read_pref_merge_out', + whenMatched: 'replace', + whenNotMatched: 'insert' + } + } + ], + { readPreference: 'secondary' } + ) + .toArray(), + src + .aggregate( + [ + { + $out: 'read_pref_out_out' + } + ], + { readPreference: 'secondary' } + ) + .toArray() + ]); + + expect(events).to.have.length(2); + events.forEach(event => { + expect(event).to.have.property('commandName', 'aggregate'); + expect(event.address).to.not.equal(primaryAddress); + expect(event).to.have.deep.nested.property('command.$readPreference', { + mode: 'secondary' + }); + }); + + await client.close(); + } + } + ); }); diff --git a/test/unit/sdam/server_selection.test.ts b/test/unit/sdam/server_selection.test.ts index 4c0a45985d..93fc7fbf27 100644 --- a/test/unit/sdam/server_selection.test.ts +++ b/test/unit/sdam/server_selection.test.ts @@ -259,23 +259,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.ReplicaSetWithPrimary, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(undefined, ReadPreference.secondary); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a primary', function () { - expect(servers).to.deep.equal([primary]); - }); - }); }); context('when the topology is sharded', function () { @@ -345,23 +328,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.Sharded, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a mongos', function () { - expect(servers).to.deep.equal([mongos]); - }); - }); }); context('when the topology is load balanced', function () { @@ -431,23 +397,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.LoadBalanced, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a load balancer', function () { - expect(servers).to.deep.equal([loadBalancer]); - }); - }); }); context('when the topology is single', function () { @@ -517,23 +466,6 @@ describe('server selection', function () { }); }); }); - - context('when a common wire version is not provided', function () { - const topologyDescription = new TopologyDescription( - TopologyType.Single, - serverDescriptions, - 'test', - MIN_SECONDARY_WRITE_WIRE_VERSION, - new ObjectId(), - MIN_SECONDARY_WRITE_WIRE_VERSION - ); - const selector = secondaryWritableServerSelector(); - const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); - - it('selects a standalone', function () { - expect(servers).to.deep.equal([single]); - }); - }); }); context('localThresholdMS is respected as an option', function () { @@ -580,7 +512,7 @@ describe('server selection', function () { new ObjectId(), MIN_SECONDARY_WRITE_WIRE_VERSION ); - const selector = secondaryWritableServerSelector(); + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); expect(servers).to.have.lengthOf(2); const selectedAddresses = new Set(servers.map(({ address }) => address)); @@ -599,7 +531,7 @@ describe('server selection', function () { MIN_SECONDARY_WRITE_WIRE_VERSION, { localThresholdMS: 5 } ); - const selector = secondaryWritableServerSelector(); + const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION); const servers = selector(topologyDescription, Array.from(serverDescriptions.values())); expect(servers).to.have.lengthOf(1); const selectedAddresses = new Set(servers.map(({ address }) => address)); diff --git a/test/unit/sdam/topology_description.test.ts b/test/unit/sdam/topology_description.test.ts new file mode 100644 index 0000000000..01c572cea9 --- /dev/null +++ b/test/unit/sdam/topology_description.test.ts @@ -0,0 +1,63 @@ +import { expect } from 'chai'; + +import { TopologyType } from '../../../src/sdam/common'; +import { ServerDescription } from '../../../src/sdam/server_description'; +import { TopologyDescription } from '../../../src/sdam/topology_description'; + +describe('TopologyDescription', function () { + describe('#constructor', function () { + it('sets commonWireVersion to 0', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + expect(initial.commonWireVersion).to.equal(0); + }); + }); + + describe('update()', function () { + it('initializes commonWireVersion from first non-zero maxWireVersion', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + const sd1 = new ServerDescription('a:27017', { + maxWireVersion: 25 + }); + + const updated = initial.update(sd1); + + expect(updated.commonWireVersion).to.equal(25); + }); + + it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + const sd1 = new ServerDescription('a:27017', { + maxWireVersion: 25 + }); + + const sd2 = new ServerDescription('b:27017', { + maxWireVersion: 21 + }); + + let updated = initial.update(sd1); + updated = updated.update(sd2); + + expect(updated.commonWireVersion).to.equal(21); + }); + + it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () { + const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary); + + const sd1 = new ServerDescription('a:27017', { + maxWireVersion: 25 + }); + + const sdUnknown = new ServerDescription('b:27017', { + maxWireVersion: 0 + }); + + let updated = initial.update(sd1); + updated = updated.update(sdUnknown); + + expect(updated.commonWireVersion).to.equal(25); + }); + }); +});