Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/sdam/server_selection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export function sameServerSelector(description?: ServerDescription): ServerSelec
* server potentially for a write on a secondary.
*/
export function secondaryWritableServerSelector(
wireVersion?: number,
wireVersion: number,
readPreference?: ReadPreference
): ServerSelector {
// If server version < 5.0, read preference always primary.
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
return result;
}

get commonWireVersion(): number | undefined {
get commonWireVersion(): number {
return this.description.commonWireVersion;
}

Expand Down
2 changes: 1 addition & 1 deletion src/sdam/topology_description.ts
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ export class TopologyDescription {

// update common wire version
if (serverDescription.maxWireVersion !== 0) {
if (commonWireVersion == null) {
if (commonWireVersion === 0) {
commonWireVersion = serverDescription.maxWireVersion;
} else {
commonWireVersion = Math.min(commonWireVersion, serverDescription.maxWireVersion);
Expand Down
69 changes: 69 additions & 0 deletions test/integration/crud/aggregation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -870,4 +870,73 @@ describe('Aggregation', function () {
.finally(() => client.close());
}
});

it(
'should perform aggregations with a write stage on secondary when readPreference is secondary',
{
metadata: { requires: { topology: 'replicaset' } },
async test() {
const databaseName = this.configuration.db;
const client = this.configuration.newClient(this.configuration.writeConcernMax(), {
maxPoolSize: 1,
monitorCommands: true
});

const events = [];
client.on('commandStarted', filterForCommands(['hello', 'aggregate'], events));

// Discover primary to be able to check the actual server address
await client.db('admin').command({ hello: 1 });
const [helloEvent] = events;
const primaryAddress = helloEvent.address;

// Clear events
events.length = 0;

const src = client.db(databaseName).collection('read_pref_src');
const outMerge = client.db(databaseName).collection('read_pref_merge_out');
const outOut = client.db(databaseName).collection('read_pref_out_out');

await Promise.all([src.deleteMany({}), outMerge.deleteMany({}), outOut.deleteMany({})]);
await src.insertMany([{ a: 1 }, { a: 2 }]);
await Promise.all([
src
.aggregate(
[
{
$merge: {
into: 'read_pref_merge_out',
whenMatched: 'replace',
whenNotMatched: 'insert'
}
}
],
{ readPreference: 'secondary' }
)
.toArray(),
src
.aggregate(
[
{
$out: 'read_pref_out_out'
}
],
{ readPreference: 'secondary' }
)
.toArray()
]);

expect(events).to.have.length(2);
events.forEach(event => {
expect(event).to.have.property('commandName', 'aggregate');
expect(event.address).to.not.equal(primaryAddress);
expect(event).to.have.deep.nested.property('command.$readPreference', {
mode: 'secondary'
});
});

await client.close();
}
}
);
});
72 changes: 2 additions & 70 deletions test/unit/sdam/server_selection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,23 +259,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.ReplicaSetWithPrimary,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector(undefined, ReadPreference.secondary);
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a primary', function () {
expect(servers).to.deep.equal([primary]);
});
});
});

context('when the topology is sharded', function () {
Expand Down Expand Up @@ -345,23 +328,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.Sharded,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a mongos', function () {
expect(servers).to.deep.equal([mongos]);
});
});
});

context('when the topology is load balanced', function () {
Expand Down Expand Up @@ -431,23 +397,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.LoadBalanced,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a load balancer', function () {
expect(servers).to.deep.equal([loadBalancer]);
});
});
});

context('when the topology is single', function () {
Expand Down Expand Up @@ -517,23 +466,6 @@ describe('server selection', function () {
});
});
});

context('when a common wire version is not provided', function () {
const topologyDescription = new TopologyDescription(
TopologyType.Single,
serverDescriptions,
'test',
MIN_SECONDARY_WRITE_WIRE_VERSION,
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));

it('selects a standalone', function () {
expect(servers).to.deep.equal([single]);
});
});
});

context('localThresholdMS is respected as an option', function () {
Expand Down Expand Up @@ -580,7 +512,7 @@ describe('server selection', function () {
new ObjectId(),
MIN_SECONDARY_WRITE_WIRE_VERSION
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION);
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));
expect(servers).to.have.lengthOf(2);
const selectedAddresses = new Set(servers.map(({ address }) => address));
Expand All @@ -599,7 +531,7 @@ describe('server selection', function () {
MIN_SECONDARY_WRITE_WIRE_VERSION,
{ localThresholdMS: 5 }
);
const selector = secondaryWritableServerSelector();
const selector = secondaryWritableServerSelector(MIN_SECONDARY_WRITE_WIRE_VERSION);
const servers = selector(topologyDescription, Array.from(serverDescriptions.values()));
expect(servers).to.have.lengthOf(1);
const selectedAddresses = new Set(servers.map(({ address }) => address));
Expand Down
63 changes: 63 additions & 0 deletions test/unit/sdam/topology_description.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { expect } from 'chai';

import { TopologyType } from '../../../src/sdam/common';
import { ServerDescription } from '../../../src/sdam/server_description';
import { TopologyDescription } from '../../../src/sdam/topology_description';

describe('TopologyDescription', function () {
describe('#constructor', function () {
it('sets commonWireVersion to 0', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

expect(initial.commonWireVersion).to.equal(0);
});
});

describe('update()', function () {
it('initializes commonWireVersion from first non-zero maxWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const updated = initial.update(sd1);

expect(updated.commonWireVersion).to.equal(25);
});

it('tracks the minimum non-zero maxWireVersion across updates in commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sd2 = new ServerDescription('b:27017', {
maxWireVersion: 21
});

let updated = initial.update(sd1);
updated = updated.update(sd2);

expect(updated.commonWireVersion).to.equal(21);
});

it('ignores servers with maxWireVersion === 0 when computing commonWireVersion', function () {
const initial = new TopologyDescription(TopologyType.ReplicaSetWithPrimary);

const sd1 = new ServerDescription('a:27017', {
maxWireVersion: 25
});

const sdUnknown = new ServerDescription('b:27017', {
maxWireVersion: 0
});

let updated = initial.update(sd1);
updated = updated.update(sdUnknown);

expect(updated.commonWireVersion).to.equal(25);
});
});
});