diff options
author | Cheahuychou Mao <cheahuychou.mao@mongodb.com> | 2019-12-06 16:30:41 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-06 16:30:41 +0000 |
commit | eba76c558b3e7d784c146b51ced16d48b1d0efe7 (patch) | |
tree | eb43d876af50dfd29a6596878f15ed9ab500a30b /src/mongo | |
parent | 13944bb3fedc8d91c02c56bb66bb5c76a0a558d0 (diff) | |
download | mongo-eba76c558b3e7d784c146b51ced16d48b1d0efe7.tar.gz |
SERVER-44719 Make createIndexes, dropIndexes, and collMod check shard versions
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/catalog/coll_mod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/catalog/drop_indexes.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/commands/create_indexes.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/index_builds_coordinator.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/index_builds_coordinator_mongod.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/s/collection_metadata_filtering_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.cpp | 31 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_mongod.cpp | 8 | ||||
-rw-r--r-- | src/mongo/s/stale_exception.h | 17 |
15 files changed, 132 insertions, 22 deletions
diff --git a/src/mongo/db/catalog/coll_mod.cpp b/src/mongo/db/catalog/coll_mod.cpp index c9018e46350..49a666c9c67 100644 --- a/src/mongo/db/catalog/coll_mod.cpp +++ b/src/mongo/db/catalog/coll_mod.cpp @@ -294,7 +294,7 @@ Status _collModInternal(OperationContext* opCtx, return Status(ErrorCodes::NamespaceNotFound, "ns does not exist"); } - // This is necessary to set up CurOp and update the Top stats. + // This is necessary to set up CurOp, update the Top stats, and check shard version. OldClientContext ctx(opCtx, nss.ns()); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && diff --git a/src/mongo/db/catalog/drop_indexes.cpp b/src/mongo/db/catalog/drop_indexes.cpp index d09a7a2044d..6a14fa7505f 100644 --- a/src/mongo/db/catalog/drop_indexes.cpp +++ b/src/mongo/db/catalog/drop_indexes.cpp @@ -224,6 +224,8 @@ Status dropIndexes(OperationContext* opCtx, collection->uuid()); WriteUnitOfWork wunit(opCtx); + + // This is necessary to check shard version. OldClientContext ctx(opCtx, nss.ns()); // Use an empty BSONObjBuilder to avoid duplicate appends to result on retry loops. diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 4e6167326b4..d0da95f2847 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -341,6 +341,13 @@ void checkDatabaseShardingState(OperationContext* opCtx, StringData dbName) { } /** + * Checks collection sharding state. Throws exception on error. + */ +void checkCollectionShardingState(OperationContext* opCtx, const NamespaceString& ns) { + CollectionShardingState::get(opCtx, ns)->checkShardVersionOrThrow(opCtx, true); +} + +/** * Opens or creates database for index creation. * On database creation, the lock will be made exclusive. */ @@ -440,6 +447,7 @@ bool runCreateIndexesForMobile(OperationContext* opCtx, opCtx->recoveryUnit()->abandonSnapshot(); boost::optional<Lock::CollectionLock> exclusiveCollectionLock( boost::in_place_init, opCtx, ns, MODE_X); + checkCollectionShardingState(opCtx, ns); // Index builds can safely ignore prepare conflicts and perform writes. On primaries, an // exclusive lock in the final drain phase conflicts with prepared transactions. @@ -701,6 +709,10 @@ bool runCreateIndexesWithCoordinator(OperationContext* opCtx, opCtx->recoveryUnit()->abandonSnapshot(); Lock::CollectionLock collLock(opCtx, ns, MODE_X); + // This check is for optimization purposes only as this lock is released immediately after + // this and is acquired again when we build the index. + checkCollectionShardingState(opCtx, ns); + auto collection = getOrCreateCollection(opCtx, db, ns, cmdObj, &errmsg, &result); collectionUUID = collection->uuid(); } diff --git a/src/mongo/db/index_builds_coordinator.cpp b/src/mongo/db/index_builds_coordinator.cpp index ca37985492b..e406d9fa688 100644 --- a/src/mongo/db/index_builds_coordinator.cpp +++ b/src/mongo/db/index_builds_coordinator.cpp @@ -986,6 +986,13 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild( auto collection = autoColl.getCollection(); const auto& nss = collection->ns(); + // This check is for optimization purposes only as since this lock is released after this, + // and is acquired again when we build the index in _setUpIndexBuild. + auto status = CollectionShardingState::get(opCtx, nss)->checkShardVersionNoThrow(opCtx, true); + if (!status.isOK()) { + return status; + } + // Lock from when we ascertain what indexes to build through to when the build is registered // on the Coordinator and persistedly set up in the catalog. This serializes setting up an // index build so that no attempts are made to register the same build twice. @@ -1012,7 +1019,7 @@ IndexBuildsCoordinator::_filterSpecsAndRegisterBuild( buildUUID, collectionUUID, dbName.toString(), filteredSpecs, protocol, commitQuorum); replIndexBuildState->stats.numIndexesBefore = _getNumIndexesTotal(opCtx, collection); - Status status = _registerIndexBuild(lk, replIndexBuildState); + status = _registerIndexBuild(lk, replIndexBuildState); if (!status.isOK()) { return status; } @@ -1035,6 +1042,14 @@ Status IndexBuildsCoordinator::_setUpIndexBuild(OperationContext* opCtx, AutoGetCollection autoColl(opCtx, nssOrUuid, MODE_X); auto collection = autoColl.getCollection(); const auto& nss = collection->ns(); + auto status = CollectionShardingState::get(opCtx, nss)->checkShardVersionNoThrow(opCtx, true); + if (!status.isOK()) { + // We need to unregister the index build to allow retries to succeed. + stdx::unique_lock<Latch> lk(_mutex); + _unregisterIndexBuild(lk, replIndexBuildState); + + return status; + } auto replCoord = repl::ReplicationCoordinator::get(opCtx); const bool replSetAndNotPrimary = @@ -1082,7 +1097,7 @@ Status IndexBuildsCoordinator::_setUpIndexBuild(OperationContext* opCtx, : IndexBuildsManager::IndexConstraints::kEnforce; options.protocol = replIndexBuildState->protocol; - auto status = [&] { + status = [&] { if (!replSetAndNotPrimary) { // On standalones and primaries, call setUpIndexBuild(), which makes the initial catalog // write. On primaries, this replicates the startIndexBuild oplog entry. diff --git a/src/mongo/db/index_builds_coordinator_mongod.cpp b/src/mongo/db/index_builds_coordinator_mongod.cpp index 41a5a2ea25a..36968811cc2 100644 --- a/src/mongo/db/index_builds_coordinator_mongod.cpp +++ b/src/mongo/db/index_builds_coordinator_mongod.cpp @@ -38,6 +38,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/index_build_entry_helpers.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" @@ -139,6 +140,13 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, const auto deadline = opCtx->getDeadline(); const auto timeoutError = opCtx->getTimeoutError(); + const NamespaceStringOrUUID nssOrUuid{dbName, collectionUUID}; + const auto nss = CollectionCatalog::get(opCtx).resolveNamespaceStringOrUUID(opCtx, nssOrUuid); + + const auto& oss = OperationShardingState::get(opCtx); + const auto shardVersion = oss.getShardVersion(nss); + const auto dbVersion = oss.getDbVersion(dbName); + // Task in thread pool should have similar CurOp representation to the caller so that it can be // identified as a createIndexes operation. LogicalOp logicalOp = LogicalOp::opInvalid; @@ -165,6 +173,7 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, buildUUID, collectionUUID, dbName, + nss, deadline, indexBuildOptions, logicalOp, @@ -172,7 +181,9 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, replState, startPromise = std::move(startPromise), startTimestamp, - timeoutError + timeoutError, + shardVersion, + dbVersion ](auto status) mutable noexcept { // Clean up if we failed to schedule the task. if (!status.isOK()) { @@ -185,6 +196,9 @@ IndexBuildsCoordinatorMongod::startIndexBuild(OperationContext* opCtx, auto opCtx = Client::getCurrent()->makeOperationContext(); opCtx->setDeadlineByDate(deadline, timeoutError); + auto& oss = OperationShardingState::get(opCtx.get()); + oss.initializeClientRoutingVersions(nss, shardVersion, dbVersion); + { stdx::unique_lock<Client> lk(*opCtx->getClient()); auto curOp = CurOp::get(opCtx.get()); diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp index da000652d40..b0effe64772 100644 --- a/src/mongo/db/s/collection_metadata_filtering_test.cpp +++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp @@ -112,7 +112,7 @@ protected: const auto version = cm->getVersion(ShardId("0")); BSONObjBuilder builder; version.appendToCommand(&builder); - oss.initializeClientRoutingVersions(kNss, builder.obj()); + oss.initializeClientRoutingVersionsFromCommand(kNss, builder.obj()); } std::shared_ptr<MetadataManager> _manager; diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 5fc2a88d16a..5ea0a4756cc 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -203,6 +203,16 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx, (void)_getMetadataWithVersionCheckAt(opCtx, boost::none, isCollection); } +Status CollectionShardingState::checkShardVersionNoThrow(OperationContext* opCtx, + bool isCollection) noexcept { + try { + checkShardVersionOrThrow(opCtx, isCollection); + return Status::OK(); + } catch (const DBException& ex) { + return ex.toStatus(); + } +} + boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataWithVersionCheckAt( OperationContext* opCtx, const boost::optional<mongo::LogicalTime>& atClusterTime, @@ -250,13 +260,9 @@ boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataW }(); if (criticalSectionSignal) { - // Set migration critical section on operation sharding state: operation will wait for the - // migration to finish before returning failure and retrying. - auto& oss = OperationShardingState::get(opCtx); - oss.setMigrationCriticalSectionSignal(criticalSectionSignal); - - uasserted(StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion), - str::stream() << "migration commit in progress for " << _nss.ns()); + uasserted( + StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion, criticalSectionSignal), + str::stream() << "migration commit in progress for " << _nss.ns()); } if (receivedShardVersion.isWriteCompatibleWith(wantedShardVersion)) { diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index cc246dacdab..d411a9defa9 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -123,6 +123,11 @@ public: void checkShardVersionOrThrow(OperationContext* opCtx, bool isCollection); /** + * Similar to checkShardVersionOrThrow but returns a status instead of throwing. + */ + Status checkShardVersionNoThrow(OperationContext* opCtx, bool isCollection) noexcept; + + /** * Methods to control the collection's critical section. Methods listed below must be called * with both the collection lock and CollectionShardingRuntimeLock held in exclusive mode. * diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp index 1ee6cfbeed8..a4bd7b58864 100644 --- a/src/mongo/db/s/collection_sharding_state_test.cpp +++ b/src/mongo/db/s/collection_sharding_state_test.cpp @@ -69,7 +69,7 @@ protected: const auto version = metadata.getShardVersion(); BSONObjBuilder builder; version.appendToCommand(&builder); - oss.initializeClientRoutingVersions(kTestNss, builder.obj()); + oss.initializeClientRoutingVersionsFromCommand(kTestNss, builder.obj()); } }; diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 0a726714fd6..b5e01ee4dff 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -82,14 +82,17 @@ bool OperationShardingState::allowImplicitCollectionCreation() const { return _allowImplicitCollectionCreation; } -void OperationShardingState::initializeClientRoutingVersions(NamespaceString nss, - const BSONObj& cmdObj) { +void OperationShardingState::initializeClientRoutingVersionsFromCommand(NamespaceString nss, + const BSONObj& cmdObj) { invariant(_shardVersions.empty()); invariant(_databaseVersions.empty()); + boost::optional<ChunkVersion> shardVersion; + boost::optional<DatabaseVersion> dbVersion; + const auto shardVersionElem = cmdObj.getField(ChunkVersion::kShardVersionField); if (!shardVersionElem.eoo()) { - _shardVersions[nss.ns()] = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj)); + shardVersion = uassertStatusOK(ChunkVersion::parseFromCommand(cmdObj)); } const auto dbVersionElem = cmdObj.getField(kDbVersionField); @@ -98,11 +101,29 @@ void OperationShardingState::initializeClientRoutingVersions(NamespaceString nss str::stream() << "expected databaseVersion element to be an object, got " << dbVersionElem, dbVersionElem.type() == BSONType::Object); + + dbVersion = DatabaseVersion::parse(IDLParserErrorContext("initializeClientRoutingVersions"), + dbVersionElem.Obj()); + } + + initializeClientRoutingVersions(nss, shardVersion, dbVersion); +} + +void OperationShardingState::initializeClientRoutingVersions( + NamespaceString nss, + const boost::optional<ChunkVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion) { + invariant(_shardVersions.empty()); + invariant(_databaseVersions.empty()); + + if (shardVersion) { + _shardVersions[nss.ns()] = *shardVersion; + } + if (dbVersion) { // Unforunately this is a bit ugly; it's because a command comes with a shardVersion or // databaseVersion, and the assumption is that those versions are applied to whatever is // returned by the Command's parseNs(), which can either be a full namespace or just a db. - _databaseVersions[nss.db().empty() ? nss.ns() : nss.db()] = DatabaseVersion::parse( - IDLParserErrorContext("initializeClientRoutingVersions"), dbVersionElem.Obj()); + _databaseVersions[nss.db().empty() ? nss.ns() : nss.db()] = *dbVersion; } } diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index d99227e0254..77dd848d9c4 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -96,7 +96,14 @@ public: * This initialization may only be performed once for the lifetime of the object, which * coincides with the lifetime of the client's request. */ - void initializeClientRoutingVersions(NamespaceString nss, const BSONObj& cmdObj); + void initializeClientRoutingVersionsFromCommand(NamespaceString nss, const BSONObj& cmdObj); + + /** + * Stores the given shardVersion and databaseVersion for the given namespace. + */ + void initializeClientRoutingVersions(NamespaceString nss, + const boost::optional<ChunkVersion>& shardVersion, + const boost::optional<DatabaseVersion>& dbVersion); /** * Returns whether or not there is a shard version associated with this operation. diff --git a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp index 625f62a51f3..4d544948696 100644 --- a/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp +++ b/src/mongo/db/s/scoped_operation_completion_sharding_actions.cpp @@ -73,6 +73,13 @@ ScopedOperationCompletionShardingActions::~ScopedOperationCompletionShardingActi } if (auto staleInfo = status->extraInfo<StaleConfigInfo>()) { + if (staleInfo->getCriticalSectionSignal()) { + // Set migration critical section on operation sharding state: operation will wait for + // the migration to finish before returning. + auto& oss = OperationShardingState::get(_opCtx); + oss.setMigrationCriticalSectionSignal(staleInfo->getCriticalSectionSignal()); + } + auto handleMismatchStatus = onShardVersionMismatchNoExcept( _opCtx, staleInfo->getNss(), staleInfo->getVersionReceived()); if (!handleMismatchStatus.isOK()) diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 3bff6836fa9..58db096f7c0 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -959,7 +959,7 @@ void execCommandDatabase(OperationContext* opCtx, readConcernArgs.getLevel() != repl::ReadConcernLevel::kAvailableReadConcern && (iAmPrimary || (readConcernArgs.hasLevel() || readConcernArgs.getArgsAfterClusterTime()))) { - oss.initializeClientRoutingVersions(invocation->ns(), request.body); + oss.initializeClientRoutingVersionsFromCommand(invocation->ns(), request.body); auto const shardingState = ShardingState::get(opCtx); if (oss.hasShardVersion() || oss.hasDbVersion()) { diff --git a/src/mongo/db/service_entry_point_mongod.cpp b/src/mongo/db/service_entry_point_mongod.cpp index c01f3260dc6..178b912a9f0 100644 --- a/src/mongo/db/service_entry_point_mongod.cpp +++ b/src/mongo/db/service_entry_point_mongod.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/speculative_majority_read_info.h" #include "mongo/db/s/implicit_create_collection.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/scoped_operation_completion_sharding_actions.h" #include "mongo/db/s/shard_filtering_metadata_refresh.h" #include "mongo/db/s/sharding_config_optime_gossip.h" @@ -182,6 +183,13 @@ public: void handleException(const DBException& e, OperationContext* opCtx) const override { // If we got a stale config, wait in case the operation is stuck in a critical section if (auto sce = e.extraInfo<StaleConfigInfo>()) { + if (sce->getCriticalSectionSignal()) { + // Set migration critical section on operation sharding state: operation will wait + // for the migration to finish before returning. + auto& oss = OperationShardingState::get(opCtx); + oss.setMigrationCriticalSectionSignal(sce->getCriticalSectionSignal()); + } + if (!opCtx->getClient()->isInDirectClient()) { // We already have the StaleConfig exception, so just swallow any errors due to // refresh diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index aab3af48514..a081e4dad2a 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -32,6 +32,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/s/chunk_version.h" #include "mongo/s/database_version_gen.h" +#include "mongo/util/concurrency/notification.h" namespace mongo { @@ -41,8 +42,12 @@ public: StaleConfigInfo(NamespaceString nss, ChunkVersion received, - boost::optional<ChunkVersion> wanted) - : _nss(std::move(nss)), _received(received), _wanted(wanted) {} + boost::optional<ChunkVersion> wanted, + std::shared_ptr<Notification<void>> criticalSectionSignal = nullptr) + : _nss(std::move(nss)), + _received(received), + _wanted(wanted), + _criticalSectionSignal(criticalSectionSignal) {} const auto& getNss() const { return _nss; @@ -56,6 +61,10 @@ public: return _wanted; } + auto getCriticalSectionSignal() const { + return _criticalSectionSignal; + } + void serialize(BSONObjBuilder* bob) const override; static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&); static StaleConfigInfo parseFromCommandError(const BSONObj& commandError); @@ -64,6 +73,10 @@ private: NamespaceString _nss; ChunkVersion _received; boost::optional<ChunkVersion> _wanted; + + // This signal does not get serialized and therefore does not get propagated + // to the router. + std::shared_ptr<Notification<void>> _criticalSectionSignal; }; using StaleConfigException = ExceptionFor<ErrorCodes::StaleConfig>; |