Skip to content

Commit 5cbe016

Browse files
SERVER-42152 Delete existing chunks on new epoch in persisted routing table cache
1 parent b33f7eb commit 5cbe016

File tree

6 files changed

+139
-2
lines changed

6 files changed

+139
-2
lines changed

buildscripts/resmokeconfig/suites/sharding_continuous_config_stepdown.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ selector:
206206
# TODO (SERVER-42143): The tests below need transactional support for refineCollectionShardKey.
207207
- jstests/sharding/refine_collection_shard_key_basic.js
208208
- jstests/sharding/refine_collection_shard_key_jumbo.js
209+
- jstests/sharding/refine_collection_shard_key_drops_chunks.js
209210

210211
executor:
211212
config:

buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ selector:
142142
- jstests/sharding/explain_exec_stats_on_shards.js
143143
- jstests/sharding/refine_collection_shard_key_basic.js
144144
- jstests/sharding/refine_collection_shard_key_jumbo.js
145+
- jstests/sharding/refine_collection_shard_key_drops_chunks.js
145146
- jstests/sharding/move_primary_clone_test.js
146147
- jstests/sharding/database_versioning_safe_secondary_reads.js
147148
- jstests/sharding/clone_catalog_data.js
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
//
// Tests that refineCollectionShardKey deletes all existing chunks in the persisted routing table
// cache.
//

(function() {
    'use strict';
    load('jstests/sharding/libs/sharded_transactions_helpers.js');

    const st = new ShardingTest({shards: 1});
    const mongos = st.s0;
    const shard = st.shard0;
    const kDbName = 'db';
    const kCollName = 'foo';
    const kNsName = kDbName + '.' + kCollName;
    const kConfigCacheChunks = 'config.cache.chunks.' + kNsName;
    const oldKeyDoc = {
        a: 1,
        b: 1
    };
    const newKeyDoc = {
        a: 1,
        b: 1,
        c: 1,
        d: 1
    };

    // Helper to read the shard's persisted chunk cache for 'db.foo', ordered by min bound.
    const readCachedChunks = function() {
        return shard.getCollection(kConfigCacheChunks).find({}).sort({min: 1}).toArray();
    };

    assert.commandWorked(mongos.adminCommand({enableSharding: kDbName}));
    assert.commandWorked(mongos.adminCommand({shardCollection: kNsName, key: oldKeyDoc}));
    assert.commandWorked(mongos.getCollection(kNsName).createIndex(newKeyDoc));

    // Ensure that there exist three chunks belonging to 'db.foo' covering the entire key range.
    //
    // Chunk 1: {a: MinKey, b: MinKey} -->> {a: 0, b: 0}
    // Chunk 2: {a: 0, b: 0} -->> {a: 5, b: 5}
    // Chunk 3: {a: 5, b: 5} -->> {a: MaxKey, b: MaxKey}
    assert.commandWorked(mongos.adminCommand({split: kNsName, middle: {a: 0, b: 0}}));
    assert.commandWorked(mongos.adminCommand({split: kNsName, middle: {a: 5, b: 5}}));

    // Flush the routing table cache and verify that 'config.cache.chunks.db.foo' is as expected
    // before refineCollectionShardKey.
    assert.commandWorked(shard.adminCommand({_flushRoutingTableCacheUpdates: kNsName}));
    let chunkArr = readCachedChunks();
    assert.eq(3, chunkArr.length);
    assert.eq({a: MinKey, b: MinKey}, chunkArr[0]._id);
    assert.eq({a: 0, b: 0}, chunkArr[0].max);
    assert.eq({a: 0, b: 0}, chunkArr[1]._id);
    assert.eq({a: 5, b: 5}, chunkArr[1].max);
    assert.eq({a: 5, b: 5}, chunkArr[2]._id);
    assert.eq({a: MaxKey, b: MaxKey}, chunkArr[2].max);

    assert.commandWorked(mongos.adminCommand({refineCollectionShardKey: kNsName, key: newKeyDoc}));

    // Enable failpoint 'hangPersistCollectionAndChangedChunksAfterDropChunks' and flush the routing
    // table cache.
    assert.commandWorked(shard.adminCommand({
        configureFailPoint: 'hangPersistCollectionAndChangedChunksAfterDropChunks',
        mode: 'alwaysOn'
    }));
    const awaitShellToFlushRoutingTableCacheUpdates = startParallelShell(() => {
        assert.commandWorked(db.adminCommand({_flushRoutingTableCacheUpdates: 'db.foo'}));
    }, st.rs0.getPrimary().port);

    // Verify that all chunks belonging to 'db.foo' have been deleted.
    waitForFailpoint('Hit hangPersistCollectionAndChangedChunksAfterDropChunks', 1);
    chunkArr = readCachedChunks();
    assert.eq(0, chunkArr.length);

    // Disable failpoint 'hangPersistCollectionAndChangedChunksAfterDropChunks' and continue
    // flushing the routing table cache.
    assert.commandWorked(shard.adminCommand(
        {configureFailPoint: 'hangPersistCollectionAndChangedChunksAfterDropChunks', mode: 'off'}));
    awaitShellToFlushRoutingTableCacheUpdates();

    // Verify that 'config.cache.chunks.db.foo' is as expected after refineCollectionShardKey.
    // Each bound now carries the refined suffix fields (c, d) filled in with MinKey/MaxKey.
    chunkArr = readCachedChunks();
    assert.eq(3, chunkArr.length);
    assert.eq({a: MinKey, b: MinKey, c: MinKey, d: MinKey}, chunkArr[0]._id);
    assert.eq({a: 0, b: 0, c: MinKey, d: MinKey}, chunkArr[0].max);
    assert.eq({a: 0, b: 0, c: MinKey, d: MinKey}, chunkArr[1]._id);
    assert.eq({a: 5, b: 5, c: MinKey, d: MinKey}, chunkArr[1].max);
    assert.eq({a: 5, b: 5, c: MinKey, d: MinKey}, chunkArr[2]._id);
    assert.eq({a: MaxKey, b: MaxKey, c: MaxKey, d: MaxKey}, chunkArr[2].max);

    st.stop();
})();

src/mongo/db/s/shard_metadata_util.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,21 @@ Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const Namesp
432432
}
433433
}
434434

435+
void dropChunks(OperationContext* opCtx, const NamespaceString& nss) {
    // Drop the config.chunks collection associated with namespace 'nss'. A drop that
    // fails because the collection does not exist is treated as success: there may
    // simply be no chunk metadata persisted for this namespace yet.
    DBDirectClient client(opCtx);

    BSONObj dropResult;
    const bool dropped =
        client.dropCollection(ChunkType::ShardNSPrefix + nss.ns(), kLocalWriteConcern, &dropResult);
    if (!dropped) {
        const auto dropStatus = getStatusFromCommandResult(dropResult);
        if (dropStatus != ErrorCodes::NamespaceNotFound) {
            uassertStatusOK(dropStatus);
        }
    }

    LOG(1) << "Successfully cleared persisted chunk metadata for collection '" << nss << "'.";
}
449+
435450
Status deleteDatabasesEntry(OperationContext* opCtx, StringData dbName) {
436451
try {
437452
DBDirectClient client(opCtx);

src/mongo/db/s/shard_metadata_util.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,11 @@ Status updateShardChunks(OperationContext* opCtx,
210210
*/
211211
Status dropChunksAndDeleteCollectionsEntry(OperationContext* opCtx, const NamespaceString& nss);
212212

213+
/**
214+
* Drops locally persisted chunk metadata associated with 'nss': only drops the chunks collection.
215+
*/
216+
void dropChunks(OperationContext* opCtx, const NamespaceString& nss);
217+
213218
/**
214219
* Deletes locally persisted database metadata associated with 'dbName': removes the databases
215220
* collection entry.

src/mongo/db/s/shard_server_catalog_cache_loader.cpp

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "mongo/s/catalog/type_shard_database.h"
4747
#include "mongo/s/client/shard_registry.h"
4848
#include "mongo/s/grid.h"
49+
#include "mongo/util/fail_point_service.h"
4950
#include "mongo/util/log.h"
5051

5152
namespace mongo {
@@ -56,6 +57,8 @@ using CollectionAndChangedChunks = CatalogCacheLoader::CollectionAndChangedChunk
5657

5758
namespace {
5859

60+
MONGO_FAIL_POINT_DEFINE(hangPersistCollectionAndChangedChunksAfterDropChunks);
61+
5962
AtomicWord<unsigned long long> taskIdGenerator{0};
6063

6164
/**
@@ -75,6 +78,24 @@ ThreadPool::Options makeDefaultThreadPoolOptions() {
7578
return options;
7679
}
7780

81+
void dropChunksIfEpochChanged(OperationContext* opCtx,
                              const NamespaceString& nss,
                              const CollectionAndChangedChunks& collAndChunks,
                              const ChunkVersion& maxLoaderVersion) {
    // Nothing to clean up unless the collection has acquired a new epoch while chunk
    // metadata from a previous epoch is persisted (an UNSHARDED loader version means
    // nothing was persisted before).
    if (collAndChunks.epoch == maxLoaderVersion.epoch() ||
        maxLoaderVersion == ChunkVersion::UNSHARDED()) {
        return;
    }

    // The collection has a new epoch, so every existing chunk in the persisted routing
    // table cache is stale — delete them all before the new chunks are written.
    dropChunks(opCtx, nss);

    if (MONGO_FAIL_POINT(hangPersistCollectionAndChangedChunksAfterDropChunks)) {
        log() << "Hit hangPersistCollectionAndChangedChunksAfterDropChunks failpoint";
        MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(
            opCtx, hangPersistCollectionAndChangedChunksAfterDropChunks);
    }
}
98+
7899
/**
79100
* Takes a CollectionAndChangedChunks object and persists the changes to the shard's metadata
80101
* collections.
@@ -83,7 +104,8 @@ ThreadPool::Options makeDefaultThreadPoolOptions() {
83104
*/
84105
Status persistCollectionAndChangedChunks(OperationContext* opCtx,
85106
const NamespaceString& nss,
86-
const CollectionAndChangedChunks& collAndChunks) {
107+
const CollectionAndChangedChunks& collAndChunks,
108+
const ChunkVersion& maxLoaderVersion) {
87109
// Update the collections collection entry for 'nss' in case there are any new updates.
88110
ShardCollectionType update = ShardCollectionType(
89111
nss, collAndChunks.epoch, collAndChunks.shardKeyPattern, collAndChunks.shardKeyIsUnique);
@@ -107,6 +129,12 @@ Status persistCollectionAndChangedChunks(OperationContext* opCtx,
107129
}
108130

109131
// Update the chunks.
132+
try {
133+
dropChunksIfEpochChanged(opCtx, nss, collAndChunks, maxLoaderVersion);
134+
} catch (const DBException& ex) {
135+
return ex.toStatus();
136+
}
137+
110138
status = updateShardChunks(opCtx, nss, collAndChunks.changedChunks, collAndChunks.epoch);
111139
if (!status.isOK()) {
112140
return status;
@@ -1036,7 +1064,8 @@ void ShardServerCatalogCacheLoader::_updatePersistedCollAndChunksMetadata(
10361064
}
10371065

10381066
uassertStatusOKWithContext(
1039-
persistCollectionAndChangedChunks(opCtx, nss, *task.collectionAndChangedChunks),
1067+
persistCollectionAndChangedChunks(
1068+
opCtx, nss, *task.collectionAndChangedChunks, task.minQueryVersion),
10401069
str::stream() << "Failed to update the persisted chunk metadata for collection '"
10411070
<< nss.ns() << "' from '" << task.minQueryVersion.toString() << "' to '"
10421071
<< task.maxQueryVersion.toString() << "'. Will be retried.");

0 commit comments

Comments
 (0)