author | Antonio Fuschetto <antonio.fuschetto@mongodb.com> | 2022-12-20 15:04:40 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-12-20 16:06:19 +0000
commit | 4dccf13844ceb965fd9c141758d81f5618693c30 (patch)
tree | cfa0c460ecea8bb934f3189509b6186e757452f9
parent | f7313bb1cf96e3e542ac4c8ad9f932ec68164b84 (diff)
download | mongo-4dccf13844ceb965fd9c141758d81f5618693c30.tar.gz
SERVER-71202 Adapt existing movePrimary logic to new DDL coordinator
-rw-r--r-- | buildscripts/resmokeconfig/fully_disabled_feature_flags.yml | 3
-rw-r--r-- | jstests/sharding/move_primary_with_writes.js | 18
-rw-r--r-- | src/mongo/db/s/database_sharding_state.cpp | 12
-rw-r--r-- | src/mongo/db/s/database_sharding_state.h | 34
-rw-r--r-- | src/mongo/db/s/move_primary_coordinator.cpp | 377
-rw-r--r-- | src/mongo/db/s/move_primary_coordinator.h | 99
-rw-r--r-- | src/mongo/db/s/move_primary_coordinator_document.idl | 13
-rw-r--r-- | src/mongo/db/s/move_primary_source_manager.cpp | 4
-rw-r--r-- | src/mongo/db/s/move_primary_source_manager.h | 2
-rw-r--r-- | src/mongo/db/s/shardsvr_move_primary_command.cpp | 16
-rw-r--r-- | src/mongo/s/sharding_feature_flags.idl | 2
11 files changed, 516 insertions, 64 deletions
diff --git a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
index da4801f2b2a..92bc3b3e815 100644
--- a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
+++ b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
@@ -9,8 +9,5 @@
 # released create the transactions collection index and is only meant to be enabled adhoc, so only
 # its targeted tests should enable it.
 - featureFlagAlwaysCreateConfigTransactionsPartialIndexOnStepUp
-# Disable the new logic for the resilient movePrimary for non-dedicated tests until the logic is
-# mature enough to be enabled for all tests.
-- featureFlagResilientMovePrimary
 # Disable the feature flag for catalog shard until most of the codebase can run in this mode.
 - featureFlagCatalogShard
diff --git a/jstests/sharding/move_primary_with_writes.js b/jstests/sharding/move_primary_with_writes.js
index 9de2338a31a..9b28a7838bb 100644
--- a/jstests/sharding/move_primary_with_writes.js
+++ b/jstests/sharding/move_primary_with_writes.js
@@ -5,6 +5,7 @@
 'use strict';

 load('jstests/libs/fail_point_util.js');
+load("jstests/libs/feature_flag_util.js");

 let st = new ShardingTest({
     mongos: 2,
@@ -285,11 +286,20 @@ st.forEachConnection(shard => {
     }
 });

+let cloningCatalogDataFPName = "hangBeforeCloningCatalogData";
+let cleaningStaleDataFPName = "hangBeforeCleaningStaleData";
+
+// TODO (SERVER-71309): Remove once 7.0 becomes last LTS.
+if (!FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'), "ResilientMovePrimary")) {
+    cloningCatalogDataFPName = "hangInCloneStage";
+    cleaningStaleDataFPName = "hangInCleanStaleDataStage";
+}
+
 createCollections();
 let fromShard = st.getPrimaryShard(dbName);
 let toShard = st.getOther(fromShard);
-testMovePrimary('hangInCloneStage', fromShard, toShard, st.s.getDB(dbName), true, false);
+testMovePrimary(cloningCatalogDataFPName, fromShard, toShard, st.s.getDB(dbName), true, false);
 verifyDocuments(toShard.getDB(dbName), 3);
 verifyDocuments(fromShard.getDB(dbName), 0);
@@ -297,19 +307,19 @@
 createCollections();
 fromShard = st.getPrimaryShard(dbName);
 toShard = st.getOther(fromShard);
-testMovePrimary('hangInCloneStage', fromShard, toShard, st.s.getDB(dbName), false, true);
+testMovePrimary(cloningCatalogDataFPName, fromShard, toShard, st.s.getDB(dbName), false, true);
 verifyDocuments(toShard.getDB(dbName), 3);
 verifyDocuments(fromShard.getDB(dbName), 0);

 createCollections();
 fromShard = st.getPrimaryShard(dbName);
 toShard = st.getOther(fromShard);
-testMovePrimaryDDL('hangInCloneStage', fromShard, toShard, st.s.getDB("admin"), false, true);
+testMovePrimaryDDL(cloningCatalogDataFPName, fromShard, toShard, st.s.getDB("admin"), false, true);

 createCollections();
 fromShard = st.getPrimaryShard(dbName);
 toShard = st.getOther(fromShard);
-testMovePrimary('hangInCleanStaleDataStage', fromShard, toShard, st.s.getDB(dbName), false, false);
+testMovePrimary(cleaningStaleDataFPName, fromShard, toShard, st.s.getDB(dbName), false, false);

 overrideDDLLockTimeoutFPs.forEach(fp => fp.off());
diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp
index f3c957e97a0..b55614fc6ff 100644
--- a/src/mongo/db/s/database_sharding_state.cpp
+++ b/src/mongo/db/s/database_sharding_state.cpp
@@ -156,18 +156,14 @@ void DatabaseShardingState::exitCriticalSectionNoChecks(OperationContext* opCtx)
     _critSec.exitCriticalSectionNoChecks();
 }

-void DatabaseShardingState::setMovePrimarySourceManager(OperationContext* opCtx,
-                                                        MovePrimarySourceManager* sourceMgr) {
+void DatabaseShardingState::setMovePrimaryInProgress(OperationContext* opCtx) {
     invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X));
-    invariant(sourceMgr);
-    invariant(!_sourceMgr);
-
-    _sourceMgr = sourceMgr;
+    _movePrimaryInProgress = true;
 }

-void DatabaseShardingState::clearMovePrimarySourceManager(OperationContext* opCtx) {
+void DatabaseShardingState::unsetMovePrimaryInProgress(OperationContext* opCtx) {
     invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_IX));
-    _sourceMgr = nullptr;
+    _movePrimaryInProgress = false;
 }

 void DatabaseShardingState::setDbMetadataRefreshFuture(SharedSemiFuture<void> future,
diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h
index 344635dd0e0..c079c23a619 100644
--- a/src/mongo/db/s/database_sharding_state.h
+++ b/src/mongo/db/s/database_sharding_state.h
@@ -35,8 +35,6 @@
 namespace mongo {

-class MovePrimarySourceManager;
-
 enum class DSSAcquisitionMode { kShared, kExclusive };

 /**
@@ -112,25 +110,28 @@ public:
     }

     /**
-     * Returns the active movePrimary source manager, if one is available.
+     * Returns `true` if a `movePrimary` operation on this database is in progress, `false`
+     * otherwise.
      */
     bool isMovePrimaryInProgress() const {
-        return _sourceMgr;
+        return _movePrimaryInProgress;
     }

     /**
-     * Attaches a movePrimary source manager to this database's sharding state. Must be called with
-     * the database lock in X mode. May not be called if there is a movePrimary source manager
-     * already installed. Must be followed by a call to clearMovePrimarySourceManager.
+     * Declares that a `movePrimary` operation on this database is in progress. This causes write
+     * operations on this database to fail with the `MovePrimaryInProgress` error.
+     *
+     * Must be called with the database locked in X mode.
      */
-    void setMovePrimarySourceManager(OperationContext* opCtx, MovePrimarySourceManager* sourceMgr);
+    void setMovePrimaryInProgress(OperationContext* opCtx);

     /**
-     * Removes a movePrimary source manager from this database's sharding state. Must be called with
-     * with the database lock in X mode. May not be called if there isn't a movePrimary source
-     * manager installed already through a previous call to setMovePrimarySourceManager.
+     * Declares that the `movePrimary` operation on this database is over. This re-enables write
+     * operations on this database.
+     *
+     * Must be called with the database locked in IX mode.
      */
-    void clearMovePrimarySourceManager(OperationContext* opCtx);
+    void unsetMovePrimaryInProgress(OperationContext* opCtx);

     /**
      * Sets the database metadata refresh future for other threads to wait on it.
@@ -174,12 +175,9 @@ private:
     ShardingMigrationCriticalSection _critSec;

-    // If this database is serving as a source shard for a movePrimary, the source manager will be
-    // non-null. To write this value, there needs to be X-lock on the database in order to
-    // synchronize with other callers which will read the source manager.
-    //
-    // NOTE: The source manager is not owned by this class.
-    MovePrimarySourceManager* _sourceMgr{nullptr};
+    // Set to `true` when this database is serving as a source shard for a movePrimary, `false`
+    // otherwise.
+    bool _movePrimaryInProgress{false};

     // Tracks the ongoing database metadata refresh. Possibly keeps a future for other threads to
     // wait on it, and a cancellation source to cancel the ongoing database metadata refresh.
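The following sketch is not part of the commit; it only illustrates, under the lock contract documented in database_sharding_state.h above, how a caller is expected to toggle the new in-progress flag. The helper names are hypothetical; the actual callers introduced by this patch are MovePrimaryCoordinator::blockWritesLegacy() and unblockWritesLegacy() further down.

#include "mongo/db/catalog_raii.h"
#include "mongo/db/s/database_sharding_state.h"

// Hypothetical usage sketch (not committed code): mark a movePrimary as in progress and
// later clear it, following the documented lock modes.
void markMovePrimaryStarted(mongo::OperationContext* opCtx, const mongo::DatabaseName& dbName) {
    mongo::AutoGetDb autoDb(opCtx, dbName, MODE_X);  // setMovePrimaryInProgress() requires the DB X lock
    auto scopedDss = mongo::DatabaseShardingState::assertDbLockedAndAcquire(
        opCtx, dbName, mongo::DSSAcquisitionMode::kExclusive);
    scopedDss->setMovePrimaryInProgress(opCtx);  // subsequent writes fail with MovePrimaryInProgress
}

void markMovePrimaryFinished(mongo::OperationContext* opCtx, const mongo::DatabaseName& dbName) {
    mongo::AutoGetDb autoDb(opCtx, dbName, MODE_IX);  // unsetMovePrimaryInProgress() requires the DB IX lock
    auto scopedDss = mongo::DatabaseShardingState::assertDbLockedAndAcquire(
        opCtx, dbName, mongo::DSSAcquisitionMode::kExclusive);
    scopedDss->unsetMovePrimaryInProgress(opCtx);  // write operations are allowed again
}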
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp
index 3cf9b179d19..a9e3dd1a6e5 100644
--- a/src/mongo/db/s/move_primary_coordinator.cpp
+++ b/src/mongo/db/s/move_primary_coordinator.cpp
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the Server Side Public License, version 1,
@@ -29,14 +29,36 @@

 #include "mongo/db/s/move_primary_coordinator.h"

+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/sharding_logging.h"
+#include "mongo/db/s/sharding_recovery_service.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/db/write_block_bypass.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+
 #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding

 namespace mongo {

+MONGO_FAIL_POINT_DEFINE(hangBeforeCloningCatalogData);
+MONGO_FAIL_POINT_DEFINE(hangBeforeCleaningStaleData);
+
 MovePrimaryCoordinator::MovePrimaryCoordinator(ShardingDDLCoordinatorService* service,
                                                const BSONObj& initialState)
     : RecoverableShardingDDLCoordinator(service, "MovePrimaryCoordinator", initialState),
-      _dbName(nss().dbName()) {}
+      _dbName(nss().dbName()),
+      _csReason([&] {
+          BSONObjBuilder builder;
+          builder.append("command", "movePrimary");
+          builder.append("db", _dbName.toString());
+          builder.append("to", _doc.getToShardId());
+          return builder.obj();
+      }()) {}

 bool MovePrimaryCoordinator::canAlwaysStartWhenUserWritesAreDisabled() const {
     return true;
@@ -66,45 +88,364 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl(
     std::shared_ptr<executor::ScopedTaskExecutor> executor,
     const CancellationToken& token) noexcept {
     return ExecutorFuture<void>(**executor)
-        .then(_buildPhaseHandler(Phase::kCheckPreconditions,
-                                 [this, anchor = shared_from_this()] {
-                                     auto opCtxHolder = cc().makeOperationContext();
-                                     auto* opCtx = opCtxHolder.get();
-                                     getForwardableOpMetadata().setOn(opCtx);
-                                 }))
-        .then(_buildPhaseHandler(Phase::kCloneCatalogData,
+        .then([this, executor, token, anchor = shared_from_this()] {
+            const auto opCtxHolder = cc().makeOperationContext();
+            auto* opCtx = opCtxHolder.get();
+            getForwardableOpMetadata().setOn(opCtx);
+
+            if (_doc.getToShardId() == ShardingState::get(opCtx)->shardId()) {
+                LOGV2(7120200,
+                      "Database already on requested primary shard",
+                      "db"_attr = _dbName,
+                      "to"_attr = _doc.getToShardId());
+
+                return ExecutorFuture<void>(**executor);
+            }
+
+            return runMovePrimaryWorkflow(executor, token);
+        });
+}
+
+ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
+    std::shared_ptr<executor::ScopedTaskExecutor> executor,
+    const CancellationToken& token) noexcept {
+    return ExecutorFuture<void>(**executor)
+        .then(_buildPhaseHandler(
+            Phase::kClone,
+            [this, anchor = shared_from_this()] {
+                const auto opCtxHolder = cc().makeOperationContext();
+                auto* opCtx = opCtxHolder.get();
+                getForwardableOpMetadata().setOn(opCtx);
+
+                LOGV2(7120201,
+                      "Running movePrimary operation",
+                      "db"_attr = _dbName,
+                      "to"_attr = _doc.getToShardId());
+
+                logChange(opCtx, "start");
+
+                ScopeGuard unblockWritesLegacyOnExit([&] {
+                    // TODO (SERVER-71444): Fix to be interruptible or document exception.
+                    UninterruptibleLockGuard noInterrupt(opCtx->lockState());  // NOLINT
+                    unblockWritesLegacy(opCtx);
+                });
+
+                if (!_firstExecution) {
+                    uasserted(
+                        7120202,
+                        "movePrimary operation on database {} failed cloning data to shard {}"_format(
+                            _dbName.toString(), _doc.getToShardId().toString()));
+                }
+
+                blockWritesLegacy(opCtx);
+
+                if (MONGO_unlikely(hangBeforeCloningCatalogData.shouldFail())) {
+                    LOGV2(7120203, "Hit hangBeforeCloningCatalogData");
+                    hangBeforeCloningCatalogData.pauseWhileSet(opCtx);
+                }
+
+                _doc.setCollectionsToClone(getUnshardedCollections(opCtx));
+                _updateStateDocument(opCtx, StateDoc(_doc));
+
+                const auto cloneResponse = cloneDataToRecipient(opCtx);
+                const auto cloneStatus = Shard::CommandResponse::getEffectiveStatus(cloneResponse);
+                if (!cloneStatus.isOK() || !checkClonedData(cloneResponse.getValue())) {
+                    uasserted(
+                        cloneStatus.isOK() ? 7120204 : cloneStatus.code(),
+                        "movePrimary operation on database {} failed cloning data to shard {}"_format(
+                            _dbName.toString(), _doc.getToShardId().toString()));
+                }
+
+                // TODO (SERVER-71566): Temporary solution to cover the case of stepping down before
+                // actually entering the `kCatchup` phase.
+                blockWrites(opCtx);
+            }))
+        .then(_buildPhaseHandler(Phase::kCatchup,
                                  [this, anchor = shared_from_this()] {
-                                     auto opCtxHolder = cc().makeOperationContext();
+                                     const auto opCtxHolder = cc().makeOperationContext();
                                      auto* opCtx = opCtxHolder.get();
                                      getForwardableOpMetadata().setOn(opCtx);
+
+                                     blockWrites(opCtx);
                                  }))
         .then(_buildPhaseHandler(Phase::kEnterCriticalSection,
                                  [this, anchor = shared_from_this()] {
-                                     auto opCtxHolder = cc().makeOperationContext();
+                                     const auto opCtxHolder = cc().makeOperationContext();
                                      auto* opCtx = opCtxHolder.get();
                                      getForwardableOpMetadata().setOn(opCtx);
+
+                                     blockReads(opCtx);
                                  }))
-        .then(_buildPhaseHandler(Phase::kCommitMetadataChanges,
+        .then(_buildPhaseHandler(
+            Phase::kCommit,
+            [this, anchor = shared_from_this()] {
+                const auto opCtxHolder = cc().makeOperationContext();
+                auto* opCtx = opCtxHolder.get();
+                getForwardableOpMetadata().setOn(opCtx);
+
+                invariant(_doc.getDatabaseVersion());
+                const auto& preCommitDbVersion = *_doc.getDatabaseVersion();
+
+                const auto commitResponse = commitMetadataToConfig(opCtx, preCommitDbVersion);
+                if (commitResponse == ErrorCodes::ShardNotFound) {
+                    unblockReadsAndWrites(opCtx);
+                }
+                uassertStatusOKWithContext(
+                    Shard::CommandResponse::getEffectiveStatus(commitResponse),
+                    "movePrimary operation on database {} failed to commit metadata changes"_format(
+                        _dbName.toString()));
+
+                assertChangedMetadataOnConfig(opCtx, preCommitDbVersion);
+
+                // Checkpoint the vector clock to ensure causality in the event of a crash or
+                // shutdown.
+                VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx);
+
+                clearDbMetadataOnPrimary(opCtx);
+
+                logChange(opCtx, "commit");
+            }))
+        .then(_buildPhaseHandler(Phase::kExitCriticalSection,
                                  [this, anchor = shared_from_this()] {
-                                     auto opCtxHolder = cc().makeOperationContext();
+                                     const auto opCtxHolder = cc().makeOperationContext();
                                      auto* opCtx = opCtxHolder.get();
                                      getForwardableOpMetadata().setOn(opCtx);
+
+                                     unblockReadsAndWrites(opCtx);
                                  }))
-        .then(_buildPhaseHandler(Phase::kCleanStaleData,
+        .then(_buildPhaseHandler(Phase::kClean,
                                  [this, anchor = shared_from_this()] {
-                                     auto opCtxHolder = cc().makeOperationContext();
+                                     const auto opCtxHolder = cc().makeOperationContext();
                                      auto* opCtx = opCtxHolder.get();
                                      getForwardableOpMetadata().setOn(opCtx);
+
+                                     if (MONGO_unlikely(hangBeforeCleaningStaleData.shouldFail())) {
+                                         LOGV2(7120205, "Hit hangBeforeCleaningStaleData");
+                                         hangBeforeCleaningStaleData.pauseWhileSet(opCtx);
+                                     }
+
+                                     dropStaleDataOnDonor(opCtx);
+
+                                     LOGV2(7120206,
+                                           "Completed movePrimary operation",
+                                           "db"_attr = _dbName,
+                                           "to"_attr = _doc.getToShardId());
+
+                                     logChange(opCtx, "end");
                                  }))
         .onError([this, anchor = shared_from_this()](const Status& status) {
-            LOGV2_ERROR(7120000,
-                        "Error running movePrimary",
-                        "database"_attr = nss(),
+            const auto opCtxHolder = cc().makeOperationContext();
+            auto* opCtx = opCtxHolder.get();
+            getForwardableOpMetadata().setOn(opCtx);
+
+            LOGV2_ERROR(7120207,
+                        "Failed movePrimary operation",
+                        "db"_attr = _dbName,
                         "to"_attr = _doc.getToShardId(),
                         "error"_attr = redact(status));

+            logChange(opCtx, "error");
+
             return status;
         });
 }

+void MovePrimaryCoordinator::logChange(OperationContext* opCtx, const std::string& what) const {
+    BSONObjBuilder details;
+    details.append("from", ShardingState::get(opCtx)->shardId());
+    details.append("to", _doc.getToShardId());
+    ShardingLogging::get(opCtx)->logChange(
+        opCtx, "movePrimary.{}"_format(what), _dbName.toString(), details.obj());
+}
+
+std::vector<NamespaceString> MovePrimaryCoordinator::getUnshardedCollections(
+    OperationContext* opCtx) {
+    const auto allCollections = [&] {
+        DBDirectClient dbClient(opCtx);
+        const auto collInfos = dbClient.getCollectionInfos(_dbName,
+                                                           BSON("type"
+                                                                << "collection"));
+
+        std::vector<NamespaceString> colls;
+        for (const auto& collInfo : collInfos) {
+            std::string collName;
+            uassertStatusOK(bsonExtractStringField(collInfo, "name", &collName));
+
+            const NamespaceString nss(_dbName, collName);
+            if (!nss.isSystem() ||
+                nss.isLegalClientSystemNS(serverGlobalParams.featureCompatibility)) {
+                colls.push_back(nss);
+            }
+        }
+
+        std::sort(colls.begin(), colls.end());
+        return colls;
+    }();
+
+    const auto shardedCollections = [&] {
+        auto colls = Grid::get(opCtx)->catalogClient()->getAllShardedCollectionsForDb(
+            opCtx, _dbName.toString(), repl::ReadConcernLevel::kMajorityReadConcern);
+
+        std::sort(colls.begin(), colls.end());
+        return colls;
+    }();
+
+    std::vector<NamespaceString> unshardedCollections;
+    std::set_difference(allCollections.cbegin(),
+                        allCollections.cend(),
+                        shardedCollections.cbegin(),
+                        shardedCollections.cend(),
+                        std::back_inserter(unshardedCollections));
+
+    return unshardedCollections;
+}
+
+StatusWith<Shard::CommandResponse> MovePrimaryCoordinator::cloneDataToRecipient(
+    OperationContext* opCtx) const {
+    // Enable write blocking bypass to allow cloning of catalog data even if writes are disallowed.
+    WriteBlockBypass::get(opCtx).set(true);
+
+    const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+    const auto fromShard =
+        uassertStatusOK(shardRegistry->getShard(opCtx, ShardingState::get(opCtx)->shardId()));
+    const auto toShard = uassertStatusOK(shardRegistry->getShard(opCtx, _doc.getToShardId()));
+
+    const auto cloneCommand = [&] {
+        BSONObjBuilder commandBuilder;
+        commandBuilder.append("_shardsvrCloneCatalogData", _dbName.toString());
+        commandBuilder.append("from", fromShard->getConnString().toString());
+        return commandBuilder.obj();
+    }();
+
+    return toShard->runCommandWithFixedRetryAttempts(
+        opCtx,
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+        NamespaceString::kAdminDb.toString(),
+        CommandHelpers::appendMajorityWriteConcern(cloneCommand),
+        Shard::RetryPolicy::kNotIdempotent);
+}
+
+bool MovePrimaryCoordinator::checkClonedData(Shard::CommandResponse cloneResponse) const {
+    invariant(_doc.getCollectionsToClone());
+    const auto& collectionToClone = *_doc.getCollectionsToClone();
+
+    const auto clonedCollections = [&] {
+        std::vector<NamespaceString> colls;
+        for (const auto& bsonElem : cloneResponse.response["clonedColls"].Obj()) {
+            if (bsonElem.type() == String) {
+                colls.push_back(NamespaceString(bsonElem.String()));
+            }
+        }
+
+        std::sort(colls.begin(), colls.end());
+        return colls;
+    }();
+
+    return collectionToClone.size() == clonedCollections.size() &&
+        std::equal(
+               collectionToClone.cbegin(), collectionToClone.cend(), clonedCollections.cbegin());
+}
+
+StatusWith<Shard::CommandResponse> MovePrimaryCoordinator::commitMetadataToConfig(
+    OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const {
+    const auto commitCommand = [&] {
+        ConfigsvrCommitMovePrimary commitRequest(_dbName, preCommitDbVersion, _doc.getToShardId());
+        commitRequest.setDbName(NamespaceString::kAdminDb);
+        return commitRequest.toBSON({});
+    }();
+
+    const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+    return config->runCommandWithFixedRetryAttempts(
+        opCtx,
+        ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+        NamespaceString::kAdminDb.toString(),
+        CommandHelpers::appendMajorityWriteConcern(commitCommand),
+        Shard::RetryPolicy::kIdempotent);
+}
+
+void MovePrimaryCoordinator::assertChangedMetadataOnConfig(
+    OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const {
+    const auto postCommitDbType = [&]() {
+        const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+        auto findResponse = uassertStatusOK(
+            config->exhaustiveFindOnConfig(opCtx,
+                                           ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                           repl::ReadConcernLevel::kMajorityReadConcern,
+                                           NamespaceString::kConfigDatabasesNamespace,
+                                           BSON(DatabaseType::kNameFieldName << _dbName.toString()),
+                                           BSONObj(),
+                                           1));
+
+        const auto databases = std::move(findResponse.docs);
+        uassert(ErrorCodes::IncompatibleShardingMetadata,
+                "Tried to find version for database {}, but found no databases"_format(
+                    _dbName.toString()),
+                !databases.empty());
+
+        return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front());
+    }();
+    tassert(7120208,
+            "Error committing movePrimary: database version went backwards",
+            postCommitDbType.getVersion() > preCommitDbVersion);
+    uassert(7120209,
+            "Error committing movePrimary: update of config.databases failed",
+            postCommitDbType.getPrimary() != ShardingState::get(opCtx)->shardId());
+}
+
+void MovePrimaryCoordinator::clearDbMetadataOnPrimary(OperationContext* opCtx) const {
+    AutoGetDb autoDb(opCtx, _dbName, MODE_X);
+    DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, _dbName);
+}
+
+void MovePrimaryCoordinator::dropStaleDataOnDonor(OperationContext* opCtx) const {
+    // Enable write blocking bypass to allow cleaning of stale data even if writes are disallowed.
+    WriteBlockBypass::get(opCtx).set(true);
+
+    DBDirectClient dbClient(opCtx);
+    invariant(_doc.getCollectionsToClone());
+    for (const auto& nss : *_doc.getCollectionsToClone()) {
+        const auto dropStatus = [&] {
+            BSONObj dropResult;
+            dbClient.runCommand(_dbName.toString(), BSON("drop" << nss.coll()), dropResult);
+            return getStatusFromCommandResult(dropResult);
+        }();
+
+        if (!dropStatus.isOK()) {
+            LOGV2_WARNING(7120210,
+                          "Failed to drop stale collection on donor",
+                          "namespace"_attr = nss,
+                          "error"_attr = redact(dropStatus));
+        }
+    }
+}
+
+void MovePrimaryCoordinator::blockWritesLegacy(OperationContext* opCtx) const {
+    AutoGetDb autoDb(opCtx, _dbName, MODE_X);
+    auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+        opCtx, _dbName, DSSAcquisitionMode::kExclusive);
+    scopedDss->setMovePrimaryInProgress(opCtx);
+}
+
+void MovePrimaryCoordinator::unblockWritesLegacy(OperationContext* opCtx) const {
+    AutoGetDb autoDb(opCtx, _dbName, MODE_IX);
+    auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+        opCtx, _dbName, DSSAcquisitionMode::kExclusive);
+    scopedDss->unsetMovePrimaryInProgress(opCtx);
+}
+
+void MovePrimaryCoordinator::blockWrites(OperationContext* opCtx) const {
+    ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+        opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void MovePrimaryCoordinator::blockReads(OperationContext* opCtx) const {
+    ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads(
+        opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void MovePrimaryCoordinator::unblockReadsAndWrites(OperationContext* opCtx) const {
+    ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
+        opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
 }  // namespace mongo
diff --git a/src/mongo/db/s/move_primary_coordinator.h b/src/mongo/db/s/move_primary_coordinator.h
index f0e00e3ad91..944799689e4 100644
--- a/src/mongo/db/s/move_primary_coordinator.h
+++ b/src/mongo/db/s/move_primary_coordinator.h
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the Server Side Public License, version 1,
@@ -31,6 +31,7 @@

 #include "mongo/db/s/move_primary_coordinator_document_gen.h"
 #include "mongo/db/s/sharding_ddl_coordinator.h"
+#include "mongo/s/client/shard.h"

 namespace mongo {

@@ -42,7 +43,7 @@ public:
     using Phase = MovePrimaryCoordinatorPhaseEnum;

     MovePrimaryCoordinator(ShardingDDLCoordinatorService* service, const BSONObj& initialState);
-    ~MovePrimaryCoordinator() = default;
+    virtual ~MovePrimaryCoordinator() = default;

     void checkIfOptionsConflict(const BSONObj& doc) const override;
     bool canAlwaysStartWhenUserWritesAreDisabled() const override;
@@ -53,7 +54,101 @@ private:
     ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
                                   const CancellationToken& token) noexcept override;

+    ExecutorFuture<void> runMovePrimaryWorkflow(
+        std::shared_ptr<executor::ScopedTaskExecutor> executor,
+        const CancellationToken& token) noexcept;
+
+    /**
+     * Logs in the `config.changelog` collection a specific event for `movePrimary` operations.
+     */
+    void logChange(OperationContext* opCtx, const std::string& what) const;
+
+    /**
+     * Returns the list of unsharded collections for the given database. These are the collections
+     * the recipient is expected to clone.
+     */
+    std::vector<NamespaceString> getUnshardedCollections(OperationContext* opCtx);
+
+    /**
+     * Requests the recipient to clone all the collections of the given database currently owned
+     * by this shard. Once the cloning is complete, the recipient returns the list of the actually
+     * cloned collections as part of the response.
+     */
+    StatusWith<Shard::CommandResponse> cloneDataToRecipient(OperationContext* opCtx) const;
+
+    /**
+     * Returns `true` if the list of actually cloned collections (returned in the cloning command
+     * response) matches the list of collections to clone (persisted in the coordinator document),
+     * `false` otherwise.
+     */
+    bool checkClonedData(Shard::CommandResponse cloneResponse) const;
+
+    /**
+     * Commits the new primary shard for the given database to the config server. The database
+     * version is passed to the config server's command as an idempotency key.
+     */
+    StatusWith<Shard::CommandResponse> commitMetadataToConfig(
+        OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const;
+
+    /**
+     * Ensures that the metadata changes have actually been committed on the config server,
+     * asserting otherwise. This is a pedantic check to rule out any potentially disastrous
+     * problems.
+     */
+    void assertChangedMetadataOnConfig(OperationContext* opCtx,
+                                       const DatabaseVersion& preCommitDbVersion) const;
+
+    /**
+     * Clears the database metadata in the local catalog cache. Secondary nodes clear the database
+     * metadata as a result of exiting the critical section of the primary node
+     * (`kExitCriticalSection` phase).
+     */
+    void clearDbMetadataOnPrimary(OperationContext* opCtx) const;
+
+    /**
+     * Drops stale collections on the donor.
+     */
+    void dropStaleDataOnDonor(OperationContext* opCtx) const;
+
+    /**
+     * Blocks write operations on the database, causing them to fail with the
+     * `MovePrimaryInProgress` error.
+     *
+     * TODO (SERVER-71566): This is a synchronization mechanism specifically designed for
+     * `movePrimary` operations. It will likely be replaced by the critical section once the time
+     * frame in which writes are blocked is reduced. Writes are already blocked in the `kCatchup`
+     * phase.
+     */
+    void blockWritesLegacy(OperationContext* opCtx) const;
+
+    /**
+     * Unblocks write operations on the database.
+     *
+     * TODO (SERVER-71566): This is a synchronization mechanism specifically designed for
+     * `movePrimary` operations. It will likely be replaced by the critical section once the time
+     * frame in which writes are blocked is reduced. Reads and writes are already unblocked in the
+     * `kExitCriticalSection` phase.
+     */
+    void unblockWritesLegacy(OperationContext* opCtx) const;
+
+    /**
+     * Blocks write operations on the database, causing them to wait until the critical section
+     * has been entered.
+     */
+    void blockWrites(OperationContext* opCtx) const;
+
+    /**
+     * Blocks read operations on the database, causing them to wait until the critical section
+     * has been entered.
+     */
+    void blockReads(OperationContext* opCtx) const;
+
+    /**
+     * Unblocks read and write operations on the database.
+     */
+    void unblockReadsAndWrites(OperationContext* opCtx) const;
+
     const DatabaseName _dbName;
+    const BSONObj _csReason;
 };

 }  // namespace mongo
diff --git a/src/mongo/db/s/move_primary_coordinator_document.idl b/src/mongo/db/s/move_primary_coordinator_document.idl
index d1201309a1b..4fddbf862a4 100644
--- a/src/mongo/db/s/move_primary_coordinator_document.idl
+++ b/src/mongo/db/s/move_primary_coordinator_document.idl
@@ -42,11 +42,12 @@ enums:
        type: string
        values:
            kUnset: "unset"
-           kCheckPreconditions: "checkPreconditions"
-           kCloneCatalogData: "cloneCatalogData"
+           kClone: "clone"
+           kCatchup: "catchup"
            kEnterCriticalSection: "enterCriticalSection"
-           kCommitMetadataChanges: "commitMetadataChanges"
-           kCleanStaleData: "cleanStaleData"
+           kCommit: "commit"
+           kExitCriticalSection: "exitCriticalSection"
+           kClean: "clean"

structs:
    MovePrimaryCoordinatorDocument:
@@ -63,3 +64,7 @@ structs:
            toShardId:
                type: shard_id
                description: "Destination shard of the database."
+           collectionsToClone:
+               type: array<namespacestring>
+               description: "List of collections to be cloned by the destination shard."
+               optional: true
diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp
index c4df7775f6a..2739c02ae8f 100644
--- a/src/mongo/db/s/move_primary_source_manager.cpp
+++ b/src/mongo/db/s/move_primary_source_manager.cpp
@@ -108,7 +108,7 @@ Status MovePrimarySourceManager::clone(OperationContext* opCtx) {
         auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
             opCtx, getNss().dbName(), DSSAcquisitionMode::kExclusive);

-        scopedDss->setMovePrimarySourceManager(opCtx, this);
+        scopedDss->setMovePrimaryInProgress(opCtx);
     }

     _state = kCloning;
@@ -511,7 +511,7 @@ void MovePrimarySourceManager::_cleanup(OperationContext* opCtx) {
         auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
             opCtx, getNss().dbName(), DSSAcquisitionMode::kExclusive);

-        scopedDss->clearMovePrimarySourceManager(opCtx);
+        scopedDss->unsetMovePrimaryInProgress(opCtx);
         DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, getNss().dbName());

         // Leave the critical section if we're still registered.
diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h
index 09bd637e0ee..c4de0f2254f 100644
--- a/src/mongo/db/s/move_primary_source_manager.h
+++ b/src/mongo/db/s/move_primary_source_manager.h
@@ -60,6 +60,8 @@ class Status;
  *
  * At any point in time it is safe to let the MovePrimarySourceManager object go out of scope in
  * which case the destructor will take care of clean up based on how far we have advanced.
+ *
+ * TODO (SERVER-71309): Remove once 7.0 becomes last LTS.
  */
 class MovePrimarySourceManager {
     MovePrimarySourceManager(const MovePrimarySourceManager&) = delete;
diff --git a/src/mongo/db/s/shardsvr_move_primary_command.cpp b/src/mongo/db/s/shardsvr_move_primary_command.cpp
index 324cd8c48fb..55170519081 100644
--- a/src/mongo/db/s/shardsvr_move_primary_command.cpp
+++ b/src/mongo/db/s/shardsvr_move_primary_command.cpp
@@ -97,7 +97,7 @@ public:
         const auto dbName = parseNs({boost::none, ""}, cmdObj).dbName();
         const NamespaceString dbNss(dbName);
-        const auto toShard = movePrimaryRequest.getTo();
+        const auto toShardArg = movePrimaryRequest.getTo();

         uassert(
             ErrorCodes::InvalidNamespace,
@@ -110,7 +110,7 @@ public:

         uassert(ErrorCodes::InvalidOptions,
                 str::stream() << "you have to specify where you want to move it",
-                !toShard.empty());
+                !toShardArg.empty());

         CommandHelpers::uassertCommandRunWithMajority(getName(), opCtx->getWriteConcern());

@@ -127,7 +127,7 @@ public:
                 MovePrimaryCoordinatorDocument doc;
                 doc.setShardingDDLCoordinatorMetadata(
                     {{dbNss, DDLCoordinatorTypeEnum::kMovePrimaryNoResilient}});
-                doc.setToShardId(toShard.toString());
+                doc.setToShardId(toShardArg.toString());
                 return doc.toBSON();
             }();

@@ -140,11 +140,19 @@ public:
             return coordinator->getCompletionFuture();
         }

+        auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+        // Ensure that the shard information is as up-to-date as possible to catch the case where
+        // a shard with the same name, but with a different host, has been removed/re-added.
+        shardRegistry->reload(opCtx);
+        const auto toShard = uassertStatusOKWithContext(
+            shardRegistry->getShard(opCtx, toShardArg.toString()),
+            "Requested primary shard {} does not exist"_format(toShardArg));
+
         const auto coordinatorDoc = [&] {
             MovePrimaryCoordinatorDocument doc;
             doc.setShardingDDLCoordinatorMetadata(
                 {{dbNss, DDLCoordinatorTypeEnum::kMovePrimary}});
-            doc.setToShardId(toShard.toString());
+            doc.setToShardId(toShard->getId());
             return doc.toBSON();
         }();

diff --git a/src/mongo/s/sharding_feature_flags.idl b/src/mongo/s/sharding_feature_flags.idl
index 146fa60fcac..3af51ae0393 100644
--- a/src/mongo/s/sharding_feature_flags.idl
+++ b/src/mongo/s/sharding_feature_flags.idl
@@ -85,7 +85,7 @@ feature_flags:
    # TODO (SERVER-71309): Remove once 7.0 becomes last LTS.
    featureFlagResilientMovePrimary:
        description: "Enable the resilient coordinator for the movePrimary command in order to improve
-        the tolerance in case of a failure on donor and recipient nodes"
+                      the tolerance in case of a failure on donor and recipient nodes"
        cpp_varname: feature_flags::gResilientMovePrimary
        default: false
    featureFlagConfigSettingsSchema:
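For context, _shardsvrMovePrimary builds either a kMovePrimaryNoResilient or a kMovePrimary coordinator document in two separate branches above; the condition that selects between them is outside the hunks of this diff. The sketch below is an assumption of what that selection plausibly looks like, based only on featureFlagResilientMovePrimary declared in sharding_feature_flags.idl (cpp_varname: feature_flags::gResilientMovePrimary), and is not the committed code.

// Assumed sketch only: pick the DDL coordinator type from the FCV-gated feature flag.
// The exact condition used by the command is not visible in this diff.
const auto coordinatorType =
    feature_flags::gResilientMovePrimary.isEnabled(serverGlobalParams.featureCompatibility)
    ? DDLCoordinatorTypeEnum::kMovePrimary
    : DDLCoordinatorTypeEnum::kMovePrimaryNoResilient;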