author     Antonio Fuschetto <antonio.fuschetto@mongodb.com>  2022-12-20 15:04:40 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-12-20 16:06:19 +0000
commit     4dccf13844ceb965fd9c141758d81f5618693c30 (patch)
tree       cfa0c460ecea8bb934f3189509b6186e757452f9
parent     f7313bb1cf96e3e542ac4c8ad9f932ec68164b84 (diff)
download   mongo-4dccf13844ceb965fd9c141758d81f5618693c30.tar.gz
SERVER-71202 Adapt existing movePrimary logic to new DDL coordinator
-rw-r--r--  buildscripts/resmokeconfig/fully_disabled_feature_flags.yml |   3
-rw-r--r--  jstests/sharding/move_primary_with_writes.js                |  18
-rw-r--r--  src/mongo/db/s/database_sharding_state.cpp                  |  12
-rw-r--r--  src/mongo/db/s/database_sharding_state.h                    |  34
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.cpp                 | 377
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.h                   |  99
-rw-r--r--  src/mongo/db/s/move_primary_coordinator_document.idl        |  13
-rw-r--r--  src/mongo/db/s/move_primary_source_manager.cpp              |   4
-rw-r--r--  src/mongo/db/s/move_primary_source_manager.h                |   2
-rw-r--r--  src/mongo/db/s/shardsvr_move_primary_command.cpp            |  16
-rw-r--r--  src/mongo/s/sharding_feature_flags.idl                      |   2
11 files changed, 516 insertions, 64 deletions
diff --git a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
index da4801f2b2a..92bc3b3e815 100644
--- a/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
+++ b/buildscripts/resmokeconfig/fully_disabled_feature_flags.yml
@@ -9,8 +9,5 @@
# released create the transactions collection index and is only meant to be enabled adhoc, so only
# its targeted tests should enable it.
- featureFlagAlwaysCreateConfigTransactionsPartialIndexOnStepUp
-# Disable the new logic for the resilient movePrimary for non-dedicated tests until the logic is
-# mature enough to be enabled for all tests.
-- featureFlagResilientMovePrimary
# Disable the feature flag for catalog shard until most of the codebase can run in this mode.
- featureFlagCatalogShard
diff --git a/jstests/sharding/move_primary_with_writes.js b/jstests/sharding/move_primary_with_writes.js
index 9de2338a31a..9b28a7838bb 100644
--- a/jstests/sharding/move_primary_with_writes.js
+++ b/jstests/sharding/move_primary_with_writes.js
@@ -5,6 +5,7 @@
'use strict';
load('jstests/libs/fail_point_util.js');
+load("jstests/libs/feature_flag_util.js");
let st = new ShardingTest({
mongos: 2,
@@ -285,11 +286,20 @@ st.forEachConnection(shard => {
}
});
+let cloningCatalogDataFPName = "hangBeforeCloningCatalogData";
+let cleaningStaleDataFPName = "hangBeforeCleaningStaleData";
+
+// TODO (SERVER-71309): Remove once 7.0 becomes last LTS.
+if (!FeatureFlagUtil.isEnabled(st.configRS.getPrimary().getDB('admin'), "ResilientMovePrimary")) {
+ cloningCatalogDataFPName = "hangInCloneStage";
+ cleaningStaleDataFPName = "hangInCleanStaleDataStage";
+}
+
createCollections();
let fromShard = st.getPrimaryShard(dbName);
let toShard = st.getOther(fromShard);
-testMovePrimary('hangInCloneStage', fromShard, toShard, st.s.getDB(dbName), true, false);
+testMovePrimary(cloningCatalogDataFPName, fromShard, toShard, st.s.getDB(dbName), true, false);
verifyDocuments(toShard.getDB(dbName), 3);
verifyDocuments(fromShard.getDB(dbName), 0);
@@ -297,19 +307,19 @@ createCollections();
fromShard = st.getPrimaryShard(dbName);
toShard = st.getOther(fromShard);
-testMovePrimary('hangInCloneStage', fromShard, toShard, st.s.getDB(dbName), false, true);
+testMovePrimary(cloningCatalogDataFPName, fromShard, toShard, st.s.getDB(dbName), false, true);
verifyDocuments(toShard.getDB(dbName), 3);
verifyDocuments(fromShard.getDB(dbName), 0);
createCollections();
fromShard = st.getPrimaryShard(dbName);
toShard = st.getOther(fromShard);
-testMovePrimaryDDL('hangInCloneStage', fromShard, toShard, st.s.getDB("admin"), false, true);
+testMovePrimaryDDL(cloningCatalogDataFPName, fromShard, toShard, st.s.getDB("admin"), false, true);
createCollections();
fromShard = st.getPrimaryShard(dbName);
toShard = st.getOther(fromShard);
-testMovePrimary('hangInCleanStaleDataStage', fromShard, toShard, st.s.getDB(dbName), false, false);
+testMovePrimary(cleaningStaleDataFPName, fromShard, toShard, st.s.getDB(dbName), false, false);
overrideDDLLockTimeoutFPs.forEach(fp => fp.off());
diff --git a/src/mongo/db/s/database_sharding_state.cpp b/src/mongo/db/s/database_sharding_state.cpp
index f3c957e97a0..b55614fc6ff 100644
--- a/src/mongo/db/s/database_sharding_state.cpp
+++ b/src/mongo/db/s/database_sharding_state.cpp
@@ -156,18 +156,14 @@ void DatabaseShardingState::exitCriticalSectionNoChecks(OperationContext* opCtx)
_critSec.exitCriticalSectionNoChecks();
}
-void DatabaseShardingState::setMovePrimarySourceManager(OperationContext* opCtx,
- MovePrimarySourceManager* sourceMgr) {
+void DatabaseShardingState::setMovePrimaryInProgress(OperationContext* opCtx) {
invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_X));
- invariant(sourceMgr);
- invariant(!_sourceMgr);
-
- _sourceMgr = sourceMgr;
+ _movePrimaryInProgress = true;
}
-void DatabaseShardingState::clearMovePrimarySourceManager(OperationContext* opCtx) {
+void DatabaseShardingState::unsetMovePrimaryInProgress(OperationContext* opCtx) {
invariant(opCtx->lockState()->isDbLockedForMode(_dbName, MODE_IX));
- _sourceMgr = nullptr;
+ _movePrimaryInProgress = false;
}
void DatabaseShardingState::setDbMetadataRefreshFuture(SharedSemiFuture<void> future,
diff --git a/src/mongo/db/s/database_sharding_state.h b/src/mongo/db/s/database_sharding_state.h
index 344635dd0e0..c079c23a619 100644
--- a/src/mongo/db/s/database_sharding_state.h
+++ b/src/mongo/db/s/database_sharding_state.h
@@ -35,8 +35,6 @@
namespace mongo {
-class MovePrimarySourceManager;
-
enum class DSSAcquisitionMode { kShared, kExclusive };
/**
@@ -112,25 +110,28 @@ public:
}
/**
- * Returns the active movePrimary source manager, if one is available.
+ * Returns `true` if a `movePrimary` operation on this database is in progress, `false`
+ * otherwise.
*/
bool isMovePrimaryInProgress() const {
- return _sourceMgr;
+ return _movePrimaryInProgress;
}
/**
- * Attaches a movePrimary source manager to this database's sharding state. Must be called with
- * the database lock in X mode. May not be called if there is a movePrimary source manager
- * already installed. Must be followed by a call to clearMovePrimarySourceManager.
+ * Declares that a `movePrimary` operation on this database is in progress. This causes write
+ * operations on this database to fail with the `MovePrimaryInProgress` error.
+ *
+ * Must be called with the database locked in X mode.
*/
- void setMovePrimarySourceManager(OperationContext* opCtx, MovePrimarySourceManager* sourceMgr);
+ void setMovePrimaryInProgress(OperationContext* opCtx);
/**
- * Removes a movePrimary source manager from this database's sharding state. Must be called with
- * with the database lock in X mode. May not be called if there isn't a movePrimary source
- * manager installed already through a previous call to setMovePrimarySourceManager.
+ * Declares that the `movePrimary` operation on this database is over. This re-enables write
+ * operations on this database.
+ *
+ * Must be called with the database locked in IX mode.
*/
- void clearMovePrimarySourceManager(OperationContext* opCtx);
+ void unsetMovePrimaryInProgress(OperationContext* opCtx);
/**
* Sets the database metadata refresh future for other threads to wait on it.
@@ -174,12 +175,9 @@ private:
ShardingMigrationCriticalSection _critSec;
- // If this database is serving as a source shard for a movePrimary, the source manager will be
- // non-null. To write this value, there needs to be X-lock on the database in order to
- // synchronize with other callers which will read the source manager.
- //
- // NOTE: The source manager is not owned by this class.
- MovePrimarySourceManager* _sourceMgr{nullptr};
+ // Is `true` when this database is serving as a source shard for a movePrimary, `false`
+ // otherwise.
+ bool _movePrimaryInProgress{false};
// Tracks the ongoing database metadata refresh. Possibly keeps a future for other threads to
// wait on it, and a cancellation source to cancel the ongoing database metadata refresh.
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp
index 3cf9b179d19..a9e3dd1a6e5 100644
--- a/src/mongo/db/s/move_primary_coordinator.cpp
+++ b/src/mongo/db/s/move_primary_coordinator.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -29,14 +29,36 @@
#include "mongo/db/s/move_primary_coordinator.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/sharding_logging.h"
+#include "mongo/db/s/sharding_recovery_service.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/db/write_block_bypass.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(hangBeforeCloningCatalogData);
+MONGO_FAIL_POINT_DEFINE(hangBeforeCleaningStaleData);
+
MovePrimaryCoordinator::MovePrimaryCoordinator(ShardingDDLCoordinatorService* service,
const BSONObj& initialState)
: RecoverableShardingDDLCoordinator(service, "MovePrimaryCoordinator", initialState),
- _dbName(nss().dbName()) {}
+ _dbName(nss().dbName()),
+ _csReason([&] {
+ BSONObjBuilder builder;
+ builder.append("command", "movePrimary");
+ builder.append("db", _dbName.toString());
+ builder.append("to", _doc.getToShardId());
+ return builder.obj();
+ }()) {}
bool MovePrimaryCoordinator::canAlwaysStartWhenUserWritesAreDisabled() const {
return true;
@@ -66,45 +88,364 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
- .then(_buildPhaseHandler(Phase::kCheckPreconditions,
- [this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- getForwardableOpMetadata().setOn(opCtx);
- }))
- .then(_buildPhaseHandler(Phase::kCloneCatalogData,
+ .then([this, executor, token, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ if (_doc.getToShardId() == ShardingState::get(opCtx)->shardId()) {
+ LOGV2(7120200,
+ "Database already on requested primary shard",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId());
+
+ return ExecutorFuture<void>(**executor);
+ }
+
+ return runMovePrimaryWorkflow(executor, token);
+ });
+}
+
+ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then(_buildPhaseHandler(
+ Phase::kClone,
+ [this, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ LOGV2(7120201,
+ "Running movePrimary operation",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId());
+
+ logChange(opCtx, "start");
+
+ ScopeGuard unblockWritesLegacyOnExit([&] {
+ // TODO (SERVER-71444): Fix to be interruptible or document exception.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT
+ unblockWritesLegacy(opCtx);
+ });
+
+ if (!_firstExecution) {
+ uasserted(
+ 7120202,
+ "movePrimary operation on database {} failed cloning data to shard {}"_format(
+ _dbName.toString(), _doc.getToShardId().toString()));
+ }
+
+ blockWritesLegacy(opCtx);
+
+ if (MONGO_unlikely(hangBeforeCloningCatalogData.shouldFail())) {
+ LOGV2(7120203, "Hit hangBeforeCloningCatalogData");
+ hangBeforeCloningCatalogData.pauseWhileSet(opCtx);
+ }
+
+ _doc.setCollectionsToClone(getUnshardedCollections(opCtx));
+ _updateStateDocument(opCtx, StateDoc(_doc));
+
+ const auto cloneResponse = cloneDataToRecipient(opCtx);
+ const auto cloneStatus = Shard::CommandResponse::getEffectiveStatus(cloneResponse);
+ if (!cloneStatus.isOK() || !checkClonedData(cloneResponse.getValue())) {
+ uasserted(
+ cloneStatus.isOK() ? 7120204 : cloneStatus.code(),
+ "movePrimary operation on database {} failed cloning data to shard {}"_format(
+ _dbName.toString(), _doc.getToShardId().toString()));
+ }
+
+ // TODO (SERVER-71566): Temporary solution to cover the case of stepping down before
+ // actually entering the `kCatchup` phase.
+ blockWrites(opCtx);
+ }))
+ .then(_buildPhaseHandler(Phase::kCatchup,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
+ blockWrites(opCtx);
}))
.then(_buildPhaseHandler(Phase::kEnterCriticalSection,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
+ blockReads(opCtx);
}))
- .then(_buildPhaseHandler(Phase::kCommitMetadataChanges,
+ .then(_buildPhaseHandler(
+ Phase::kCommit,
+ [this, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ invariant(_doc.getDatabaseVersion());
+ const auto& preCommitDbVersion = *_doc.getDatabaseVersion();
+
+ const auto commitResponse = commitMetadataToConfig(opCtx, preCommitDbVersion);
+ if (commitResponse == ErrorCodes::ShardNotFound) {
+ unblockReadsAndWrites(opCtx);
+ }
+ uassertStatusOKWithContext(
+ Shard::CommandResponse::getEffectiveStatus(commitResponse),
+ "movePrimary operation on database {} failed to commit metadata changes"_format(
+ _dbName.toString()));
+
+ assertChangedMetadataOnConfig(opCtx, preCommitDbVersion);
+
+ // Checkpoint the vector clock to ensure causality in the event of a crash or
+ // shutdown.
+ VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx);
+
+ clearDbMetadataOnPrimary(opCtx);
+
+ logChange(opCtx, "commit");
+ }))
+ .then(_buildPhaseHandler(Phase::kExitCriticalSection,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
+ unblockReadsAndWrites(opCtx);
}))
- .then(_buildPhaseHandler(Phase::kCleanStaleData,
+ .then(_buildPhaseHandler(Phase::kClean,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
+ if (MONGO_unlikely(hangBeforeCleaningStaleData.shouldFail())) {
+ LOGV2(7120205, "Hit hangBeforeCleaningStaleData");
+ hangBeforeCleaningStaleData.pauseWhileSet(opCtx);
+ }
+
+ dropStaleDataOnDonor(opCtx);
+
+ LOGV2(7120206,
+ "Completed movePrimary operation",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId());
+
+ logChange(opCtx, "end");
}))
.onError([this, anchor = shared_from_this()](const Status& status) {
- LOGV2_ERROR(7120000,
- "Error running movePrimary",
- "database"_attr = nss(),
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ LOGV2_ERROR(7120207,
+ "Failed movePrimary operation",
+ "db"_attr = _dbName,
"to"_attr = _doc.getToShardId(),
"error"_attr = redact(status));
+ logChange(opCtx, "error");
+
return status;
});
}
+void MovePrimaryCoordinator::logChange(OperationContext* opCtx, const std::string& what) const {
+ BSONObjBuilder details;
+ details.append("from", ShardingState::get(opCtx)->shardId());
+ details.append("to", _doc.getToShardId());
+ ShardingLogging::get(opCtx)->logChange(
+ opCtx, "movePrimary.{}"_format(what), _dbName.toString(), details.obj());
+}
+
+std::vector<NamespaceString> MovePrimaryCoordinator::getUnshardedCollections(
+ OperationContext* opCtx) {
+ const auto allCollections = [&] {
+ DBDirectClient dbClient(opCtx);
+ const auto collInfos = dbClient.getCollectionInfos(_dbName,
+ BSON("type"
+ << "collection"));
+
+ std::vector<NamespaceString> colls;
+ for (const auto& collInfo : collInfos) {
+ std::string collName;
+ uassertStatusOK(bsonExtractStringField(collInfo, "name", &collName));
+
+ const NamespaceString nss(_dbName, collName);
+ if (!nss.isSystem() ||
+ nss.isLegalClientSystemNS(serverGlobalParams.featureCompatibility)) {
+ colls.push_back(nss);
+ }
+ }
+
+ std::sort(colls.begin(), colls.end());
+ return colls;
+ }();
+
+ const auto shardedCollections = [&] {
+ auto colls = Grid::get(opCtx)->catalogClient()->getAllShardedCollectionsForDb(
+ opCtx, _dbName.toString(), repl::ReadConcernLevel::kMajorityReadConcern);
+
+ std::sort(colls.begin(), colls.end());
+ return colls;
+ }();
+
+ std::vector<NamespaceString> unshardedCollections;
+ std::set_difference(allCollections.cbegin(),
+ allCollections.cend(),
+ shardedCollections.cbegin(),
+ shardedCollections.cend(),
+ std::back_inserter(unshardedCollections));
+
+ return unshardedCollections;
+}
+
+StatusWith<Shard::CommandResponse> MovePrimaryCoordinator::cloneDataToRecipient(
+ OperationContext* opCtx) const {
+ // Enable write blocking bypass to allow cloning of catalog data even if writes are disallowed.
+ WriteBlockBypass::get(opCtx).set(true);
+
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ const auto fromShard =
+ uassertStatusOK(shardRegistry->getShard(opCtx, ShardingState::get(opCtx)->shardId()));
+ const auto toShard = uassertStatusOK(shardRegistry->getShard(opCtx, _doc.getToShardId()));
+
+ const auto cloneCommand = [&] {
+ BSONObjBuilder commandBuilder;
+ commandBuilder.append("_shardsvrCloneCatalogData", _dbName.toString());
+ commandBuilder.append("from", fromShard->getConnString().toString());
+ return commandBuilder.obj();
+ }();
+
+ return toShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(cloneCommand),
+ Shard::RetryPolicy::kNotIdempotent);
+}
+
+bool MovePrimaryCoordinator::checkClonedData(Shard::CommandResponse cloneResponse) const {
+ invariant(_doc.getCollectionsToClone());
+ const auto& collectionToClone = *_doc.getCollectionsToClone();
+
+ const auto clonedCollections = [&] {
+ std::vector<NamespaceString> colls;
+ for (const auto& bsonElem : cloneResponse.response["clonedColls"].Obj()) {
+ if (bsonElem.type() == String) {
+ colls.push_back(NamespaceString(bsonElem.String()));
+ }
+ }
+
+ std::sort(colls.begin(), colls.end());
+ return colls;
+ }();
+
+ return collectionToClone.size() == clonedCollections.size() &&
+ std::equal(
+ collectionToClone.cbegin(), collectionToClone.cend(), clonedCollections.cbegin());
+}
+
+StatusWith<Shard::CommandResponse> MovePrimaryCoordinator::commitMetadataToConfig(
+ OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const {
+ const auto commitCommand = [&] {
+ ConfigsvrCommitMovePrimary commitRequest(_dbName, preCommitDbVersion, _doc.getToShardId());
+ commitRequest.setDbName(NamespaceString::kAdminDb);
+ return commitRequest.toBSON({});
+ }();
+
+ const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ return config->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(commitCommand),
+ Shard::RetryPolicy::kIdempotent);
+}
+
+void MovePrimaryCoordinator::assertChangedMetadataOnConfig(
+ OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const {
+ const auto postCommitDbType = [&]() {
+ const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto findResponse = uassertStatusOK(
+ config->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kConfigDatabasesNamespace,
+ BSON(DatabaseType::kNameFieldName << _dbName.toString()),
+ BSONObj(),
+ 1));
+
+ const auto databases = std::move(findResponse.docs);
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ "Tried to find version for database {}, but found no databases"_format(
+ _dbName.toString()),
+ !databases.empty());
+
+ return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front());
+ }();
+ tassert(7120208,
+ "Error committing movePrimary: database version went backwards",
+ postCommitDbType.getVersion() > preCommitDbVersion);
+ uassert(7120209,
+ "Error committing movePrimary: update of config.databases failed",
+ postCommitDbType.getPrimary() != ShardingState::get(opCtx)->shardId());
+}
+
+void MovePrimaryCoordinator::clearDbMetadataOnPrimary(OperationContext* opCtx) const {
+ AutoGetDb autoDb(opCtx, _dbName, MODE_X);
+ DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, _dbName);
+}
+
+void MovePrimaryCoordinator::dropStaleDataOnDonor(OperationContext* opCtx) const {
+ // Enable write blocking bypass to allow cleaning of stale data even if writes are disallowed.
+ WriteBlockBypass::get(opCtx).set(true);
+
+ DBDirectClient dbClient(opCtx);
+ invariant(_doc.getCollectionsToClone());
+ for (const auto& nss : *_doc.getCollectionsToClone()) {
+ const auto dropStatus = [&] {
+ BSONObj dropResult;
+ dbClient.runCommand(_dbName.toString(), BSON("drop" << nss.coll()), dropResult);
+ return getStatusFromCommandResult(dropResult);
+ }();
+
+ if (!dropStatus.isOK()) {
+ LOGV2_WARNING(7120210,
+ "Failed to drop stale collection on donor",
+ "namespace"_attr = nss,
+ "error"_attr = redact(dropStatus));
+ }
+ }
+}
+
+void MovePrimaryCoordinator::blockWritesLegacy(OperationContext* opCtx) const {
+ AutoGetDb autoDb(opCtx, _dbName, MODE_X);
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, _dbName, DSSAcquisitionMode::kExclusive);
+ scopedDss->setMovePrimaryInProgress(opCtx);
+}
+
+void MovePrimaryCoordinator::unblockWritesLegacy(OperationContext* opCtx) const {
+ AutoGetDb autoDb(opCtx, _dbName, MODE_IX);
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, _dbName, DSSAcquisitionMode::kExclusive);
+ scopedDss->unsetMovePrimaryInProgress(opCtx);
+}
+
+void MovePrimaryCoordinator::blockWrites(OperationContext* opCtx) const {
+ ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void MovePrimaryCoordinator::blockReads(OperationContext* opCtx) const {
+ ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void MovePrimaryCoordinator::unblockReadsAndWrites(OperationContext* opCtx) const {
+ ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/move_primary_coordinator.h b/src/mongo/db/s/move_primary_coordinator.h
index f0e00e3ad91..944799689e4 100644
--- a/src/mongo/db/s/move_primary_coordinator.h
+++ b/src/mongo/db/s/move_primary_coordinator.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -31,6 +31,7 @@
#include "mongo/db/s/move_primary_coordinator_document_gen.h"
#include "mongo/db/s/sharding_ddl_coordinator.h"
+#include "mongo/s/client/shard.h"
namespace mongo {
@@ -42,7 +43,7 @@ public:
using Phase = MovePrimaryCoordinatorPhaseEnum;
MovePrimaryCoordinator(ShardingDDLCoordinatorService* service, const BSONObj& initialState);
- ~MovePrimaryCoordinator() = default;
+ virtual ~MovePrimaryCoordinator() = default;
void checkIfOptionsConflict(const BSONObj& doc) const override;
bool canAlwaysStartWhenUserWritesAreDisabled() const override;
@@ -53,7 +54,101 @@ private:
ExecutorFuture<void> _runImpl(std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept override;
+ ExecutorFuture<void> runMovePrimaryWorkflow(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept;
+
+ /**
+ * Logs a `movePrimary`-specific event in the `config.changelog` collection.
+ */
+ void logChange(OperationContext* opCtx, const std::string& what) const;
+
+ /**
+ * Returns the list of unsharded collections for the given database. These are the collections
+ * the recipient is expected to clone.
+ */
+ std::vector<NamespaceString> getUnshardedCollections(OperationContext* opCtx);
+
+ /**
+ * Requests the recipient to clone all the collections of the given database currently owned
+ * by this shard. Once the cloning is complete, the recipient returns the list of the actually
+ * cloned collections as part of the response.
+ */
+ StatusWith<Shard::CommandResponse> cloneDataToRecipient(OperationContext* opCtx) const;
+
+ /**
+ * Returns `true` if the list of actually cloned collections (returned by the cloning
+ * command response) matches the list of collections to clone (persisted in the coordinator
+ * document), `false` otherwise.
+ */
+ bool checkClonedData(Shard::CommandResponse cloneResponse) const;
+
+ /**
+ * Commits the new primary shard for the given database to the config server. The database
+ * version is passed to the config server's command as an idempotency key.
+ */
+ StatusWith<Shard::CommandResponse> commitMetadataToConfig(
+ OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const;
+
+ /**
+ * Ensures that the metadata changes have actually been committed on the config server, asserting
+ * otherwise. This is a pedantic check to rule out any potentially disastrous problems.
+ */
+ void assertChangedMetadataOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& preCommitDbVersion) const;
+
+ /**
+ * Clears the database metadata in the local catalog cache. Secondary nodes clear the database
+ * metadata as a result of the primary node exiting the critical section
+ * (`kExitCriticalSection` phase).
+ */
+ void clearDbMetadataOnPrimary(OperationContext* opCtx) const;
+
+ /**
+ * Drops stale collections on the donor.
+ */
+ void dropStaleDataOnDonor(OperationContext* opCtx) const;
+
+ /**
+ * Blocks write operations on the database, causing them to fail with the
+ * `MovePrimaryInProgress` error.
+ *
+ * TODO (SERVER-71566): This is a synchronization mechanism specifically designed for
+ * `movePrimary` operations. It will likely be replaced by the critical section once the time
+ * frame in which writes are blocked is reduced. Writes are already blocked in the `kCatchup`
+ * phase.
+ */
+ void blockWritesLegacy(OperationContext* opCtx) const;
+
+ /**
+ * Unblocks write operations on the database.
+ *
+ * TODO (SERVER-71566): This is a synchronization mechanism specifically designed for
+ * `movePrimary` operations. It will likely be replaced by the critical section once the time
+ * frame in which writes are blocked is reduced. Reads and writes are already unblocked in the
+ * `kExitCriticalSection` phase.
+ */
+ void unblockWritesLegacy(OperationContext* opCtx) const;
+
+ /**
+ * Blocks write operations on the database, causing them to wait until the critical section has
+ * been entered.
+ */
+ void blockWrites(OperationContext* opCtx) const;
+
+ /**
+ * Blocks read operations on the database, causing them to wait until the critical section has
+ * been entered.
+ */
+ void blockReads(OperationContext* opCtx) const;
+
+ /**
+ * Unblocks read and write operations on the database.
+ */
+ void unblockReadsAndWrites(OperationContext* opCtx) const;
+
const DatabaseName _dbName;
+ const BSONObj _csReason;
};
} // namespace mongo
diff --git a/src/mongo/db/s/move_primary_coordinator_document.idl b/src/mongo/db/s/move_primary_coordinator_document.idl
index d1201309a1b..4fddbf862a4 100644
--- a/src/mongo/db/s/move_primary_coordinator_document.idl
+++ b/src/mongo/db/s/move_primary_coordinator_document.idl
@@ -42,11 +42,12 @@ enums:
type: string
values:
kUnset: "unset"
- kCheckPreconditions: "checkPreconditions"
- kCloneCatalogData: "cloneCatalogData"
+ kClone: "clone"
+ kCatchup: "catchup"
kEnterCriticalSection: "enterCriticalSection"
- kCommitMetadataChanges: "commitMetadataChanges"
- kCleanStaleData: "cleanStaleData"
+ kCommit: "commit"
+ kExitCriticalSection: "exitCriticalSection"
+ kClean: "clean"
structs:
MovePrimaryCoordinatorDocument:
@@ -63,3 +64,7 @@ structs:
toShardId:
type: shard_id
description: "Destination shard of the database."
+ collectionsToClone:
+ type: array<namespacestring>
+ description: "List of collections to be cloned by the destination shard."
+ optional: true
diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp
index c4df7775f6a..2739c02ae8f 100644
--- a/src/mongo/db/s/move_primary_source_manager.cpp
+++ b/src/mongo/db/s/move_primary_source_manager.cpp
@@ -108,7 +108,7 @@ Status MovePrimarySourceManager::clone(OperationContext* opCtx) {
auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
opCtx, getNss().dbName(), DSSAcquisitionMode::kExclusive);
- scopedDss->setMovePrimarySourceManager(opCtx, this);
+ scopedDss->setMovePrimaryInProgress(opCtx);
}
_state = kCloning;
@@ -511,7 +511,7 @@ void MovePrimarySourceManager::_cleanup(OperationContext* opCtx) {
auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
opCtx, getNss().dbName(), DSSAcquisitionMode::kExclusive);
- scopedDss->clearMovePrimarySourceManager(opCtx);
+ scopedDss->unsetMovePrimaryInProgress(opCtx);
DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, getNss().dbName());
// Leave the critical section if we're still registered.
diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h
index 09bd637e0ee..c4de0f2254f 100644
--- a/src/mongo/db/s/move_primary_source_manager.h
+++ b/src/mongo/db/s/move_primary_source_manager.h
@@ -60,6 +60,8 @@ class Status;
*
* At any point in time it is safe to let the MovePrimarySourceManager object go out of scope in
* which case the destructor will take care of clean up based on how far we have advanced.
+ *
+ * TODO (SERVER-71309): Remove once 7.0 becomes last LTS.
*/
class MovePrimarySourceManager {
MovePrimarySourceManager(const MovePrimarySourceManager&) = delete;
diff --git a/src/mongo/db/s/shardsvr_move_primary_command.cpp b/src/mongo/db/s/shardsvr_move_primary_command.cpp
index 324cd8c48fb..55170519081 100644
--- a/src/mongo/db/s/shardsvr_move_primary_command.cpp
+++ b/src/mongo/db/s/shardsvr_move_primary_command.cpp
@@ -97,7 +97,7 @@ public:
const auto dbName = parseNs({boost::none, ""}, cmdObj).dbName();
const NamespaceString dbNss(dbName);
- const auto toShard = movePrimaryRequest.getTo();
+ const auto toShardArg = movePrimaryRequest.getTo();
uassert(
ErrorCodes::InvalidNamespace,
@@ -110,7 +110,7 @@ public:
uassert(ErrorCodes::InvalidOptions,
str::stream() << "you have to specify where you want to move it",
- !toShard.empty());
+ !toShardArg.empty());
CommandHelpers::uassertCommandRunWithMajority(getName(), opCtx->getWriteConcern());
@@ -127,7 +127,7 @@ public:
MovePrimaryCoordinatorDocument doc;
doc.setShardingDDLCoordinatorMetadata(
{{dbNss, DDLCoordinatorTypeEnum::kMovePrimaryNoResilient}});
- doc.setToShardId(toShard.toString());
+ doc.setToShardId(toShardArg.toString());
return doc.toBSON();
}();
@@ -140,11 +140,19 @@ public:
return coordinator->getCompletionFuture();
}
+ auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ // Ensure that the shard information is as up-to-date as possible to catch the case where
+ // a shard with the same name, but with a different host, has been removed/re-added.
+ shardRegistry->reload(opCtx);
+ const auto toShard = uassertStatusOKWithContext(
+ shardRegistry->getShard(opCtx, toShardArg.toString()),
+ "Requested primary shard {} does not exist"_format(toShardArg));
+
const auto coordinatorDoc = [&] {
MovePrimaryCoordinatorDocument doc;
doc.setShardingDDLCoordinatorMetadata(
{{dbNss, DDLCoordinatorTypeEnum::kMovePrimary}});
- doc.setToShardId(toShard.toString());
+ doc.setToShardId(toShard->getId());
return doc.toBSON();
}();
diff --git a/src/mongo/s/sharding_feature_flags.idl b/src/mongo/s/sharding_feature_flags.idl
index 146fa60fcac..3af51ae0393 100644
--- a/src/mongo/s/sharding_feature_flags.idl
+++ b/src/mongo/s/sharding_feature_flags.idl
@@ -85,7 +85,7 @@ feature_flags:
# TODO (SERVER-71309): Remove once 7.0 becomes last LTS.
featureFlagResilientMovePrimary:
description: "Enable the resilient coordinator for the movePrimary command in order to improve
- the tolerance in case of a failure on donor and recipient nodes"
+ the tolerance in case of a failure on donor and recipient nodes"
cpp_varname: feature_flags::gResilientMovePrimary
default: false
featureFlagConfigSettingsSchema: