author    | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2019-12-06 16:30:41 +0000
committer | evergreen <evergreen@mongodb.com> | 2019-12-06 16:30:41 +0000
commit    | eba76c558b3e7d784c146b51ced16d48b1d0efe7 (patch)
tree      | eb43d876af50dfd29a6596878f15ed9ab500a30b
parent    | 13944bb3fedc8d91c02c56bb66bb5c76a0a558d0 (diff)
download  | mongo-eba76c558b3e7d784c146b51ced16d48b1d0efe7.tar.gz
SERVER-44719 Make createIndexes, dropIndexes, and collMod check shard versions
19 files changed, 307 insertions, 104 deletions
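In short, mongos already attaches a shard version to createIndexes, dropIndexes, and collMod; with this patch the shard now checks that version before acting, so a shard with a stale routing table refreshes (via StaleConfig and a mongos retry) instead of silently applying the command. A minimal sketch of that behaviour, assuming a running ShardingTest `st` and a collection `test.foo` sharded on `{_id: 1}` with `st.shard1` left stale — the names and setup are illustrative, not part of this commit:

```javascript
// Sketch only: assumes `st` (ShardingTest), test.foo sharded on {_id: 1}, and st.shard1 holding
// stale routing info (e.g. after a moveChunk whose recipient refresh was skipped with the
// doNotRefreshRecipientAfterCommit fail point used by the new test).
const ns = "test.foo";

// mongos attaches a shard version to createIndexes; with this change the shard verifies it.
assert.commandWorked(st.s.getDB("test").runCommand(
    {createIndexes: "foo", indexes: [{key: {x: 1}, name: "x_1"}]}));

// A stale shard answers StaleConfig, refreshes its metadata, and mongos retries, so after the
// command the shard's cached collection version is current rather than silently left behind.
const res = assert.commandWorked(
    st.shard1.getDB("admin").runCommand({getShardVersion: ns, fullMetadata: true}));
printjson(res.metadata.collVersion);
```

The new jstest below (index_commands_shard_targeting.js) automates exactly this kind of check for all three commands.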
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index 389c3120169..90ea743ecf3 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -240,8 +240,6 @@ function waitForMigrateStep(shardConnection, stepNumber) {
 //
 function runCommandDuringTransferMods(
     mongos, staticMongod, ns, bounds, fromShard, toShard, cmdFunc) {
-    let configDB = mongos.getDB('config');
-
     // Turn on the fail point and wait for moveChunk to hit the fail point.
     pauseMoveChunkAtStep(fromShard, moveChunkStepNames.startedMoveChunk);
     let joinMoveChunk =
diff --git a/jstests/sharding/index_and_collection_option_propagation.js b/jstests/sharding/index_and_collection_option_propagation.js
index 15f61961b0a..3a814ca76e9 100644
--- a/jstests/sharding/index_and_collection_option_propagation.js
+++ b/jstests/sharding/index_and_collection_option_propagation.js
@@ -134,18 +134,7 @@ checkShardIndexes("idx2", [st.shard0, st.shard1], [st.shard2]);
 // dropIndex
 res = st.s.getDB(dbName).getCollection(collName).dropIndex("idx1_1");
 assert.commandWorked(res);
-// TODO SERVER-44719: Once createIndex is made to check shard versions, after the createIndex
-// above, each shard should have refreshed its cache. So the mongos will not need to retry the
-// dropIndex here and will not get IndexNotFound from shard0 (which is ignored and causes the
-// response from shard0 to be empty).
-if (jsTestOptions().mongosBinVersion == "last-stable") {
-    assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
-} else {
-    // dropIndexes checks shard versions so causes the first try to succeed on shard0 but not
-    // on shard1. When it retries after the refresh, it fails with IndexNotFound on shard0
-    // so the response from shard0 is empty.
-    assert.eq(undefined, res.raw[st.shard0.host], tojson(res));
-}
+assert.eq(res.raw[st.shard0.host].ok, 1, tojson(res));
 assert.eq(res.raw[st.shard1.host].ok, 1, tojson(res));
 assert.eq(undefined, res.raw[st.shard2.host], tojson(res));
 checkShardIndexes("idx1", [], [st.shard0, st.shard1, st.shard2]);
diff --git a/jstests/sharding/index_commands_shard_targeting.js b/jstests/sharding/index_commands_shard_targeting.js
index 0eaf9d1914f..f0cf3d0386d 100644
--- a/jstests/sharding/index_commands_shard_targeting.js
+++ b/jstests/sharding/index_commands_shard_targeting.js
@@ -1,11 +1,16 @@
 /*
- * Test that the index commands send shard versions, and only target the primary
- * shard and the shards that have chunks for the collection.
+ * Test that the index commands send and check shard versions, and only target the primary
+ * shard and the shards that have chunks for the collection. Also test that the commands fail
+ * if they are run when the critical section is in progress, and block until the critical
+ * section is over.
  * @tags: [requires_fcv_44]
  */
 (function() {
 "use strict";
 
+load('jstests/libs/chunk_manipulation_util.js');
+load("jstests/libs/fail_point_util.js");
+
 /*
  * Returns the metadata for the collection in the shard's catalog cache.
  */
@@ -16,6 +21,25 @@ function getMetadataOnShard(shard, ns) {
 }
 
 /*
+ * Asserts that the collection version for the collection in the shard's catalog cache
+ * is equal to the given collection version.
+ */
+function assertCollectionVersionEquals(shard, ns, collectionVersion) {
+    assert.eq(getMetadataOnShard(shard, ns).collVersion, collectionVersion);
+}
+
+/*
+ * Asserts that the collection version for the collection in the shard's catalog cache
+ * is older than the given collection version.
+ */
+function assertCollectionVersionOlderThan(shard, ns, collectionVersion) {
+    let shardCollectionVersion = getMetadataOnShard(shard, ns).collVersion;
+    if (shardCollectionVersion != undefined) {
+        assert.lt(shardCollectionVersion.t, collectionVersion.t);
+    }
+}
+
+/*
  * Asserts that the shard version of the shard in its catalog cache is equal to the
  * given shard version.
  */
@@ -24,6 +48,17 @@ function assertShardVersionEquals(shard, ns, shardVersion) {
 }
 
 /*
+ * Moves the chunk that matches the given query to toShard. Forces fromShard to skip the
+ * recipient metadata refresh post-migration commit.
+ */
+function moveChunkNotRefreshRecipient(mongos, ns, fromShard, toShard, findQuery) {
+    let failPoint = configureFailPoint(fromShard, "doNotRefreshRecipientAfterCommit");
+    assert.commandWorked(mongos.adminCommand(
+        {moveChunk: ns, find: findQuery, to: toShard.shardName, _waitForDelete: true}));
+    failPoint.off();
+}
+
+/*
  * Asserts that the shard has an index for the collection with the given index key.
  */
 function assertIndexExistsOnShard(shard, dbName, collName, targetIndexKey) {
@@ -53,21 +88,83 @@ function assertIndexDoesNotExistOnShard(shard, dbName, collName, targetIndexKey)
 }
 
 /*
- * Performs chunk operations to make the primary shard (shard0) not own any chunks for collection,
- * and only a subset of non-primary shards (shard1 and shard2) own chunks for collection.
+ * Runs the command after performing chunk operations to make the primary shard (shard0) not own
+ * any chunks for the collection, and the subset of non-primary shards (shard1 and shard2) that
+ * own chunks for the collection have a stale catalog cache.
+ *
+ * Asserts that the command checks shard versions by checking that the shards refresh their
+ * cache after the command is run.
  */
-function setUpShards(st, ns) {
+function assertCommandChecksShardVersions(st, dbName, collName, testCase) {
+    const ns = dbName + "." + collName;
+
     // Move the initial chunk out of the primary shard.
-    assert.commandWorked(st.s.adminCommand(
-        {moveChunk: ns, find: {_id: MinKey}, to: st.shard1.shardName, _waitForDelete: true}));
+    moveChunkNotRefreshRecipient(st.s, ns, st.shard0, st.shard1, {_id: MinKey});
 
     // Split the chunk to create two chunks on shard1. Move one of the chunks to shard2.
     assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
-    assert.commandWorked(st.s.adminCommand(
-        {moveChunk: ns, find: {_id: 0}, to: st.shard2.shardName, _waitForDelete: true}));
+    moveChunkNotRefreshRecipient(st.s, ns, st.shard1, st.shard2, {_id: 0});
 
     // Assert that primary shard does not have any chunks for the collection.
     assertShardVersionEquals(st.shard0, ns, Timestamp(0, 0));
+
+    const mongosCollectionVersion = st.s.adminCommand({getShardVersion: ns}).version;
+
+    // Assert that besides the latest donor shard (shard1), all shards have stale collection
+    // version.
+    assertCollectionVersionOlderThan(st.shard0, ns, mongosCollectionVersion);
+    assertCollectionVersionEquals(st.shard1, ns, mongosCollectionVersion);
+    assertCollectionVersionOlderThan(st.shard2, ns, mongosCollectionVersion);
+    assertCollectionVersionOlderThan(st.shard3, ns, mongosCollectionVersion);
+
+    if (testCase.setUp) {
+        testCase.setUp();
+    }
+    assert.commandWorked(st.s.getDB(dbName).runCommand(testCase.command));
+
+    // Assert that the primary shard still has a stale collection version after the command is run
+    // because both the shard version in the command and in the shard's cache are UNSHARDED
+    // (no chunks).
+    assertCollectionVersionOlderThan(st.shard0, ns, mongosCollectionVersion);
+
+    // Assert that the other shards have the latest collection version after the command is run.
+    assertCollectionVersionEquals(st.shard1, ns, mongosCollectionVersion);
+    assertCollectionVersionEquals(st.shard2, ns, mongosCollectionVersion);
+    assertCollectionVersionEquals(st.shard3, ns, mongosCollectionVersion);
+}
+
+/*
+ * Runs the command during a chunk migration after the donor enters the read-only phase of the
+ * critical section. Asserts that the command is blocked behind the critical section.
+ *
+ * Assumes that shard0 is the primary shard.
+ */
+function assertCommandBlocksIfCriticalSectionInProgress(
+    st, staticMongod, dbName, collName, testCase) {
+    const ns = dbName + "." + collName;
+    const fromShard = st.shard0;
+    const toShard = st.shard1;
+
+    assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+
+    // Turn on the fail point and wait for moveChunk to hit the fail point.
+    pauseMoveChunkAtStep(fromShard, moveChunkStepNames.chunkDataCommitted);
+    let joinMoveChunk =
+        moveChunkParallel(staticMongod, st.s.host, {_id: 0}, null, ns, toShard.shardName);
+    waitForMoveChunkStep(fromShard, moveChunkStepNames.chunkDataCommitted);
+
+    if (testCase.setUp) {
+        testCase.setUp();
+    }
+
+    // Run the command and assert that it eventually times out.
+    const cmdWithMaxTimeMS = Object.assign({}, testCase.command, {maxTimeMS: 500});
+    assert.commandFailedWithCode(st.s.getDB(dbName).runCommand(cmdWithMaxTimeMS),
+                                 ErrorCodes.MaxTimeMSExpired);
+
+    // Turn off the fail point and wait for moveChunk to complete.
+    unpauseMoveChunkAtStep(fromShard, moveChunkStepNames.chunkDataCommitted);
+    joinMoveChunk();
 }
 
 const numShards = 4;
@@ -87,84 +184,94 @@ const index = {
     name: "x_1"
 };
 
-const expectedTargetedShards = new Set([st.shard0, st.shard1, st.shard2]);
-assert.lt(expectedTargetedShards.size, numShards);
+const testCases = {
+    createIndexes: collName => {
+        return {
+            command: {createIndexes: collName, indexes: [index]},
+            assertCommandRanOnShard: (shard) => {
+                assertIndexExistsOnShard(shard, dbName, collName, index.key);
+            },
+            assertCommandDidNotRunOnShard: (shard) => {
+                assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
+            }
+        };
+    },
+    dropIndexes: collName => {
+        return {
+            command: {dropIndexes: collName, index: index.name},
+            setUp: () => {
+                // Create the index directly on all the shards.
+                allShards.forEach(function(shard) {
+                    assert.commandWorked(shard.getDB(dbName).runCommand(
+                        {createIndexes: collName, indexes: [index]}));
+                });
+            },
+            assertCommandRanOnShard: (shard) => {
+                assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
+            },
+            assertCommandDidNotRunOnShard: (shard) => {
+                assertIndexExistsOnShard(shard, dbName, collName, index.key);
+            }
+        };
+    },
+    collMod: collName => {
+        return {
+            command: {collMod: collName, validator: {x: {$type: "string"}}},
+            assertCommandRanOnShard: (shard) => {
+                assert.commandFailedWithCode(
+                    shard.getCollection(dbName + "." + collName).insert({x: 1}),
+                    ErrorCodes.DocumentValidationFailure);
+            },
+            assertCommandDidNotRunOnShard: (shard) => {
+                assert.commandWorked(
+                    shard.getCollection(dbName + "." + collName).insert({x: 1}));
+            }
+        };
+    },
+};
 
 assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
 st.ensurePrimaryShard(dbName, st.shard0.shardName);
 
-jsTest.log("Test createIndexes command...");
-
-(() => {
-    let testColl = testDB.testCreateIndexes;
-    let collName = testColl.getName();
-    let ns = testColl.getFullName();
-
-    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
-    setUpShards(st, ns);
-    assert.commandWorked(testDB.runCommand({createIndexes: collName, indexes: [index]}));
-
-    // Assert that the index exists on the targeted shards but not on the untargeted shards.
-    allShards.forEach(function(shard) {
-        if (expectedTargetedShards.has(shard)) {
-            assertIndexExistsOnShard(shard, dbName, collName, index.key);
-        } else {
-            assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
-        }
-    });
-})();
-
-jsTest.log("Test dropIndexes command...");
+// Test that the index commands send and check shard versions, and only target the primary
+// shard and the shards that own chunks for the collection.
+const expectedTargetedShards = new Set([st.shard0, st.shard1, st.shard2]);
+assert.lt(expectedTargetedShards.size, numShards);
 
-(() => {
-    let testColl = testDB.testDropIndexes;
-    let collName = testColl.getName();
-    let ns = testColl.getFullName();
+for (const command of Object.keys(testCases)) {
+    jsTest.log(`Testing that ${command} sends and checks shard version...`);
+    let collName = command;
+    let ns = dbName + "." + collName;
+    let testCase = testCases[command](collName);
 
     assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
-    setUpShards(st, ns);
-
-    // Create the index directly on all the shards.
-    allShards.forEach(function(shard) {
-        assert.commandWorked(
-            shard.getDB(dbName).runCommand({createIndexes: collName, indexes: [index]}));
-    });
-
-    // Drop the index.
-    assert.commandWorked(testDB.runCommand({dropIndexes: collName, index: index.name}));
+    assertCommandChecksShardVersions(st, dbName, collName, testCase);
 
-    // Assert that the index no longer exists on the targeted shards but still exists on the
-    // untargeted shards.
     allShards.forEach(function(shard) {
         if (expectedTargetedShards.has(shard)) {
-            assertIndexDoesNotExistOnShard(shard, dbName, collName, index.key);
+            testCase.assertCommandRanOnShard(shard);
         } else {
-            assertIndexExistsOnShard(shard, dbName, collName, index.key);
+            testCase.assertCommandDidNotRunOnShard(shard);
        }
     });
-})();
+}
 
-jsTest.log("Test collMod command...");
+// Test that the index commands are blocked behind the critical section.
+const staticMongod = MongoRunner.runMongod({});
 
-(() => {
-    let testColl = testDB.testCollMod;
-    let collName = testColl.getName();
-    let ns = testColl.getFullName();
+for (const command of Object.keys(testCases)) {
+    jsTest.log(`Testing that ${command} is blocked behind the critical section...`);
    let collName = command + "CriticalSection";
+    let ns = dbName + "." + collName;
+    let testCase = testCases[command](collName);
 
     assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: shardKey}));
-    setUpShards(st, ns);
-    assert.commandWorked(testDB.runCommand({collMod: collName, validator: {x: {$type: "string"}}}));
+    assertCommandBlocksIfCriticalSectionInProgress(st, staticMongod, dbName, collName, testCase);
 
-    // Assert that the targeted shards do document validation, and the untargeted shards do not.
     allShards.forEach(function(shard) {
-        if (expectedTargetedShards.has(shard)) {
-            assert.commandFailedWithCode(shard.getCollection(ns).insert({x: 1}),
-                                         ErrorCodes.DocumentValidationFailure);
-        } else {
-            assert.commandWorked(shard.getCollection(ns).insert({x: 1}));
-        }
+        testCase.assertCommandDidNotRunOnShard(shard);
     });
-})();
+}
 
 st.stop();
+MongoRunner.stopMongod(staticMongod);
 })();
diff --git a/jstests/sharding/track_unsharded_collections_check_shard_version.js b/jstests/sharding/track_unsharded_collections_check_shard_version.js
index 7b530acd2f9..9b37756978c 100644
--- a/jstests/sharding/track_unsharded_collections_check_shard_version.js
+++ b/jstests/sharding/track_unsharded_collections_check_shard_version.js
@@ -106,7 +106,6 @@ let testCases = {
     createIndexes: {
         implicitlyCreatesCollection: true,
         whenNamespaceIsViewFailsWith: ErrorCodes.CommandNotSupportedOnView,
-        doesNotCheckShardVersion: true,
         command: collName => {
            return {createIndexes: collName, indexes: [{key: {a: 1}, name: "index"}]};
        },
diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp
index c9018e46350..49a666c9c67 100644
--- a/src/mongo/db/catalog/coll_mod.cpp
+++ b/src/mongo/db/catalog/coll_mod.cpp
@@ -294,7 +294,7 @@ Status _collModInternal(OperationContext* opCtx,
         return Status(ErrorCodes::NamespaceNotFound, "ns does not exist");
     }
 
-    // This is necessary to set up CurOp and update the Top stats.
+    // This is necessary to set up CurOp, update the Top stats, and check shard version.
     OldClientContext ctx(opCtx, nss.ns());
 
     bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() &&
diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp
index d09a7a2044d..6a14fa7505f 100644
--- a/src/mongo/db/catalog/drop_indexes.cpp
+++ b/src/mongo/db/catalog/drop_indexes.cpp
@@ -224,6 +224,8 @@ Status dropIndexes(OperationContext* opCtx,
                   collection->uuid());
 
         WriteUnitOfWork wunit(opCtx);
+
+        // This is necessary to check shard version.
         OldClientContext ctx(opCtx, nss.ns());
 
         // Use an empty BSONObjBuilder to avoid duplicate appends to result on retry loops.
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index 4e6167326b4..d0da95f2847 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -341,6 +341,13 @@ void checkDatabaseShardingState(OperationContext* opCtx, StringData dbName) {
 }
 
 /**
+ * Checks collection sharding state. Throws exception on error.
+ */
+void checkCollectionShardingState(OperationContext* opCtx, const NamespaceString& ns) {
+    CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx, true);
+}
+
+/**
  * Opens or creates database for index creation.
  * On database creation, the lock will be made exclusive.
  */
@@ -440,6 +447,7 @@ bool runCreateIndexesForMobile(OperationContext* opCtx,
     opCtx->recoveryUnit()->abandonSnapshot();
     boost::optional<Lock::CollectionLock> exclusiveCollectionLock(
         boost::in_place_init, opCtx, ns, MODE_X);
+    checkCollectionShardingState(opCtx, ns);
 
     // Index builds can safely ignore prepare conflicts and perform writes. On primaries, an
     // exclusive lock in the final drain phase conflicts with prepared transactions.
@@ -701,6 +709,10 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx,
         opCtx->recoveryUnit()->abandonSnapshot();
         Lock::CollectionLock collLock(opCtx, ns, MODE_X);
 
+        // This check is for optimization purposes only, as this lock is released immediately
+        // after this and is acquired again when we build the index.
+        checkCollectionShardingState(opCtx, ns);
+
         auto collection = getOrCreateCollection(opCtx, db, ns, cmdObj, &errmsg, &result);
         collectionUUID = collection->uuid();
     }
diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp
index ca37985492b..e406d9fa688 100644
--- a/src/mongo/db/index_builds_coordinator.cpp
+++ b/src/mongo/db/index_builds_coordinator.cpp
@@ -986,6 +986,13 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(
     auto collection = autoColl.getCollection();
     const auto& nss = collection->ns();
 
+    // This check is for optimization purposes only, since this lock is released after this
+    // and is acquired again when we build the index in _setUpIndexBuild.
+    auto status = CollectionShardingState::get(opCtx, nss)->checkShardVersionNoThrow(opCtx, true);
+    if (!status.isOK()) {
+        return status;
+    }
+
     // Lock from when we ascertain what indexes to build through to when the build is registered
     // on the Coordinator and persistedly set up in the catalog. This serializes setting up an
     // index build so that no attempts are made to register the same build twice.
@@ -1012,7 +1019,7 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild(
         buildUUID, collectionUUID, dbName.toString(), filteredSpecs, protocol, commitQuorum);
     replIndexBuildState->stats.numIndexesBefore = _getNumIndexesTotal(opCtx, collection);
 
-    Status status = _registerIndexBuild(lk, replIndexBuildState);
+    status = _registerIndexBuild(lk, replIndexBuildState);
     if (!status.isOK()) {
         return status;
     }
@@ -1035,6 +1042,14 @@ Status IndexBuildsCoordinator::_setUpIndexBuild(OperationContext* opCtx,
     AutoGetCollection autoColl(opCtx, nssOrUuid, MODE_X);
     auto collection = autoColl.getCollection();
     const auto& nss = collection->ns();
+    auto status = CollectionShardingState::get(opCtx, nss)->checkShardVersionNoThrow(opCtx, true);
+    if (!status.isOK()) {
+        // We need to unregister the index build to allow retries to succeed.
+        stdx::unique_lock<Latch> lk(_mutex);
+        _unregisterIndexBuild(lk, replIndexBuildState);
+
+        return status;
+    }
 
     auto replCoord = repl::ReplicationCoordinator::get(opCtx);
     const bool replSetAndNotPrimary =
@@ -1082,7 +1097,7 @@ Status IndexBuildsCoordinator::_setUpIndexBuild(OperationContext* opCtx,
                                       : IndexBuildsManager::IndexConstraints::kEnforce;
     options.protocol = replIndexBuildState->protocol;
 
-    auto status = [&] {
+    status = [&] {
         if (!replSetAndNotPrimary) {
             // On standalones and primaries, call setUpIndexBuild(), which makes the initial catalog
             // write. On primaries, this replicates the startIndexBuild oplog entry.
diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp
index 41a5a2ea25a..36968811cc2 100644
--- a/src/mongo/db/index_builds_coordinator_mongod.cpp
+++ b/src/mongo/db/index_builds_coordinator_mongod.cpp
@@ -38,6 +38,7 @@
 #include "mongo/db/db_raii.h"
 #include "mongo/db/index_build_entry_helpers.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/s/operation_sharding_state.h"
 #include "mongo/db/service_context.h"
 #include "mongo/util/assert_util.h"
 #include "mongo/util/fail_point.h"
@@ -139,6 +140,13 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
     const auto deadline = opCtx->getDeadline();
     const auto timeoutError = opCtx->getTimeoutError();
 
+    const NamespaceStringOrUUID nssOrUuid{dbName, collectionUUID};
+    const auto nss = CollectionCatalog::get(opCtx).resolveNamespaceStringOrUUID(opCtx, nssOrUuid);
+
+    const auto& oss = OperationShardingState::get(opCtx);
+    const auto shardVersion = oss.getShardVersion(nss);
+    const auto dbVersion = oss.getDbVersion(dbName);
+
     // Task in thread pool should have similar CurOp representation to the caller so that it can be
     // identified as a createIndexes operation.
     LogicalOp logicalOp = LogicalOp::opInvalid;
@@ -165,6 +173,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
         buildUUID,
         collectionUUID,
         dbName,
+        nss,
         deadline,
         indexBuildOptions,
         logicalOp,
@@ -172,7 +181,9 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
         replState,
         startPromise = std::move(startPromise),
         startTimestamp,
-        timeoutError
+        timeoutError,
+        shardVersion,
+        dbVersion
     ](auto status) mutable noexcept {
         // Clean up if we failed to schedule the task.
         if (!status.isOK()) {
@@ -185,6 +196,9 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx,
         auto opCtx = Client::getCurrent()->makeOperationContext();
         opCtx->setDeadlineByDate(deadline, timeoutError);
 
+        auto& oss = OperationShardingState::get(opCtx.get());
+        oss.initializeClientRoutingVersions(nss, shardVersion, dbVersion);
+
         {
             stdx::unique_lock<Client> lk(*opCtx->getClient());
             auto curOp = CurOp::get(opCtx.get());
diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp
index da000652d40..b0effe64772 100644
--- a/src/mongo/db/s/collection_metadata_filtering_test.cpp
+++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp
@@ -112,7 +112,7 @@ protected:
         const auto version = cm->getVersion(ShardId("0"));
         BSONObjBuilder builder;
         version.appendToCommand(&builder);
-        oss.initializeClientRoutingVersions(kNss, builder.obj());
+        oss.initializeClientRoutingVersionsFromCommand(kNss, builder.obj());
     }
 
     std::shared_ptr<MetadataManager> _manager;
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 5fc2a88d16a..5ea0a4756cc 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -203,6 +203,16 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx,
     (void)_getMetadataWithVersionCheckAt(opCtx, boost::none, isCollection);
 }
 
+Status CollectionShardingState::checkShardVersionNoThrow(OperationContext* opCtx,
+                                                         bool isCollection) noexcept {
+    try {
+        checkShardVersionOrThrow(opCtx, isCollection);
+        return Status::OK();
+    } catch (const DBException& ex) {
+        return ex.toStatus();
+    }
+}
+
 boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataWithVersionCheckAt(
     OperationContext* opCtx,
     const boost::optional<mongo::LogicalTime>& atClusterTime,
@@ -250,13 +260,9 @@ boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataW
     }();
 
     if (criticalSectionSignal) {
-        // Set migration critical section on operation sharding state: operation will wait for the
-        // migration to finish before returning failure and retrying.
-        auto& oss = OperationShardingState::get(opCtx);
-        oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
-
-        uasserted(StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion),
-                  str::stream() << "migration commit in progress for " << _nss.ns());
+        uasserted(
+            StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion, criticalSectionSignal),
+            str::stream() << "migration commit in progress for " << _nss.ns());
     }
 
     if (receivedShardVersion.isWriteCompatibleWith(wantedShardVersion)) {
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index cc246dacdab..d411a9defa9 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -123,6 +123,11 @@ public:
     void checkShardVersionOrThrow(OperationContext* opCtx, bool isCollection);
 
     /**
+     * Similar to checkShardVersionOrThrow but returns a status instead of throwing.
+     */
+    Status checkShardVersionNoThrow(OperationContext* opCtx, bool isCollection) noexcept;
+
+    /**
      * Methods to control the collection's critical section. Methods listed below must be called
      * with both the collection lock and CollectionShardingRuntimeLock held in exclusive mode.
      *
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index 1ee6cfbeed8..a4bd7b58864 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -69,7 +69,7 @@ protected:
         const auto version = metadata.getShardVersion();
         BSONObjBuilder builder;
         version.appendToCommand(&builder);
-        oss.initializeClientRoutingVersions(kTestNss, builder.obj());
+        oss.initializeClientRoutingVersionsFromCommand(kTestNss, builder.obj());
     }
 };
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 0a726714fd6..b5e01ee4dff 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -82,14 +82,17 @@ bool OperationShardingState::allowImplicitCollectionCreation() const {
     return _allowImplicitCollectionCreation;
 }
 
-void OperationShardingState::initializeClientRoutingVersions(NamespaceString nss,
-                                                             const BSONObj& cmdObj) {
+void OperationShardingState::initializeClientRoutingVersionsFromCommand(NamespaceString nss,
+                                                                        const BSONObj& cmdObj) {
     invariant(_shardVersions.empty());
     invariant(_databaseVersions.empty());
 
+    boost::optional<ChunkVersion> shardVersion;
+    boost::optional<DatabaseVersion> dbVersion;
+
     const auto shardVersionElem = cmdObj.getField(ChunkVersion::kShardVersionField);
     if (!shardVersionElem.eoo()) {
-        _shardVersions[nss.ns()] = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj));
+        shardVersion = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj));
     }
 
     const auto dbVersionElem = cmdObj.getField(kDbVersionField);
@@ -98,11 +101,29 @@ void OperationShardingState::initializeClientRoutingVersions(NamespaceString nss
                str::stream() << "expected databaseVersion element to be an object, got "
                              << dbVersionElem,
                dbVersionElem.type() == BSONType::Object);
+
+        dbVersion =
+            DatabaseVersion::parse(IDLParserErrorContext("initializeClientRoutingVersions"),
+                                   dbVersionElem.Obj());
+    }
+
+    initializeClientRoutingVersions(nss, shardVersion, dbVersion);
+}
+
+void OperationShardingState::initializeClientRoutingVersions(
+    NamespaceString nss,
+    const boost::optional<ChunkVersion>& shardVersion,
+    const boost::optional<DatabaseVersion>& dbVersion) {
+    invariant(_shardVersions.empty());
+    invariant(_databaseVersions.empty());
+
+    if (shardVersion) {
+        _shardVersions[nss.ns()] = *shardVersion;
+    }
+    if (dbVersion) {
         // Unforunately this is a bit ugly; it's because a command comes with a shardVersion or
         // databaseVersion, and the assumption is that those versions are applied to whatever is
         // returned by the Command's parseNs(), which can either be a full namespace or just a db.
-        _databaseVersions[nss.db().empty() ? nss.ns() : nss.db()] = DatabaseVersion::parse(
-            IDLParserErrorContext("initializeClientRoutingVersions"), dbVersionElem.Obj());
+        _databaseVersions[nss.db().empty() ? nss.ns() : nss.db()] = *dbVersion;
     }
 }
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index d99227e0254..77dd848d9c4 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -96,7 +96,14 @@ public:
      * This initialization may only be performed once for the lifetime of the object, which
      * coincides with the lifetime of the client's request.
      */
-    void initializeClientRoutingVersions(NamespaceString nss, const BSONObj& cmdObj);
+    void initializeClientRoutingVersionsFromCommand(NamespaceString nss, const BSONObj& cmdObj);
+
+    /**
+     * Stores the given shardVersion and databaseVersion for the given namespace.
+     */
+    void initializeClientRoutingVersions(NamespaceString nss,
+                                         const boost::optional<ChunkVersion>& shardVersion,
+                                         const boost::optional<DatabaseVersion>& dbVersion);
 
     /**
      * Returns whether or not there is a shard version associated with this operation.
diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
index 625f62a51f3..4d544948696 100644
--- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
+++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp
@@ -73,6 +73,13 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi
     }
 
     if (auto staleInfo = status->extraInfo<StaleConfigInfo>()) {
+        if (staleInfo->getCriticalSectionSignal()) {
+            // Set migration critical section on operation sharding state: operation will wait for
+            // the migration to finish before returning.
+            auto& oss = OperationShardingState::get(_opCtx);
+            oss.setMigrationCriticalSectionSignal(staleInfo->getCriticalSectionSignal());
+        }
+
         auto handleMismatchStatus = onShardVersionMismatchNoExcept(
             _opCtx, staleInfo->getNss(), staleInfo->getVersionReceived());
         if (!handleMismatchStatus.isOK())
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 3bff6836fa9..58db096f7c0 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -959,7 +959,7 @@ void execCommandDatabase(OperationContext* opCtx,
             readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern &&
             (iAmPrimary ||
              (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) {
-            oss.initializeClientRoutingVersions(invocation->ns(), request.body);
+            oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body);
 
             auto const shardingState = ShardingState::get(opCtx);
             if (oss.hasShardVersion() || oss.hasDbVersion()) {
diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp
index c01f3260dc6..178b912a9f0 100644
--- a/src/mongo/db/service_entry_point_mongod.cpp
+++ b/src/mongo/db/service_entry_point_mongod.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/repl/replication_coordinator.h"
 #include "mongo/db/repl/speculative_majority_read_info.h"
 #include "mongo/db/s/implicit_create_collection.h"
+#include "mongo/db/s/operation_sharding_state.h"
 #include "mongo/db/s/scoped_operation_completion_sharding_actions.h"
 #include "mongo/db/s/shard_filtering_metadata_refresh.h"
 #include "mongo/db/s/sharding_config_optime_gossip.h"
@@ -182,6 +183,13 @@ public:
     void handleException(const DBException& e, OperationContext* opCtx) const override {
         // If we got a stale config, wait in case the operation is stuck in a critical section
         if (auto sce = e.extraInfo<StaleConfigInfo>()) {
+            if (sce->getCriticalSectionSignal()) {
+                // Set migration critical section on operation sharding state: operation will wait
+                // for the migration to finish before returning.
+                auto& oss = OperationShardingState::get(opCtx);
+                oss.setMigrationCriticalSectionSignal(sce->getCriticalSectionSignal());
+            }
+
             if (!opCtx->getClient()->isInDirectClient()) {
                 // We already have the StaleConfig exception, so just swallow any errors due to
                 // refresh
diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h
index aab3af48514..a081e4dad2a 100644
--- a/src/mongo/s/stale_exception.h
+++ b/src/mongo/s/stale_exception.h
@@ -32,6 +32,7 @@
 #include "mongo/db/namespace_string.h"
 #include "mongo/s/chunk_version.h"
 #include "mongo/s/database_version_gen.h"
+#include "mongo/util/concurrency/notification.h"
 
 namespace mongo {
 
@@ -41,8 +42,12 @@ public:
     StaleConfigInfo(NamespaceString nss,
                     ChunkVersion received,
-                    boost::optional<ChunkVersion> wanted)
-        : _nss(std::move(nss)), _received(received), _wanted(wanted) {}
+                    boost::optional<ChunkVersion> wanted,
+                    std::shared_ptr<Notification<void>> criticalSectionSignal = nullptr)
+        : _nss(std::move(nss)),
+          _received(received),
+          _wanted(wanted),
+          _criticalSectionSignal(criticalSectionSignal) {}
 
     const auto& getNss() const {
         return _nss;
@@ -56,6 +61,10 @@ public:
         return _wanted;
     }
 
+    auto getCriticalSectionSignal() const {
+        return _criticalSectionSignal;
+    }
+
     void serialize(BSONObjBuilder* bob) const override;
     static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&);
     static StaleConfigInfo parseFromCommandError(const BSONObj& commandError);
@@ -64,6 +73,10 @@ private:
     NamespaceString _nss;
     ChunkVersion _received;
     boost::optional<ChunkVersion> _wanted;
+
+    // This signal does not get serialized and therefore does not get propagated
+    // to the router.
+    std::shared_ptr<Notification<void>> _criticalSectionSignal;
 };
 
 using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>;
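For reference, the blocking behaviour exercised by the new test follows the usual critical-section pattern from jstests/libs/chunk_manipulation_util.js. A rough sketch, assuming a ShardingTest `st` with the chunk containing `{_id: 0}` on `st.shard0`, a `staticMongod` from MongoRunner.runMongod({}), and the same illustrative `test.foo` collection as above:

```javascript
// Illustrative sketch mirroring assertCommandBlocksIfCriticalSectionInProgress above; it assumes
// the chunk_manipulation_util.js helpers are loaded and `ns` = "test.foo".
pauseMoveChunkAtStep(st.shard0, moveChunkStepNames.chunkDataCommitted);
let joinMoveChunk =
    moveChunkParallel(staticMongod, st.s.host, {_id: 0}, null, ns, st.shard1.shardName);
waitForMoveChunkStep(st.shard0, moveChunkStepNames.chunkDataCommitted);

// While the donor holds the migration critical section, a shard-versioned collMod cannot complete.
assert.commandFailedWithCode(
    st.s.getDB("test").runCommand(
        {collMod: "foo", validator: {x: {$type: "string"}}, maxTimeMS: 500}),
    ErrorCodes.MaxTimeMSExpired);

// Releasing the fail point lets the migration (and any retried command) finish.
unpauseMoveChunkAtStep(st.shard0, moveChunkStepNames.chunkDataCommitted);
joinMoveChunk();
```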