From eb90b652aa02c9f6b1d805abfd6956ba7b312f60 Mon Sep 17 00:00:00 2001 From: Antonio Fuschetto Date: Tue, 8 Nov 2022 17:07:36 +0000 Subject: SERVER-68541 Serialize the removeShard and commitMovePrimary commands to prevent the loss of moved collections --- .../db_reads_while_recovering_all_commands.js | 1 + src/mongo/db/s/SConscript | 1 + .../configsvr_commit_move_primary_command.cpp | 105 +++++++++++++ src/mongo/db/s/config/sharding_catalog_manager.h | 9 ++ ...harding_catalog_manager_database_operations.cpp | 46 ++++++ src/mongo/db/s/move_primary_source_manager.cpp | 169 +++++++++++++-------- src/mongo/db/s/move_primary_source_manager.h | 16 +- src/mongo/s/request_types/move_primary.idl | 18 +++ 8 files changed, 298 insertions(+), 67 deletions(-) create mode 100644 src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index 4c7efbfe269..1a30df70531 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -39,6 +39,7 @@ const allCommands = { _configsvrCommitChunkMigration: {skip: isPrimaryOnly}, _configsvrCommitChunkSplit: {skip: isPrimaryOnly}, _configsvrCommitIndex: {skip: isPrimaryOnly}, + _configsvrCommitMovePrimary: {skip: isPrimaryOnly}, _configsvrCommitReshardCollection: {skip: isPrimaryOnly}, _configsvrConfigureCollectionBalancing: {skip: isPrimaryOnly}, _configsvrCreateDatabase: {skip: isPrimaryOnly}, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 2694fa7b51b..eb490f4e9c0 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -377,6 +377,7 @@ env.Library( 'config/configsvr_collmod_command.cpp', 'config/configsvr_commit_chunk_migration_command.cpp', 'config/configsvr_commit_index_command.cpp', + 'config/configsvr_commit_move_primary_command.cpp', 'config/configsvr_commit_reshard_collection_command.cpp', 'config/configsvr_configure_collection_balancing.cpp', 'config/configsvr_control_balancer_command.cpp', diff --git a/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp b/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp new file mode 100644 index 00000000000..a66e8f0e3ff --- /dev/null +++ b/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/commands.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" +#include "mongo/s/request_types/move_primary_gen.h" + +namespace mongo { +namespace { + +class ConfigsvrCommitMovePrimaryCommand final + : public TypedCommand { +public: + using Request = ConfigsvrCommitMovePrimary; + + class Invocation final : public InvocationBase { + public: + using InvocationBase::InvocationBase; + + void typedRun(OperationContext* opCtx) { + uassert(ErrorCodes::IllegalOperation, + str::stream() << Request::kCommandName << " can only be run on config servers", + serverGlobalParams.clusterRole == ClusterRole::ConfigServer); + + CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName, + opCtx->getWriteConcern()); + + // Set the operation context read concern level to local for reads into the config + // database. + repl::ReadConcernArgs::get(opCtx) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); + + ShardingCatalogManager::get(opCtx)->commitMovePrimary( + opCtx, + request().getCommandParameter(), + request().getExpectedDatabaseVersion(), + request().getTo()); + } + + private: + NamespaceString ns() const override { + return NamespaceString(request().getDbName()); + } + + bool supportsWriteConcern() const override { + return true; + } + + void doCheckAuthorization(OperationContext* opCtx) const override { + uassert(ErrorCodes::Unauthorized, + "Unauthorized", + AuthorizationSession::get(opCtx->getClient()) + ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(), + ActionType::internal)); + } + }; + +private: + std::string help() const override { + return "Reassign a new primary shard for the given database on the config server. This is " + "an internal command only invokable on the config server, therefore do not call " + "directly."; + } + + bool adminOnly() const override { + return true; + } + + AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override { + return AllowedOnSecondary::kNever; + } +} configsvrCommitMovePrimaryCommand; + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index 86ac24fa736..c3b588b8fa6 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -393,6 +393,15 @@ public: StringData dbName, const boost::optional& optPrimaryShard); + /** + * Updates the metadata in config.databases collection with the new primary shard for the given + * database. This also advances the database's lastmod. + */ + void commitMovePrimary(OperationContext* opCtx, + const DatabaseName& dbName, + const DatabaseVersion& expectedDbVersion, + const ShardId& toShard); + // // Collection Operations // diff --git a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp index 19587e579e6..b6065b8c814 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp @@ -36,6 +36,7 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/ops/write_ops.h" +#include "mongo/db/persistent_task_store.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/ddl_lock_manager.h" #include "mongo/db/server_options.h" @@ -245,4 +246,49 @@ DatabaseType ShardingCatalogManager::createDatabase( return database; } +void ShardingCatalogManager::commitMovePrimary(OperationContext* opCtx, + const DatabaseName& dbName, + const DatabaseVersion& expectedDbVersion, + const ShardId& toShard) { + // Hold the shard lock until the entire commit finishes to serialize with removeShard. + Lock::SharedLock shardLock(opCtx->lockState(), _kShardMembershipLock); + + const auto updateOp = [&] { + const auto query = [&] { + BSONObjBuilder bsonBuilder; + bsonBuilder.append(DatabaseType::kNameFieldName, dbName.db()); + // Include the version in the update filter to be resilient to potential network retries + // and delayed messages. + for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) { + const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName; + bsonBuilder.appendAs(fieldValue, dottedFieldName); + } + return bsonBuilder.obj(); + }(); + + const auto update = [&] { + const auto newDbVersion = expectedDbVersion.makeUpdated(); + + BSONObjBuilder bsonBuilder; + bsonBuilder.append(DatabaseType::kPrimaryFieldName, toShard); + bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON()); + return BSON("$set" << bsonBuilder.obj()); + }(); + + write_ops::UpdateCommandRequest updateOp(NamespaceString::kConfigDatabasesNamespace); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(query); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); + return entry; + }()}); + + return updateOp; + }(); + + DBDirectClient dbClient(opCtx); + const auto commandResponse = dbClient.runCommand(updateOp.serialize({})); + uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply())); +} + } // namespace mongo diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp index dfb6e44ed80..42a46229012 100644 --- a/src/mongo/db/s/move_primary_source_manager.cpp +++ b/src/mongo/db/s/move_primary_source_manager.cpp @@ -43,6 +43,7 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" +#include "mongo/s/request_types/move_primary_gen.h" #include "mongo/util/exit.h" #include "mongo/util/scopeguard.h" @@ -212,6 +213,8 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { invariant(_state == kCriticalSection); ScopeGuard scopedGuard([&] { cleanupOnError(opCtx); }); + boost::optional expectedDbVersion; + { AutoGetDb autoDb(opCtx, getNss().dbName(), MODE_X); @@ -227,13 +230,13 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { // Read operations must begin to wait on the critical section just before we send the // commit operation to the config server dss->enterCriticalSectionCommitPhase(opCtx, dssLock, _critSecReason); - } - auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); + expectedDbVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, _dbname); + } auto commitStatus = [&]() { try { - return _commitOnConfig(opCtx); + return _commitOnConfig(opCtx, *expectedDbVersion); } catch (const DBException& ex) { return ex.toStatus(); } @@ -241,8 +244,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { if (!commitStatus.isOK()) { // Need to get the latest optime in case the refresh request goes to a secondary -- - // otherwise the read won't wait for the write that _commitOnConfig may have - // done + // otherwise the read won't wait for the write that commit on config server may have done. LOGV2(22044, "Error occurred while committing the movePrimary. Performing a majority write " "against the config server to obtain its latest optime: {error}", @@ -326,79 +328,119 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) { return Status::OK(); } -Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx) { - auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - - auto getDatabaseEntry = [&]() { - auto findResponse = uassertStatusOK( - configShard->exhaustiveFindOnConfig(opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - repl::ReadConcernLevel::kMajorityReadConcern, - NamespaceString::kConfigDatabasesNamespace, - BSON(DatabaseType::kNameFieldName << _dbname), - BSON(DatabaseType::kNameFieldName << -1), - 1)); - - const auto databasesVector = std::move(findResponse.docs); - uassert(ErrorCodes::IncompatibleShardingMetadata, - str::stream() << "Tried to find max database version for database '" << _dbname - << "', but found no databases", - !databasesVector.empty()); +Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx, + const DatabaseVersion& expectedDbVersion) { + LOGV2_DEBUG(6854100, + 3, + "Committing movePrimary", + "db"_attr = _dbname, + "fromShard"_attr = _fromShard, + "toShard"_attr = _toShard, + "expectedDbVersion"_attr = expectedDbVersion); + + const auto commitStatus = [&] { + ConfigsvrCommitMovePrimary commitRequest(_dbname, expectedDbVersion, _toShard); + commitRequest.setDbName(NamespaceString::kAdminDb); + + const auto commitResponse = + Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + NamespaceString::kAdminDb.toString(), + CommandHelpers::appendMajorityWriteConcern(commitRequest.toBSON({})), + Shard::RetryPolicy::kIdempotent); + + const auto status = Shard::CommandResponse::getEffectiveStatus(commitResponse); + if (status != ErrorCodes::CommandNotFound) { + return status; + } - return DatabaseType::parse(IDLParserContext("DatabaseType"), databasesVector.front()); - }; + LOGV2(6854101, + "_configsvrCommitMovePrimary command not found on config server, so try to update " + "the metadata document directly", + "db"_attr = _dbname); - const auto dbType = getDatabaseEntry(); + // The fallback logic is not synchronized with the removeShard command and simultaneous + // invocations of movePrimary and removeShard can lead to data loss. + return _fallbackCommitOnConfig(opCtx, expectedDbVersion); + }(); - if (dbType.getPrimary() == _toShard) { - return Status::OK(); + if (!commitStatus.isOK()) { + LOGV2(6854102, + "Error committing movePrimary", + "db"_attr = _dbname, + "error"_attr = redact(commitStatus)); + return commitStatus; } - auto newDbType = dbType; - newDbType.setPrimary(_toShard); + const auto updatedDbType = [&]() { + auto findResponse = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kMajorityReadConcern, + NamespaceString::kConfigDatabasesNamespace, + BSON(DatabaseType::kNameFieldName << _dbname), + BSON(DatabaseType::kNameFieldName << -1), + 1)); + + const auto databases = std::move(findResponse.docs); + uassert(ErrorCodes::IncompatibleShardingMetadata, + "Tried to find version for database {}, but found no databases"_format(_dbname), + !databases.empty()); + + return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front()); + }(); + tassert(6851100, + "Error committing movePrimary: database version went backwards", + updatedDbType.getVersion() > expectedDbVersion); + uassert(6851101, + "Error committing movePrimary: update of config.databases failed", + updatedDbType.getPrimary() != _fromShard); - auto const currentDatabaseVersion = dbType.getVersion(); + LOGV2_DEBUG(6854103, + 3, + "Commited movePrimary", + "db"_attr = _dbname, + "fromShard"_attr = _fromShard, + "toShard"_attr = _toShard, + "updatedDbVersion"_attr = updatedDbType.getVersion()); - newDbType.setVersion(currentDatabaseVersion.makeUpdated()); + return Status::OK(); +} - auto const updateQuery = [&] { - BSONObjBuilder queryBuilder; - queryBuilder.append(DatabaseType::kNameFieldName, _dbname); +Status MovePrimarySourceManager::_fallbackCommitOnConfig(OperationContext* opCtx, + const DatabaseVersion& expectedDbVersion) { + const auto query = [&] { + BSONObjBuilder bsonBuilder; + bsonBuilder.append(DatabaseType::kNameFieldName, _dbname); // Include the version in the update filter to be resilient to potential network retries and // delayed messages. - for (auto [fieldName, elem] : currentDatabaseVersion.toBSON()) { - auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName; - queryBuilder.appendAs(elem, dottedFieldName); + for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) { + const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName; + bsonBuilder.appendAs(fieldValue, dottedFieldName); } - return queryBuilder.obj(); + return bsonBuilder.obj(); }(); - auto updateStatus = Grid::get(opCtx)->catalogClient()->updateConfigDocument( - opCtx, - NamespaceString::kConfigDatabasesNamespace, - updateQuery, - newDbType.toBSON(), - false, - ShardingCatalogClient::kMajorityWriteConcern); - - if (!updateStatus.isOK()) { - LOGV2(5448803, - "Error committing movePrimary for {db}: {error}", - "Error committing movePrimary", - "db"_attr = _dbname, - "error"_attr = redact(updateStatus.getStatus())); - return updateStatus.getStatus(); - } + const auto update = [&] { + const auto newDbVersion = expectedDbVersion.makeUpdated(); - const auto updatedDbType = getDatabaseEntry(); - tassert(6851100, - "Error committing movePrimary: database version went backwards", - updatedDbType.getVersion() > currentDatabaseVersion); - uassert(6851101, - "Error committing movePrimary: update of `config.databases` failed", - updatedDbType.getPrimary() != _fromShard); + BSONObjBuilder bsonBuilder; + bsonBuilder.append(DatabaseType::kPrimaryFieldName, _toShard); + bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON()); + return BSON("$set" << bsonBuilder.obj()); + }(); - return Status::OK(); + return Grid::get(opCtx) + ->catalogClient() + ->updateConfigDocument(opCtx, + NamespaceString::kConfigDatabasesNamespace, + query, + update, + false, + ShardingCatalogClient::kMajorityWriteConcern) + .getStatus(); } Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) { @@ -429,7 +471,6 @@ Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) { return Status::OK(); } - void MovePrimarySourceManager::cleanupOnError(OperationContext* opCtx) { if (_state == kDone) { return; diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h index dd6d3d0d03f..8f802d5da9e 100644 --- a/src/mongo/db/s/move_primary_source_manager.h +++ b/src/mongo/db/s/move_primary_source_manager.h @@ -145,10 +145,20 @@ private: } /** - * Updates CSRS metadata in config.databases collection to move the given primary database on - * its new shard. + * Invokes the _configsvrCommitMovePrimary command of the config server to reassign the primary + * shard of the database. */ - Status _commitOnConfig(OperationContext* opCtx); + Status _commitOnConfig(OperationContext* opCtx, const DatabaseVersion& expectedDbVersion); + + /** + * Updates the config server's metadata in config.databases collection to reassign the primary + * shard of the database. + * + * This logic is not synchronized with the removeShard command and simultaneous invocations of + * movePrimary and removeShard can lead to data loss. + */ + Status _fallbackCommitOnConfig(OperationContext* opCtx, + const DatabaseVersion& expectedDbVersion); // Used to track the current state of the source manager. See the methods above, which have // comments explaining the various state transitions. diff --git a/src/mongo/s/request_types/move_primary.idl b/src/mongo/s/request_types/move_primary.idl index 51d4a095b8a..c63ed43a558 100644 --- a/src/mongo/s/request_types/move_primary.idl +++ b/src/mongo/s/request_types/move_primary.idl @@ -33,6 +33,7 @@ global: imports: - "mongo/idl/basic_types.idl" + - "mongo/s/sharding_types.idl" structs: movePrimary: @@ -62,3 +63,20 @@ structs: to: type: string description: "The shard serving as the destination for un-sharded collections." + +commands: + _configsvrCommitMovePrimary: + command_name: _configsvrCommitMovePrimary + description: Reassign a new primary shard for the given database on the config server + cpp_name: ConfigsvrCommitMovePrimary + namespace: type + type: database_name + api_version: + strict: false + fields: + expectedDatabaseVersion: + type: database_version + description: Database version known by the current primary shard + to: + type: shard_id + description: Shard serving as the destination for un-sharded collections -- cgit v1.2.1