Diffstat (limited to 'src/mongo/db/s/move_primary_coordinator.cpp')
-rw-r--r--  src/mongo/db/s/move_primary_coordinator.cpp  377
1 file changed, 359 insertions(+), 18 deletions(-)
diff --git a/src/mongo/db/s/move_primary_coordinator.cpp b/src/mongo/db/s/move_primary_coordinator.cpp
index 3cf9b179d19..a9e3dd1a6e5 100644
--- a/src/mongo/db/s/move_primary_coordinator.cpp
+++ b/src/mongo/db/s/move_primary_coordinator.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2021-present MongoDB, Inc.
+ * Copyright (C) 2022-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -29,14 +29,36 @@
#include "mongo/db/s/move_primary_coordinator.h"
+#include "mongo/bson/util/bson_extract.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/sharding_logging.h"
+#include "mongo/db/s/sharding_recovery_service.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/db/write_block_bypass.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
namespace mongo {
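+// Fail points used by tests to pause the coordinator before the cloning and cleanup steps.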
+MONGO_FAIL_POINT_DEFINE(hangBeforeCloningCatalogData);
+MONGO_FAIL_POINT_DEFINE(hangBeforeCleaningStaleData);
+
MovePrimaryCoordinator::MovePrimaryCoordinator(ShardingDDLCoordinatorService* service,
const BSONObj& initialState)
: RecoverableShardingDDLCoordinator(service, "MovePrimaryCoordinator", initialState),
- _dbName(nss().dbName()) {}
+ _dbName(nss().dbName()),
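+ // Reason document attached to the recoverable critical section; it identifies this movePrimary operation.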
+ _csReason([&] {
+ BSONObjBuilder builder;
+ builder.append("command", "movePrimary");
+ builder.append("db", _dbName.toString());
+ builder.append("to", _doc.getToShardId());
+ return builder.obj();
+ }()) {}
bool MovePrimaryCoordinator::canAlwaysStartWhenUserWritesAreDisabled() const {
return true;
@@ -66,45 +88,364 @@ ExecutorFuture<void> MovePrimaryCoordinator::_runImpl(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& token) noexcept {
return ExecutorFuture<void>(**executor)
- .then(_buildPhaseHandler(Phase::kCheckPreconditions,
- [this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
- auto* opCtx = opCtxHolder.get();
- getForwardableOpMetadata().setOn(opCtx);
- }))
- .then(_buildPhaseHandler(Phase::kCloneCatalogData,
+ .then([this, executor, token, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ if (_doc.getToShardId() == ShardingState::get(opCtx)->shardId()) {
+ LOGV2(7120200,
+ "Database already on requested primary shard",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId());
+
+ return ExecutorFuture<void>(**executor);
+ }
+
+ return runMovePrimaryWorkflow(executor, token);
+ });
+}
+
+ExecutorFuture<void> MovePrimaryCoordinator::runMovePrimaryWorkflow(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor,
+ const CancellationToken& token) noexcept {
+ return ExecutorFuture<void>(**executor)
+ .then(_buildPhaseHandler(
+ Phase::kClone,
+ [this, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ LOGV2(7120201,
+ "Running movePrimary operation",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId());
+
+ logChange(opCtx, "start");
+
+ ScopeGuard unblockWritesLegacyOnExit([&] {
+ // TODO (SERVER-71444): Fix to be interruptible or document exception.
+ UninterruptibleLockGuard noInterrupt(opCtx->lockState()); // NOLINT
+ unblockWritesLegacy(opCtx);
+ });
+
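+ // Cloning is not resumable: the clone command is sent with a non-idempotent retry
+ // policy (see cloneDataToRecipient below), so a kClone phase re-executed after a
+ // failover must abort the operation rather than clone again.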
+ if (!_firstExecution) {
+ uasserted(
+ 7120202,
+ "movePrimary operation on database {} failed cloning data to shard {}"_format(
+ _dbName.toString(), _doc.getToShardId().toString()));
+ }
+
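+ // Block conflicting writes on this database via the legacy DatabaseShardingState
+ // mechanism while the catalog data is being cloned.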
+ blockWritesLegacy(opCtx);
+
+ if (MONGO_unlikely(hangBeforeCloningCatalogData.shouldFail())) {
+ LOGV2(7120203, "Hit hangBeforeCloningCatalogData");
+ hangBeforeCloningCatalogData.pauseWhileSet(opCtx);
+ }
+
+ _doc.setCollectionsToClone(getUnshardedCollections(opCtx));
+ _updateStateDocument(opCtx, StateDoc(_doc));
+
+ const auto cloneResponse = cloneDataToRecipient(opCtx);
+ const auto cloneStatus = Shard::CommandResponse::getEffectiveStatus(cloneResponse);
+ if (!cloneStatus.isOK() || !checkClonedData(cloneResponse.getValue())) {
+ uasserted(
+ cloneStatus.isOK() ? 7120204 : cloneStatus.code(),
+ "movePrimary operation on database {} failed cloning data to shard {}"_format(
+ _dbName.toString(), _doc.getToShardId().toString()));
+ }
+
+ // TODO (SERVER-71566): Temporary solution to cover the case of stepping down before
+ // actually entering the `kCatchup` phase.
+ blockWrites(opCtx);
+ }))
+ .then(_buildPhaseHandler(Phase::kCatchup,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
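+ // (Re-)acquire the critical section blocking writes; the acquisition is
+ // idempotent if it was already taken at the end of the kClone phase.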
+ blockWrites(opCtx);
}))
.then(_buildPhaseHandler(Phase::kEnterCriticalSection,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
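+ // Promote the critical section to also block reads before committing the
+ // metadata changes on the config server.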
+ blockReads(opCtx);
}))
- .then(_buildPhaseHandler(Phase::kCommitMetadataChanges,
+ .then(_buildPhaseHandler(
+ Phase::kCommit,
+ [this, anchor = shared_from_this()] {
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ invariant(_doc.getDatabaseVersion());
+ const auto& preCommitDbVersion = *_doc.getDatabaseVersion();
+
+ const auto commitResponse = commitMetadataToConfig(opCtx, preCommitDbVersion);
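+ // ShardNotFound presumably means the recipient shard no longer exists, so the
+ // commit can never succeed; release the critical section before failing.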
+ if (commitResponse == ErrorCodes::ShardNotFound) {
+ unblockReadsAndWrites(opCtx);
+ }
+ uassertStatusOKWithContext(
+ Shard::CommandResponse::getEffectiveStatus(commitResponse),
+ "movePrimary operation on database {} failed to commit metadata changes"_format(
+ _dbName.toString()));
+
+ assertChangedMetadataOnConfig(opCtx, preCommitDbVersion);
+
+ // Checkpoint the vector clock to ensure causality in the event of a crash or
+ // shutdown.
+ VectorClockMutable::get(opCtx)->waitForDurableConfigTime().get(opCtx);
+
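+ // This shard is no longer the primary for the database; clear its cached
+ // database metadata.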
+ clearDbMetadataOnPrimary(opCtx);
+
+ logChange(opCtx, "commit");
+ }))
+ .then(_buildPhaseHandler(Phase::kExitCriticalSection,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
+ unblockReadsAndWrites(opCtx);
}))
- .then(_buildPhaseHandler(Phase::kCleanStaleData,
+ .then(_buildPhaseHandler(Phase::kClean,
[this, anchor = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
+ const auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+
+ if (MONGO_unlikely(hangBeforeCleaningStaleData.shouldFail())) {
+ LOGV2(7120205, "Hit hangBeforeCleaningStaleData");
+ hangBeforeCleaningStaleData.pauseWhileSet(opCtx);
+ }
+
+ dropStaleDataOnDonor(opCtx);
+
+ LOGV2(7120206,
+ "Completed movePrimary operation",
+ "db"_attr = _dbName,
+ "to"_attr = _doc.getToShardId());
+
+ logChange(opCtx, "end");
}))
.onError([this, anchor = shared_from_this()](const Status& status) {
- LOGV2_ERROR(7120000,
- "Error running movePrimary",
- "database"_attr = nss(),
+ const auto opCtxHolder = cc().makeOperationContext();
+ auto* opCtx = opCtxHolder.get();
+ getForwardableOpMetadata().setOn(opCtx);
+
+ LOGV2_ERROR(7120207,
+ "Failed movePrimary operation",
+ "db"_attr = _dbName,
"to"_attr = _doc.getToShardId(),
"error"_attr = redact(status));
+ logChange(opCtx, "error");
+
return status;
});
}
+void MovePrimaryCoordinator::logChange(OperationContext* opCtx, const std::string& what) const {
+ BSONObjBuilder details;
+ details.append("from", ShardingState::get(opCtx)->shardId());
+ details.append("to", _doc.getToShardId());
+ ShardingLogging::get(opCtx)->logChange(
+ opCtx, "movePrimary.{}"_format(what), _dbName.toString(), details.obj());
+}
+
+std::vector<NamespaceString> MovePrimaryCoordinator::getUnshardedCollections(
+ OperationContext* opCtx) {
+ const auto allCollections = [&] {
+ DBDirectClient dbClient(opCtx);
+ const auto collInfos = dbClient.getCollectionInfos(_dbName,
+ BSON("type"
+ << "collection"));
+
+ std::vector<NamespaceString> colls;
+ for (const auto& collInfo : collInfos) {
+ std::string collName;
+ uassertStatusOK(bsonExtractStringField(collInfo, "name", &collName));
+
+ const NamespaceString nss(_dbName, collName);
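+ // Keep user collections and the system collections that clients may legally
+ // access (e.g. system.js); skip purely internal system collections.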
+ if (!nss.isSystem() ||
+ nss.isLegalClientSystemNS(serverGlobalParams.featureCompatibility)) {
+ colls.push_back(nss);
+ }
+ }
+
+ std::sort(colls.begin(), colls.end());
+ return colls;
+ }();
+
+ const auto shardedCollections = [&] {
+ auto colls = Grid::get(opCtx)->catalogClient()->getAllShardedCollectionsForDb(
+ opCtx, _dbName.toString(), repl::ReadConcernLevel::kMajorityReadConcern);
+
+ std::sort(colls.begin(), colls.end());
+ return colls;
+ }();
+
+ std::vector<NamespaceString> unshardedCollections;
+ std::set_difference(allCollections.cbegin(),
+ allCollections.cend(),
+ shardedCollections.cbegin(),
+ shardedCollections.cend(),
+ std::back_inserter(unshardedCollections));
+
+ return unshardedCollections;
+}
+
+StatusWith<Shard::CommandResponse> MovePrimaryCoordinator::cloneDataToRecipient(
+ OperationContext* opCtx) const {
+ // Enable write blocking bypass to allow cloning of catalog data even if writes are disallowed.
+ WriteBlockBypass::get(opCtx).set(true);
+
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ const auto fromShard =
+ uassertStatusOK(shardRegistry->getShard(opCtx, ShardingState::get(opCtx)->shardId()));
+ const auto toShard = uassertStatusOK(shardRegistry->getShard(opCtx, _doc.getToShardId()));
+
+ const auto cloneCommand = [&] {
+ BSONObjBuilder commandBuilder;
+ commandBuilder.append("_shardsvrCloneCatalogData", _dbName.toString());
+ commandBuilder.append("from", fromShard->getConnString().toString());
+ return commandBuilder.obj();
+ }();
+
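+ // The clone command must not be retried automatically: a partially applied
+ // clone cannot safely be repeated, hence the kNotIdempotent policy.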
+ return toShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(cloneCommand),
+ Shard::RetryPolicy::kNotIdempotent);
+}
+
+bool MovePrimaryCoordinator::checkClonedData(Shard::CommandResponse cloneResponse) const {
+ invariant(_doc.getCollectionsToClone());
+ const auto& collectionToClone = *_doc.getCollectionsToClone();
+
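+ // Compare the namespaces reported as cloned by the recipient against the
+ // sorted list previously persisted in the state document.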
+ const auto clonedCollections = [&] {
+ std::vector<NamespaceString> colls;
+ for (const auto& bsonElem : cloneResponse.response["clonedColls"].Obj()) {
+ if (bsonElem.type() == String) {
+ colls.push_back(NamespaceString(bsonElem.String()));
+ }
+ }
+
+ std::sort(colls.begin(), colls.end());
+ return colls;
+ }();
+
+ return collectionToClone.size() == clonedCollections.size() &&
+ std::equal(
+ collectionToClone.cbegin(), collectionToClone.cend(), clonedCollections.cbegin());
+}
+
+StatusWith<Shard::CommandResponse> MovePrimaryCoordinator::commitMetadataToConfig(
+ OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const {
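+ // The pre-commit database version is included so that the config server can
+ // detect a commit attempted against stale metadata.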
+ const auto commitCommand = [&] {
+ ConfigsvrCommitMovePrimary commitRequest(_dbName, preCommitDbVersion, _doc.getToShardId());
+ commitRequest.setDbName(NamespaceString::kAdminDb);
+ return commitRequest.toBSON({});
+ }();
+
+ const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ return config->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(commitCommand),
+ Shard::RetryPolicy::kIdempotent);
+}
+
+void MovePrimaryCoordinator::assertChangedMetadataOnConfig(
+ OperationContext* opCtx, const DatabaseVersion& preCommitDbVersion) const {
+ const auto postCommitDbType = [&]() {
+ const auto config = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ auto findResponse = uassertStatusOK(
+ config->exhaustiveFindOnConfig(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kConfigDatabasesNamespace,
+ BSON(DatabaseType::kNameFieldName << _dbName.toString()),
+ BSONObj(),
+ 1));
+
+ const auto databases = std::move(findResponse.docs);
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ "Tried to find version for database {}, but found no databases"_format(
+ _dbName.toString()),
+ !databases.empty());
+
+ return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front());
+ }();
+ tassert(7120208,
+ "Error committing movePrimary: database version did not advance",
+ postCommitDbType.getVersion() > preCommitDbVersion);
+ uassert(7120209,
+ "Error committing movePrimary: update of config.databases failed",
+ postCommitDbType.getPrimary() != ShardingState::get(opCtx)->shardId());
+}
+
+void MovePrimaryCoordinator::clearDbMetadataOnPrimary(OperationContext* opCtx) const {
+ AutoGetDb autoDb(opCtx, _dbName, MODE_X);
+ DatabaseHolder::get(opCtx)->clearDbInfo(opCtx, _dbName);
+}
+
+void MovePrimaryCoordinator::dropStaleDataOnDonor(OperationContext* opCtx) const {
+ // Enable write blocking bypass to allow cleaning of stale data even if writes are disallowed.
+ WriteBlockBypass::get(opCtx).set(true);
+
+ DBDirectClient dbClient(opCtx);
+ invariant(_doc.getCollectionsToClone());
+ for (const auto& nss : *_doc.getCollectionsToClone()) {
+ const auto dropStatus = [&] {
+ BSONObj dropResult;
+ dbClient.runCommand(_dbName.toString(), BSON("drop" << nss.coll()), dropResult);
+ return getStatusFromCommandResult(dropResult);
+ }();
+
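+ // Dropping the stale collections is best effort: a failed drop leaves orphaned
+ // data on the donor but does not fail the movePrimary operation.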
+ if (!dropStatus.isOK()) {
+ LOGV2_WARNING(7120210,
+ "Failed to drop stale collection on donor",
+ "namespace"_attr = nss,
+ "error"_attr = redact(dropStatus));
+ }
+ }
+}
+
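+// Legacy write blocking: marks movePrimary as in progress on the DatabaseShardingState,
+// causing conflicting writes on this database to fail while the catalog data is cloned.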
+void MovePrimaryCoordinator::blockWritesLegacy(OperationContext* opCtx) const {
+ AutoGetDb autoDb(opCtx, _dbName, MODE_X);
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, _dbName, DSSAcquisitionMode::kExclusive);
+ scopedDss->setMovePrimaryInProgress(opCtx);
+}
+
+void MovePrimaryCoordinator::unblockWritesLegacy(OperationContext* opCtx) const {
+ AutoGetDb autoDb(opCtx, _dbName, MODE_IX);
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquire(
+ opCtx, _dbName, DSSAcquisitionMode::kExclusive);
+ scopedDss->unsetMovePrimaryInProgress(opCtx);
+}
+
+void MovePrimaryCoordinator::blockWrites(OperationContext* opCtx) const {
+ ShardingRecoveryService::get(opCtx)->acquireRecoverableCriticalSectionBlockWrites(
+ opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void MovePrimaryCoordinator::blockReads(OperationContext* opCtx) const {
+ ShardingRecoveryService::get(opCtx)->promoteRecoverableCriticalSectionToBlockAlsoReads(
+ opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
+void MovePrimaryCoordinator::unblockReadsAndWrites(OperationContext* opCtx) const {
+ ShardingRecoveryService::get(opCtx)->releaseRecoverableCriticalSection(
+ opCtx, NamespaceString(_dbName), _csReason, ShardingCatalogClient::kLocalWriteConcern);
+}
+
} // namespace mongo