author    Cheahuychou Mao <cheahuychou.mao@mongodb.com>  2019-12-06 16:30:41 +0000
committer evergreen <evergreen@mongodb.com>              2019-12-06 16:30:41 +0000
commit    eba76c558b3e7d784c146b51ced16d48b1d0efe7 (patch)
tree      eb43d876af50dfd29a6596878f15ed9ab500a30b
parent    13944bb3fedc8d91c02c56bb66bb5c76a0a558d0 (diff)
download  mongo-eba76c558b3e7d784c146b51ced16d48b1d0efe7.tar.gz
SERVER-44719 Make createIndexes, dropIndexes, and collMod check shard versions
-rw-r--r--  jstests/libs/chunk_manipulation_util.js                              |   2
-rw-r--r--  jstests/sharding/index_and_collection_option_propagation.js          |  13
-rw-r--r--  jstests/sharding/index_commands_shard_targeting.js                   | 241
-rw-r--r--  jstests/sharding/track_unsharded_collections_check_shard_version.js |   1
-rw-r--r--  src/mongo/db/catalog/coll_mod.cpp                                    |   2
-rw-r--r--  src/mongo/db/catalog/drop_indexes.cpp                                |   2
-rw-r--r--  src/mongo/db/commands/create_indexes.cpp                             |  12
-rw-r--r--  src/mongo/db/index_builds_coordinator.cpp                            |  19
-rw-r--r--  src/mongo/db/index_builds_coordinator_mongod.cpp                     |  16
-rw-r--r--  src/mongo/db/s/collection_metadata_filtering_test.cpp                |   2
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp                         |  20
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h                           |   5
-rw-r--r--  src/mongo/db/s/collection_sharding_state_test.cpp                    |   2
-rw-r--r--  src/mongo/db/s/operation_sharding_state.cpp                          |  31
-rw-r--r--  src/mongo/db/s/operation_sharding_state.h                            |   9
-rw-r--r--  src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp     |   7
-rw-r--r--  src/mongo/db/service_entry_point_common.cpp                          |   2
-rw-r--r--  src/mongo/db/service_entry_point_mongod.cpp                          |   8
-rw-r--r--  src/mongo/s/stale_exception.h                                        |  17
19 files changed, 307 insertions, 104 deletions
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index 389c3120169..90ea743ecf3 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -240,8 +240,6 @@ function waitForMigrateStep(shardConnection, stepNumber) {
//
function runCommandDuringTransferMods(
mongos, staticMongod, ns, bounds, fromShard, toShard, cmdFunc) {
- let configDB = mongos.getDB('config');
-
// Turn on the fail point and wait for moveChunk to hit the fail point.
pauseMoveChunkAtStep(fromShard, moveChunkStepNames.startedMoveChunk);
let joinMoveChunk =
diff --git a/jstests/sharding/index_and_collection_option_propagation.js b/jstests/sharding/index_and_collection_option_propagation.js
index 15f61961b0a..3a814ca76e9 100644
--- a/jstests/sharding/index_and_collection_option_propagation.js
+++ b/jstests/sharding/index_and_collection_option_propagation.js
@@ -134,18 +134,7 @@ checkShardIndexes("idx2", [st.shard0, st.shard1], [st.shard2]);
// dropIndex
res = st.s.getDB(dbName).getCollection(collName).dropIndex("idx1_1");
assert.commandWorked(res);
-// TODO SERVER-44719: Once createIndex is made to check shard versions, after the createIndex
-// above, each shard should have refreshed its cache. So the mongos will not need to retry the
-// dropIndex here and will not get IndexNotFound from shard0 (which is ignored and causes the
-// response from shard0 to be empty).
-if (jsTestOptions().mongosBinVersion == "last-stable") {
- assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
-} else {
- // dropIndexes checks shard versions so causes the first try to succeed on shard0 but not
- // on shard1. When it retries after the refresh, it fails with IndexNotFound on shard0
- // so the response from shard0 is empty.
- assert.eq(undefined, res.raw[st.shard0.host], tojson(res));
-}
+assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
assert.eq(res.raw[st.shard1.host].ok, 1, tojson(res));
assert.eq(undefined, res.raw[st.shard2.host], tojson(res));
checkShardIndexes("idx1", [], [st.shard0, st.shard1, st.shard2]);
diff --git a/jstests/sharding/index_commands_shard_targeting.js b/jstests/sharding/index_commands_shard_targeting.js
index 0eaf9d1914f..f0cf3d0386d 100644
--- a/jstests/sharding/index_commands_shard_targeting.js
+++ b/jstests/sharding/index_commands_shard_targeting.js
@@ -1,11 +1,16 @@
/*
- * Test that the index commands send shard versions, and only target the primary
- * shard and the shards that have chunks for the collection.
+ * Test that the index commands send and check shard versions, and only target the primary
+ * shard and the shards that have chunks for the collection. Also test that the commands fail
+ * if they are run when the critical section is in progress, and block until the critical
+ * section is over.
* @tags: [requires_fcv_44]
*/
(function() {
"use strict";
+load('jstests/libs/chunk_manipulation_util.js');
+load("jstests/libs/fail_point_util.js");
+
/*
* Returns the metadata for the collection in the shard's catalog cache.
*/
@@ -16,6 +21,25 @@ function getMetadataOnShard(shard, ns) {
}
/*
+ * Asserts that the collection version for the collection in the shard's catalog cache
+ * is equal to the given collection version.
+ */
+function assertCollectionVersionEquals(shard, ns, collectionVersion) {
+ assert.eq(getMetadataOnShard(shard, ns).collVersion, collectionVersion);
+}
+
+/*
+ * Asserts that the collection version for the collection in the shard's catalog cache
+ * is older than the given collection version.
+ */
+function assertCollectionVersionOlderThan(shard, ns, collectionVersion) {
+ let shardCollectionVersion = getMetadataOnShard(shard, ns).collVersion;
+ if (shardCollectionVersion != undefined) {
+ assert.lt(shardCollectionVersion.t, collectionVersion.t);
+ }
+}
+
+/*
* Asserts that the shard version of the shard in its catalog cache is equal to the
* given shard version.
*/
@@ -24,6 +48,17 @@ function assertShardVersionEquals(shard, ns, shardVersion) {
}
/*
+ * Moves the chunk that matches the given query to toShard. Forces fromShard to skip the
+ * recipient metadata refresh post-migration commit.
+ */
+function moveChunkNotRefreshRecipient(mongos, ns, fromShard, toShard, findQuery) {
+ let failPoint = configureFailPoint(fromShard, "doNotRefreshRecipientAfterCommit");
+ assert.commandWorked(mongos.adminCommand(
+ {moveChunk: ns, find: findQuery, to: toShard.shardName, _waitForDelete: true}));
+ failPoint.off();
+}
+
+/*
* Asserts that the shard has an index for the collection with the given index key.
*/
function assertIndexExistsOnShard(shard, dbName, collName, targetIndexKey) {
@@ -53,21 +88,83 @@ function assertIndexDoesNotExistOnShard(shard, dbName, collName, targetIndexKey)
}
/*
- * Performs chunk operations to make the primary shard (shard0) not own any chunks for collection,
- * and only a subset of non-primary shards (shard1 and shard2) own chunks for collection.
+ * Runs the command after performing chunk operations so that the primary shard (shard0) does not
+ * own any chunks for the collection, and the non-primary shards that do own chunks (shard1 and
+ * shard2) have a stale catalog cache.
+ *
+ * Asserts that the command checks shard versions by verifying that the shards refresh their
+ * cache after the command is run.
*/
-function setUpShards(st, ns) {
+function assertCommandChecksShardVersions(st, dbName, collName, testCase) {
+ const ns = dbName + "." + collName;
+
// Move the initial chunk out of the primary shard.
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: ns, find: {_id: MinKey}, to: st.shard1.shardName, _waitForDelete: true}));
+ moveChunkNotRefreshRecipient(st.s, ns, st.shard0, st.shard1, {_id: MinKey});
// Split the chunk to create two chunks on shard1. Move one of the chunks to shard2.
assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: ns, find: {_id: 0}, to: st.shard2.shardName, _waitForDelete: true}));
+ moveChunkNotRefreshRecipient(st.s, ns, st.shard1, st.shard2, {_id: 0});
// Assert that primary shard does not have any chunks for the collection.
assertShardVersionEquals(st.shard0, ns, Timestamp(0, 0));
+
+ const mongosCollectionVersion = st.s.adminCommand({getShardVersion: ns}).version;
+
+ // Assert that besides the latest donor shard (shard1), all shards have stale collection
+ // version.
+ assertCollectionVersionOlderThan(st.shard0, ns, mongosCollectionVersion);
+ assertCollectionVersionEquals(st.shard1, ns, mongosCollectionVersion);
+ assertCollectionVersionOlderThan(st.shard2, ns, mongosCollectionVersion);
+ assertCollectionVersionOlderThan(st.shard3, ns, mongosCollectionVersion);
+
+ if (testCase.setUp) {
+ testCase.setUp();
+ }
+ assert.commandWorked(st.s.getDB(dbName).runCommand(testCase.command));
+
+ // Assert that primary shard still has stale collection version after the command is run
+ // because both the shard version in the command and in the shard's cache are UNSHARDED
+ // (no chunks).
+ assertCollectionVersionOlderThan(st.shard0, ns, mongosCollectionVersion);
+
+ // Assert that the other shards have the latest collection version after the command is run.
+ assertCollectionVersionEquals(st.shard1, ns, mongosCollectionVersion);
+ assertCollectionVersionEquals(st.shard2, ns, mongosCollectionVersion);
+ assertCollectionVersionEquals(st.shard3, ns, mongosCollectionVersion);
+}
+
+/*
+ * Runs the command during a chunk migration after the donor enters the read-only phase of the
+ * critical section. Asserts that the command is blocked behind the critical section.
+ *
+ * Assumes that shard0 is the primary shard.
+ */
+function assertCommandBlocksIfCriticalSectionInProgress(
+ st, staticMongod, dbName, collName, testCase) {
+ const ns = dbName + "." + collName;
+ const fromShard = st.shard0;
+ const toShard = st.shard1;
+
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+
+ // Turn on the fail point and wait for moveChunk to hit the fail point.
+ pauseMoveChunkAtStep(fromShard, moveChunkStepNames.chunkDataCommitted);
+ let joinMoveChunk =
+ moveChunkParallel(staticMongod, st.s.host, {_id: 0}, null, ns, toShard.shardName);
+ waitForMoveChunkStep(fromShard, moveChunkStepNames.chunkDataCommitted);
+
+ if (testCase.setUp) {
+ testCase.setUp();
+ }
+
+ // Run the command and assert that it eventually times out.
+ const cmdWithMaxTimeMS = Object.assign({}, testCase.command, {maxTimeMS: 500});
+ assert.commandFailedWithCode(st.s.getDB(dbName).runCommand(cmdWithMaxTimeMS),
+ ErrorCodes.MaxTimeMSExpired);
+
+ // Turn off the fail point and wait for moveChunk to complete.
+ unpauseMoveChunkAtStep(fromShard, moveChunkStepNames.chunkDataCommitted);
+ joinMoveChunk();
}
const numShards = 4;
@@ -87,84 +184,94 @@ const index = {
name: "x_1"
};
-const expectedTargetedShards = new Set([st.shard0, st.shard1, st.shard2]);
-assert.lt(expectedTargetedShards.size, numShards);
+const testCases = {
+ createIndexes: collName => {
+ return {
+ command: {createIndexes: collName, indexes: [index]},
+ assertCommandRanOnShard: (shard) => {
+ assertIndexExistsOnShard(shard, dbName, collName, index.key);
+ },
+ assertCommandDidNotRunOnShard: (shard) => {
+ assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
+ }
+ };
+ },
+ dropIndexes: collName => {
+ return {
+ command: {dropIndexes: collName, index: index.name},
+ setUp: () => {
+ // Create the index directly on all the shards.
+ allShards.forEach(function(shard) {
+ assert.commandWorked(shard.getDB(dbName).runCommand(
+ {createIndexes: collName, indexes: [index]}));
+ });
+ },
+ assertCommandRanOnShard: (shard) => {
+ assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
+ },
+ assertCommandDidNotRunOnShard: (shard) => {
+ assertIndexExistsOnShard(shard, dbName, collName, index.key);
+ }
+ };
+ },
+ collMod: collName => {
+ return {
+ command: {collMod: collName, validator: {x: {$type: "string"}}},
+ assertCommandRanOnShard: (shard) => {
+ assert.commandFailedWithCode(
+ shard.getCollection(dbName + "." + collName).insert({x: 1}),
+ ErrorCodes.DocumentValidationFailure);
+ },
+ assertCommandDidNotRunOnShard: (shard) => {
+ assert.commandWorked(shard.getCollection(dbName + "." + collName).insert({x: 1}));
+ }
+ };
+ },
+};
assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);
-jsTest.log("Test createIndexes command...");
-
-(() => {
- let testColl = testDB.testCreateIndexes;
- let collName = testColl.getName();
- let ns = testColl.getFullName();
-
- assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
- setUpShards(st, ns);
- assert.commandWorked(testDB.runCommand({createIndexes: collName, indexes: [index]}));
-
- // Assert that the index exists on the targeted shards but not on the untargeted shards.
- allShards.forEach(function(shard) {
- if (expectedTargetedShards.has(shard)) {
- assertIndexExistsOnShard(shard, dbName, collName, index.key);
- } else {
- assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
- }
- });
-})();
-
-jsTest.log("Test dropIndexes command...");
+// Test that the index commands send and check shard versions, and only target the primary
+// shard and the shards that own chunks for the collection.
+const expectedTargetedShards = new Set([st.shard0, st.shard1, st.shard2]);
+assert.lt(expectedTargetedShards.size, numShards);
-(() => {
- let testColl = testDB.testDropIndexes;
- let collName = testColl.getName();
- let ns = testColl.getFullName();
+for (const command of Object.keys(testCases)) {
+ jsTest.log(`Testing that ${command} sends and checks shard version...`);
+ let collName = command;
+ let ns = dbName + "." + collName;
+ let testCase = testCases[command](collName);
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
- setUpShards(st, ns);
-
- // Create the index directly on all the shards.
- allShards.forEach(function(shard) {
- assert.commandWorked(
- shard.getDB(dbName).runCommand({createIndexes: collName, indexes: [index]}));
- });
-
- // Drop the index.
- assert.commandWorked(testDB.runCommand({dropIndexes: collName, index: index.name}));
+ assertCommandChecksShardVersions(st, dbName, collName, testCase);
- // Assert that the index no longer exists on the targeted shards but still exists on the
- // untargeted shards.
allShards.forEach(function(shard) {
if (expectedTargetedShards.has(shard)) {
- assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
+ testCase.assertCommandRanOnShard(shard);
} else {
- assertIndexExistsOnShard(shard, dbName, collName, index.key);
+ testCase.assertCommandDidNotRunOnShard(shard);
}
});
-})();
+}
-jsTest.log("Test collMod command...");
+// Test that the index commands are blocked behind the critical section.
+const staticMongod = MongoRunner.runMongod({});
-(() => {
- let testColl = testDB.testCollMod;
- let collName = testColl.getName();
- let ns = testColl.getFullName();
+for (const command of Object.keys(testCases)) {
+ jsTest.log(`Testing that ${command} is blocked behind the critical section...`);
+ let collName = command + "CriticalSection";
+ let ns = dbName + "." + collName;
+ let testCase = testCases[command](collName);
assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
- setUpShards(st, ns);
- assert.commandWorked(testDB.runCommand({collMod: collName, validator: {x: {$type: "string"}}}));
+ assertCommandBlocksIfCriticalSectionInProgress(st, staticMongod, dbName, collName, testCase);
- // Assert that the targeted shards do document validation, and the untargeted shards do not.
allShards.forEach(function(shard) {
- if (expectedTargetedShards.has(shard)) {
- assert.commandFailedWithCode(shard.getCollection(ns).insert({x: 1}),
- ErrorCodes.DocumentValidationFailure);
- } else {
- assert.commandWorked(shard.getCollection(ns).insert({x: 1}));
- }
+ testCase.assertCommandDidNotRunOnShard(shard);
});
-})();
+}
st.stop();
+MongoRunner.stopMongod(staticMongod);
})();
diff --git a/jstests/sharding/track_unsharded_collections_check_shard_version.js b/jstests/sharding/track_unsharded_collections_check_shard_version.js
index 7b530acd2f9..9b37756978c 100644
--- a/jstests/sharding/track_unsharded_collections_check_shard_version.js
+++ b/jstests/sharding/track_unsharded_collections_check_shard_version.js
@@ -106,7 +106,6 @@ let testCases = {
createIndexes: {
implicitlyCreatesCollection: true,
whenNamespaceIsViewFailsWith: ErrorCodes.CommandNotSupportedOnView,
- doesNotCheckShardVersion: true,
command: collName => {
return {createIndexes: collName, indexes: [{key: {a: 1}, name: "index"}]};
},
diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp
index c9018e46350..49a666c9c67 100644
--- a/src/mongo/db/catalog/coll_mod.cpp
+++ b/src/mongo/db/catalog/coll_mod.cpp
@@ -294,7 +294,7 @@ Status _collModInternal(OperationContext* opCtx,
return Status(ErrorCodes::NamespaceNotFound, "ns does not exist");
}
- // This is necessary to set up CurOp and update the Top stats.
+ // This is necessary to set up CurOp, update the Top stats, and check shard version.
OldClientContext ctx(opCtx, nss.ns());
bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() &&
diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp
index d09a7a2044d..6a14fa7505f 100644
--- a/src/mongo/db/catalog/drop_indexes.cpp
+++ b/src/mongo/db/catalog/drop_indexes.cpp
@@ -224,6 +224,8 @@ Status dropIndexes(OperationContext* opCtx,
collection->uuid());
WriteUnitOfWork wunit(opCtx);
+
+ // This is necessary to check shard version.
OldClientContext ctx(opCtx, nss.ns());
// Use an empty BSONObjBuilder to avoid duplicate appends to result on retry loops.
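Both the collMod and dropIndexes changes above lean on the construction of OldClientContext to perform the shard version check as a side effect of setting up the operation. The patch does not show OldClientContext's internals, so the following is only a rough, self-contained sketch of the check-on-construction idea; ClientContextSketch, VersionCheckSketch, and the integer versions are illustrative stand-ins, not MongoDB types.

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Illustrative stand-in for the operation's received shard version and the
    // shard's cached version; in the real server these come from
    // OperationShardingState and CollectionShardingState.
    struct VersionCheckSketch {
        int receivedVersion;
        int wantedVersion;

        void checkShardVersionOrThrow(const std::string& ns) const {
            if (receivedVersion != wantedVersion) {
                throw std::runtime_error("StaleConfig: shard version mismatch for " + ns);
            }
        }
    };

    // Minimal RAII sketch of the "check happens when the client context is set up"
    // pattern the comments above rely on: constructing the context performs the
    // shard version check, so collMod/dropIndexes only need to create the context.
    class ClientContextSketch {
    public:
        ClientContextSketch(const VersionCheckSketch& check, const std::string& ns) {
            check.checkShardVersionOrThrow(ns);
        }
    };

    int main() {
        VersionCheckSketch check{/*receivedVersion=*/2, /*wantedVersion=*/3};
        try {
            ClientContextSketch ctx(check, "test.coll");
        } catch (const std::exception& ex) {
            // The command observes the same error it would get from an explicit check.
            std::cout << ex.what() << "\n";
        }
    }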
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index 4e6167326b4..d0da95f2847 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -341,6 +341,13 @@ void checkDatabaseShardingState(OperationContext* opCtx, StringData dbName) {
}
/**
+ * Checks collection sharding state. Throws exception on error.
+ */
+void checkCollectionShardingState(OperationContext* opCtx, const NamespaceString& ns) {
+ CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx, true);
+}
+
+/**
* Opens or creates database for index creation.
* On database creation, the lock will be made exclusive.
*/
@@ -440,6 +447,7 @@ bool runCreateIndexesForMobile(OperationContext* opCtx,
opCtx->recoveryUnit()->abandonSnapshot();
boost::optional<Lock::CollectionLock> exclusiveCollectionLock(
boost::in_place_init, opCtx, ns, MODE_X);
+ checkCollectionShardingState(opCtx, ns);
// Index builds can safely ignore prepare conflicts and perform writes. On primaries, an
// exclusive lock in the final drain phase conflicts with prepared transactions.
@@ -701,6 +709,10 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx,
opCtx->recoveryUnit()->abandonSnapshot();
Lock::CollectionLock collLock(opCtx, ns, MODE_X);
+ // This check is for optimization purposes only, as this lock is released immediately after
+ // this and is acquired again when we build the index.
+ checkCollectionShardingState(opCtx, ns);
+
auto collection = getOrCreateCollection(opCtx, db, ns, cmdObj, &errmsg, &result);
collectionUUID = collection->uuid();
}
diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp
index ca37985492b..e406d9fa688 100644
--- a/src/mongo/db/index_builds_coordinator.cpp
+++ b/src/mongo/db/index_builds_coordinator.cpp
@@ -986,6 +986,13 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(
auto collection = autoColl.getCollection();
const auto& nss = collection->ns();
+ // This check is for optimization purposes only, since this lock is released after this,
+ // and is acquired again when we build the index in _setUpIndexBuild.
+ auto status = CollectionShardingState::get(opCtx, nss)->checkShardVersionNoThrow(opCtx, true);
+ if (!status.isOK()) {
+ return status;
+ }
+
// Lock from when we ascertain what indexes to build through to when the build is registered
// on the Coordinator and persistedly set up in the catalog. This serializes setting up an
// index build so that no attempts are made to register the same build twice.
@@ -1012,7 +1019,7 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(
buildUUID, collectionUUID, dbName.toString(), filteredSpecs, protocol, commitQuorum);
replIndexBuildState->stats.numIndexesBefore = _getNumIndexesTotal(opCtx, collection);
- Status status = _registerIndexBuild(lk, replIndexBuildState);
+ status = _registerIndexBuild(lk, replIndexBuildState);
if (!status.isOK()) {
return status;
}
@@ -1035,6 +1042,14 @@ Status IndexBuildsCoordinator::_setUpIndexBuild(OperationContext* opCtx,
AutoGetCollection autoColl(opCtx, nssOrUuid, MODE_X);
auto collection = autoColl.getCollection();
const auto& nss = collection->ns();
+ auto status = CollectionShardingState::get(opCtx, nss)->checkShardVersionNoThrow(opCtx, true);
+ if (!status.isOK()) {
+ // We need to unregister the index build to allow retries to succeed.
+ stdx::unique_lock<Latch> lk(_mutex);
+ _unregisterIndexBuild(lk, replIndexBuildState);
+
+ return status;
+ }
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
const bool replSetAndNotPrimary =
@@ -1082,7 +1097,7 @@ Status IndexBuildsCoordinator::_setUpIndexBuild(OperationContext* opCtx,
: IndexBuildsManager::IndexConstraints::kEnforce;
options.protocol = replIndexBuildState->protocol;
- auto status = [&] {
+ status = [&] {
if (!replSetAndNotPrimary) {
// On standalones and primaries, call setUpIndexBuild(), which makes the initial catalog
// write. On primaries, this replicates the startIndexBuild oplog entry.
diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp
index 41a5a2ea25a..36968811cc2 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.cpp
+++ b/src/mongo/db/index_builds_coordinator_mongod.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/index_build_entry_helpers.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
@@ -139,6 +140,13 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
const auto deadline = opCtx->getDeadline();
const auto timeoutError = opCtx->getTimeoutError();
+ const NamespaceStringOrUUID nssOrUuid{dbName, collectionUUID};
+ const auto nss = CollectionCatalog::get(opCtx).resolveNamespaceStringOrUUID(opCtx, nssOrUuid);
+
+ const auto& oss = OperationShardingState::get(opCtx);
+ const auto shardVersion = oss.getShardVersion(nss);
+ const auto dbVersion = oss.getDbVersion(dbName);
+
// Task in thread pool should have similar CurOp representation to the caller so that it can be
// identified as a createIndexes operation.
LogicalOp logicalOp = LogicalOp::opInvalid;
@@ -165,6 +173,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
buildUUID,
collectionUUID,
dbName,
+ nss,
deadline,
indexBuildOptions,
logicalOp,
@@ -172,7 +181,9 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
replState,
startPromise = std::move(startPromise),
startTimestamp,
- timeoutError
+ timeoutError,
+ shardVersion,
+ dbVersion
](auto status) mutable noexcept {
// Clean up if we failed to schedule the task.
if (!status.isOK()) {
@@ -185,6 +196,9 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
auto opCtx = Client::getCurrent()->makeOperationContext();
opCtx->setDeadlineByDate(deadline, timeoutError);
+ auto& oss = OperationShardingState::get(opCtx.get());
+ oss.initializeClientRoutingVersions(nss, shardVersion, dbVersion);
+
{
stdx::unique_lock<Client> lk(*opCtx->getClient());
auto curOp = CurOp::get(opCtx.get());
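The startIndexBuild change captures the requesting operation's shardVersion and dbVersion from OperationShardingState before the build is handed to the thread pool, then re-initializes them on the OperationContext created inside the task, so the shard version checks added in index_builds_coordinator.cpp still see the client's routing information. Below is a minimal sketch of that capture-and-replay pattern using std::optional, std::map, and std::thread as stand-ins for the server's types and thread pool; OpRoutingStateSketch and the integer versions are illustrative only.

    #include <iostream>
    #include <map>
    #include <optional>
    #include <string>
    #include <thread>

    // Illustrative per-operation routing state, analogous in spirit to
    // OperationShardingState: at most one shard version per namespace.
    struct OpRoutingStateSketch {
        std::map<std::string, int> shardVersions;
        std::optional<int> dbVersion;
    };

    int main() {
        // State attached to the requesting operation.
        OpRoutingStateSketch requestState;
        requestState.shardVersions["test.coll"] = 5;
        requestState.dbVersion = 1;

        // Capture the versions by value before scheduling the background work,
        // because the original operation (and its state) may be gone by the time
        // the task runs.
        std::optional<int> shardVersion;
        if (auto it = requestState.shardVersions.find("test.coll");
            it != requestState.shardVersions.end()) {
            shardVersion = it->second;
        }
        std::optional<int> dbVersion = requestState.dbVersion;

        std::thread worker([shardVersion, dbVersion] {
            // A fresh operation context starts with an empty routing state;
            // replaying the captured versions lets later checks compare against
            // what the client originally sent.
            OpRoutingStateSketch taskState;
            if (shardVersion) {
                taskState.shardVersions["test.coll"] = *shardVersion;
            }
            taskState.dbVersion = dbVersion;
            std::cout << "task shard version: " << taskState.shardVersions["test.coll"] << "\n";
        });
        worker.join();
    }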
diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp
index da000652d40..b0effe64772 100644
--- a/src/mongo/db/s/collection_metadata_filtering_test.cpp
+++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp
@@ -112,7 +112,7 @@ protected:
const auto version = cm->getVersion(ShardId("0"));
BSONObjBuilder builder;
version.appendToCommand(&builder);
- oss.initializeClientRoutingVersions(kNss, builder.obj());
+ oss.initializeClientRoutingVersionsFromCommand(kNss, builder.obj());
}
std::shared_ptr<MetadataManager> _manager;
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 5fc2a88d16a..5ea0a4756cc 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -203,6 +203,16 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx,
(void)_getMetadataWithVersionCheckAt(opCtx, boost::none, isCollection);
}
+Status CollectionShardingState::checkShardVersionNoThrow(OperationContext* opCtx,
+ bool isCollection) noexcept {
+ try {
+ checkShardVersionOrThrow(opCtx, isCollection);
+ return Status::OK();
+ } catch (const DBException& ex) {
+ return ex.toStatus();
+ }
+}
+
boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataWithVersionCheckAt(
OperationContext* opCtx,
const boost::optional<mongo::LogicalTime>& atClusterTime,
@@ -250,13 +260,9 @@ boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataW
}();
if (criticalSectionSignal) {
- // Set migration critical section on operation sharding state: operation will wait for the
- // migration to finish before returning failure and retrying.
- auto& oss = OperationShardingState::get(opCtx);
- oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
-
- uasserted(StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion),
- str::stream() << "migration commit in progress for " << _nss.ns());
+ uasserted(
+ StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion, criticalSectionSignal),
+ str::stream() << "migration commit in progress for " << _nss.ns());
}
if (receivedShardVersion.isWriteCompatibleWith(wantedShardVersion)) {
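checkShardVersionNoThrow is a thin wrapper that runs the same check and converts any exception into a returned status, which lets callers such as _filterSpecsAndRegisterBuild and _setUpIndexBuild bail out (and unregister the build) with a plain status instead of wrapping every call site in try/catch. The following is a generic sketch of that wrapper pattern, with StatusSketch standing in for mongo::Status and a boolean standing in for the actual staleness check.

    #include <exception>
    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Simplified stand-in for mongo::Status.
    struct StatusSketch {
        bool ok;
        std::string reason;
        static StatusSketch OK() { return {true, ""}; }
    };

    // A check that reports failure by throwing, like checkShardVersionOrThrow.
    void checkOrThrow(bool stale) {
        if (stale) {
            throw std::runtime_error("StaleConfig: shard version mismatch");
        }
    }

    // The no-throw wrapper: same check, exception converted to a status so the
    // caller can simply `return status;` on failure (after any cleanup such as
    // unregistering the index build).
    StatusSketch checkNoThrow(bool stale) noexcept {
        try {
            checkOrThrow(stale);
            return StatusSketch::OK();
        } catch (const std::exception& ex) {
            return {false, ex.what()};
        }
    }

    int main() {
        auto status = checkNoThrow(true);
        if (!status.ok) {
            std::cout << "setup aborted: " << status.reason << "\n";
        }
    }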
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index cc246dacdab..d411a9defa9 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -123,6 +123,11 @@ public:
void checkShardVersionOrThrow(OperationContext* opCtx, bool isCollection);
/**
+ * Similar to checkShardVersionOrThrow but returns a status instead of throwing.
+ */
+ Status checkShardVersionNoThrow(OperationContext* opCtx, bool isCollection) noexcept;
+
+ /**
* Methods to control the collection's critical section. Methods listed below must be called
* with both the collection lock and CollectionShardingRuntimeLock held in exclusive mode.
*
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 1ee6cfbeed8..a4bd7b58864 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -69,7 +69,7 @@ protected:
const auto version = metadata.getShardVersion();
BSONObjBuilder builder;
version.appendToCommand(&builder);
- oss.initializeClientRoutingVersions(kTestNss, builder.obj());
+ oss.initializeClientRoutingVersionsFromCommand(kTestNss, builder.obj());
}
};
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 0a726714fd6..b5e01ee4dff 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -82,14 +82,17 @@ bool OperationShardingState::allowImplicitCollectionCreation() const {
return _allowImplicitCollectionCreation;
}
-void OperationShardingState::initializeClientRoutingVersions(NamespaceString nss,
- const BSONObj& cmdObj) {
+void OperationShardingState::initializeClientRoutingVersionsFromCommand(NamespaceString nss,
+ const BSONObj& cmdObj) {
invariant(_shardVersions.empty());
invariant(_databaseVersions.empty());
+ boost::optional<ChunkVersion> shardVersion;
+ boost::optional<DatabaseVersion> dbVersion;
+
const auto shardVersionElem = cmdObj.getField(ChunkVersion::kShardVersionField);
if (!shardVersionElem.eoo()) {
- _shardVersions[nss.ns()] = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj));
+ shardVersion = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj));
}
const auto dbVersionElem = cmdObj.getField(kDbVersionField);
@@ -98,11 +101,29 @@ void OperationShardingState::initializeClientRoutingVersions(NamespaceString nss
str::stream() << "expected databaseVersion element to be an object, got "
<< dbVersionElem,
dbVersionElem.type() == BSONType::Object);
+
+ dbVersion = DatabaseVersion::parse(IDLParserErrorContext("initializeClientRoutingVersions"),
+ dbVersionElem.Obj());
+ }
+
+ initializeClientRoutingVersions(nss, shardVersion, dbVersion);
+}
+
+void OperationShardingState::initializeClientRoutingVersions(
+ NamespaceString nss,
+ const boost::optional<ChunkVersion>& shardVersion,
+ const boost::optional<DatabaseVersion>& dbVersion) {
+ invariant(_shardVersions.empty());
+ invariant(_databaseVersions.empty());
+
+ if (shardVersion) {
+ _shardVersions[nss.ns()] = *shardVersion;
+ }
+ if (dbVersion) {
// Unforunately this is a bit ugly; it's because a command comes with a shardVersion or
// databaseVersion, and the assumption is that those versions are applied to whatever is
// returned by the Command's parseNs(), which can either be a full namespace or just a db.
- _databaseVersions[nss.db().empty() ? nss.ns() : nss.db()] = DatabaseVersion::parse(
- IDLParserErrorContext("initializeClientRoutingVersions"), dbVersionElem.Obj());
+ _databaseVersions[nss.db().empty() ? nss.ns() : nss.db()] = *dbVersion;
}
}
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index d99227e0254..77dd848d9c4 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -96,7 +96,14 @@ public:
* This initialization may only be performed once for the lifetime of the object, which
* coincides with the lifetime of the client's request.
*/
- void initializeClientRoutingVersions(NamespaceString nss, const BSONObj& cmdObj);
+ void initializeClientRoutingVersionsFromCommand(NamespaceString nss, const BSONObj& cmdObj);
+
+ /**
+ * Stores the given shardVersion and databaseVersion for the given namespace.
+ */
+ void initializeClientRoutingVersions(NamespaceString nss,
+ const boost::optional<ChunkVersion>& shardVersion,
+ const boost::optional<DatabaseVersion>& dbVersion);
/**
* Returns whether or not there is a shard version associated with this operation.
diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
index 625f62a51f3..4d544948696 100644
--- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
+++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
@@ -73,6 +73,13 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi
}
if (auto staleInfo = status->extraInfo<StaleConfigInfo>()) {
+ if (staleInfo->getCriticalSectionSignal()) {
+ // Set migration critical section on operation sharding state: operation will wait for
+ // the migration to finish before returning.
+ auto& oss = OperationShardingState::get(_opCtx);
+ oss.setMigrationCriticalSectionSignal(staleInfo->getCriticalSectionSignal());
+ }
+
auto handleMismatchStatus = onShardVersionMismatchNoExcept(
_opCtx, staleInfo->getNss(), staleInfo->getVersionReceived());
if (!handleMismatchStatus.isOK())
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 3bff6836fa9..58db096f7c0 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -959,7 +959,7 @@ void execCommandDatabase(OperationContext* opCtx,
readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
(iAmPrimary ||
(readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
- oss.initializeClientRoutingVersions(invocation->ns(), request.body);
+ oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body);
auto const shardingState = ShardingState::get(opCtx);
if (oss.hasShardVersion() || oss.hasDbVersion()) {
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index c01f3260dc6..178b912a9f0 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/speculative_majority_read_info.h"
#include "mongo/db/s/implicit_create_collection.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/scoped_operation_completion_sharding_actions.h"
#include "mongo/db/s/shard_filtering_metadata_refresh.h"
#include "mongo/db/s/sharding_config_optime_gossip.h"
@@ -182,6 +183,13 @@ public:
void handleException(const DBException& e, OperationContext* opCtx) const override {
// If we got a stale config, wait in case the operation is stuck in a critical section
if (auto sce = e.extraInfo<StaleConfigInfo>()) {
+ if (sce->getCriticalSectionSignal()) {
+ // Set migration critical section on operation sharding state: operation will wait
+ // for the migration to finish before returning.
+ auto& oss = OperationShardingState::get(opCtx);
+ oss.setMigrationCriticalSectionSignal(sce->getCriticalSectionSignal());
+ }
+
if (!opCtx->getClient()->isInDirectClient()) {
// We already have the StaleConfig exception, so just swallow any errors due to
// refresh
diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h
index aab3af48514..a081e4dad2a 100644
--- a/src/mongo/s/stale_exception.h
+++ b/src/mongo/s/stale_exception.h
@@ -32,6 +32,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/database_version_gen.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -41,8 +42,12 @@ public:
StaleConfigInfo(NamespaceString nss,
ChunkVersion received,
- boost::optional<ChunkVersion> wanted)
- : _nss(std::move(nss)), _received(received), _wanted(wanted) {}
+ boost::optional<ChunkVersion> wanted,
+ std::shared_ptr<Notification<void>> criticalSectionSignal = nullptr)
+ : _nss(std::move(nss)),
+ _received(received),
+ _wanted(wanted),
+ _criticalSectionSignal(criticalSectionSignal) {}
const auto& getNss() const {
return _nss;
@@ -56,6 +61,10 @@ public:
return _wanted;
}
+ auto getCriticalSectionSignal() const {
+ return _criticalSectionSignal;
+ }
+
void serialize(BSONObjBuilder* bob) const override;
static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&);
static StaleConfigInfo parseFromCommandError(const BSONObj& commandError);
@@ -64,6 +73,10 @@ private:
NamespaceString _nss;
ChunkVersion _received;
boost::optional<ChunkVersion> _wanted;
+
+ // This signal does not get serialized and therefore does not get propagated
+ // to the router.
+ std::shared_ptr<Notification<void>> _criticalSectionSignal;
};
using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>;
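With this change the critical section signal travels inside StaleConfigInfo instead of being set on OperationShardingState directly by CollectionShardingState; the handlers in scoped_operation_completion_sharding_actions.cpp and service_entry_point_mongod.cpp extract it and hand it to OperationShardingState so the stale operation waits for the migration to finish before the retryable StaleConfig error is returned, and, as the new comment notes, the signal is never serialized back to the router. The Notification<void> being carried is essentially a one-shot set-and-wait primitive; below is a self-contained sketch of that idea using a condition variable. OneShotSignalSketch is illustrative only, not mongo::Notification's actual implementation.

    #include <chrono>
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    // One-shot signal: the migration sets it when it leaves the critical section,
    // and a stale operation waits on it before returning a retryable error.
    class OneShotSignalSketch {
    public:
        void set() {
            {
                std::lock_guard<std::mutex> lk(_mutex);
                _set = true;
            }
            _cv.notify_all();
        }

        void wait() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [this] { return _set; });
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        bool _set = false;
    };

    int main() {
        OneShotSignalSketch criticalSectionSignal;

        // Stands in for the migration donor finishing its critical section.
        std::thread migration([&] {
            std::this_thread::sleep_for(std::chrono::milliseconds(50));
            criticalSectionSignal.set();
        });

        // Stands in for the stale operation: it waits for the signal before
        // returning StaleConfig, so the client's retry lands after the commit.
        criticalSectionSignal.wait();
        std::cout << "critical section over, safe to retry\n";

        migration.join();
    }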