summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntonio Fuschetto <antonio.fuschetto@mongodb.com>2022-11-08 17:07:36 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-08 17:41:30 +0000
commiteb90b652aa02c9f6b1d805abfd6956ba7b312f60 (patch)
tree453f91a0b6484219209129286274cc6fe71b1b54
parent13a0b0db261bfa8130d67db5d4730cb3563456dd (diff)
downloadmongo-eb90b652aa02c9f6b1d805abfd6956ba7b312f60.tar.gz
SERVER-68541 Serialize the removeShard and commitMovePrimary commands to prevent the loss of moved collections
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js1
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp105
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h9
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp46
-rw-r--r--src/mongo/db/s/move_primary_source_manager.cpp169
-rw-r--r--src/mongo/db/s/move_primary_source_manager.h16
-rw-r--r--src/mongo/s/request_types/move_primary.idl18
8 files changed, 298 insertions, 67 deletions
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 4c7efbfe269..1a30df70531 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -39,6 +39,7 @@ const allCommands = {
_configsvrCommitChunkMigration: {skip: isPrimaryOnly},
_configsvrCommitChunkSplit: {skip: isPrimaryOnly},
_configsvrCommitIndex: {skip: isPrimaryOnly},
+ _configsvrCommitMovePrimary: {skip: isPrimaryOnly},
_configsvrCommitReshardCollection: {skip: isPrimaryOnly},
_configsvrConfigureCollectionBalancing: {skip: isPrimaryOnly},
_configsvrCreateDatabase: {skip: isPrimaryOnly},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 2694fa7b51b..eb490f4e9c0 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -377,6 +377,7 @@ env.Library(
'config/configsvr_collmod_command.cpp',
'config/configsvr_commit_chunk_migration_command.cpp',
'config/configsvr_commit_index_command.cpp',
+ 'config/configsvr_commit_move_primary_command.cpp',
'config/configsvr_commit_reshard_collection_command.cpp',
'config/configsvr_configure_collection_balancing.cpp',
'config/configsvr_control_balancer_command.cpp',
diff --git a/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp b/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp
new file mode 100644
index 00000000000..a66e8f0e3ff
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+
+namespace mongo {
+namespace {
+
+class ConfigsvrCommitMovePrimaryCommand final
+ : public TypedCommand<ConfigsvrCommitMovePrimaryCommand> {
+public:
+ using Request = ConfigsvrCommitMovePrimary;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << Request::kCommandName << " can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+ CommandHelpers::uassertCommandRunWithMajority(Request::kCommandName,
+ opCtx->getWriteConcern());
+
+ // Set the operation context read concern level to local for reads into the config
+ // database.
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ ShardingCatalogManager::get(opCtx)->commitMovePrimary(
+ opCtx,
+ request().getCommandParameter(),
+ request().getExpectedDatabaseVersion(),
+ request().getTo());
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return NamespaceString(request().getDbName());
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+private:
+ std::string help() const override {
+ return "Reassign a new primary shard for the given database on the config server. This is "
+ "an internal command only invokable on the config server, therefore do not call "
+ "directly.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override {
+ return AllowedOnSecondary::kNever;
+ }
+} configsvrCommitMovePrimaryCommand;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 86ac24fa736..c3b588b8fa6 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -393,6 +393,15 @@ public:
StringData dbName,
const boost::optional<ShardId>& optPrimaryShard);
+ /**
+ * Updates the metadata in config.databases collection with the new primary shard for the given
+ * database. This also advances the database's lastmod.
+ */
+ void commitMovePrimary(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const DatabaseVersion& expectedDbVersion,
+ const ShardId& toShard);
+
//
// Collection Operations
//
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
index 19587e579e6..b6065b8c814 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/ddl_lock_manager.h"
#include "mongo/db/server_options.h"
@@ -245,4 +246,49 @@ DatabaseType ShardingCatalogManager::createDatabase(
return database;
}
+void ShardingCatalogManager::commitMovePrimary(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const DatabaseVersion& expectedDbVersion,
+ const ShardId& toShard) {
+ // Hold the shard lock until the entire commit finishes to serialize with removeShard.
+ Lock::SharedLock shardLock(opCtx->lockState(), _kShardMembershipLock);
+
+ const auto updateOp = [&] {
+ const auto query = [&] {
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kNameFieldName, dbName.db());
+ // Include the version in the update filter to be resilient to potential network retries
+ // and delayed messages.
+ for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) {
+ const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;
+ bsonBuilder.appendAs(fieldValue, dottedFieldName);
+ }
+ return bsonBuilder.obj();
+ }();
+
+ const auto update = [&] {
+ const auto newDbVersion = expectedDbVersion.makeUpdated();
+
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kPrimaryFieldName, toShard);
+ bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON());
+ return BSON("$set" << bsonBuilder.obj());
+ }();
+
+ write_ops::UpdateCommandRequest updateOp(NamespaceString::kConfigDatabasesNamespace);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(query);
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
+ return entry;
+ }()});
+
+ return updateOp;
+ }();
+
+ DBDirectClient dbClient(opCtx);
+ const auto commandResponse = dbClient.runCommand(updateOp.serialize({}));
+ uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply()));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp
index dfb6e44ed80..42a46229012 100644
--- a/src/mongo/db/s/move_primary_source_manager.cpp
+++ b/src/mongo/db/s/move_primary_source_manager.cpp
@@ -43,6 +43,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/move_primary_gen.h"
#include "mongo/util/exit.h"
#include "mongo/util/scopeguard.h"
@@ -212,6 +213,8 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
invariant(_state == kCriticalSection);
ScopeGuard scopedGuard([&] { cleanupOnError(opCtx); });
+ boost::optional<DatabaseVersion> expectedDbVersion;
+
{
AutoGetDb autoDb(opCtx, getNss().dbName(), MODE_X);
@@ -227,13 +230,13 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
// Read operations must begin to wait on the critical section just before we send the
// commit operation to the config server
dss->enterCriticalSectionCommitPhase(opCtx, dssLock, _critSecReason);
- }
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ expectedDbVersion = DatabaseHolder::get(opCtx)->getDbVersion(opCtx, _dbname);
+ }
auto commitStatus = [&]() {
try {
- return _commitOnConfig(opCtx);
+ return _commitOnConfig(opCtx, *expectedDbVersion);
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -241,8 +244,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
if (!commitStatus.isOK()) {
// Need to get the latest optime in case the refresh request goes to a secondary --
- // otherwise the read won't wait for the write that _commitOnConfig may have
- // done
+ // otherwise the read won't wait for the write that commit on config server may have done.
LOGV2(22044,
"Error occurred while committing the movePrimary. Performing a majority write "
"against the config server to obtain its latest optime: {error}",
@@ -326,79 +328,119 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
return Status::OK();
}
-Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx) {
- auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
-
- auto getDatabaseEntry = [&]() {
- auto findResponse = uassertStatusOK(
- configShard->exhaustiveFindOnConfig(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kMajorityReadConcern,
- NamespaceString::kConfigDatabasesNamespace,
- BSON(DatabaseType::kNameFieldName << _dbname),
- BSON(DatabaseType::kNameFieldName << -1),
- 1));
-
- const auto databasesVector = std::move(findResponse.docs);
- uassert(ErrorCodes::IncompatibleShardingMetadata,
- str::stream() << "Tried to find max database version for database '" << _dbname
- << "', but found no databases",
- !databasesVector.empty());
+Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion) {
+ LOGV2_DEBUG(6854100,
+ 3,
+ "Committing movePrimary",
+ "db"_attr = _dbname,
+ "fromShard"_attr = _fromShard,
+ "toShard"_attr = _toShard,
+ "expectedDbVersion"_attr = expectedDbVersion);
+
+ const auto commitStatus = [&] {
+ ConfigsvrCommitMovePrimary commitRequest(_dbname, expectedDbVersion, _toShard);
+ commitRequest.setDbName(NamespaceString::kAdminDb);
+
+ const auto commitResponse =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(commitRequest.toBSON({})),
+ Shard::RetryPolicy::kIdempotent);
+
+ const auto status = Shard::CommandResponse::getEffectiveStatus(commitResponse);
+ if (status != ErrorCodes::CommandNotFound) {
+ return status;
+ }
- return DatabaseType::parse(IDLParserContext("DatabaseType"), databasesVector.front());
- };
+ LOGV2(6854101,
+ "_configsvrCommitMovePrimary command not found on config server, so try to update "
+ "the metadata document directly",
+ "db"_attr = _dbname);
- const auto dbType = getDatabaseEntry();
+ // The fallback logic is not synchronized with the removeShard command and simultaneous
+ // invocations of movePrimary and removeShard can lead to data loss.
+ return _fallbackCommitOnConfig(opCtx, expectedDbVersion);
+ }();
- if (dbType.getPrimary() == _toShard) {
- return Status::OK();
+ if (!commitStatus.isOK()) {
+ LOGV2(6854102,
+ "Error committing movePrimary",
+ "db"_attr = _dbname,
+ "error"_attr = redact(commitStatus));
+ return commitStatus;
}
- auto newDbType = dbType;
- newDbType.setPrimary(_toShard);
+ const auto updatedDbType = [&]() {
+ auto findResponse = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kConfigDatabasesNamespace,
+ BSON(DatabaseType::kNameFieldName << _dbname),
+ BSON(DatabaseType::kNameFieldName << -1),
+ 1));
+
+ const auto databases = std::move(findResponse.docs);
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ "Tried to find version for database {}, but found no databases"_format(_dbname),
+ !databases.empty());
+
+ return DatabaseType::parse(IDLParserContext("DatabaseType"), databases.front());
+ }();
+ tassert(6851100,
+ "Error committing movePrimary: database version went backwards",
+ updatedDbType.getVersion() > expectedDbVersion);
+ uassert(6851101,
+ "Error committing movePrimary: update of config.databases failed",
+ updatedDbType.getPrimary() != _fromShard);
- auto const currentDatabaseVersion = dbType.getVersion();
+ LOGV2_DEBUG(6854103,
+ 3,
+ "Commited movePrimary",
+ "db"_attr = _dbname,
+ "fromShard"_attr = _fromShard,
+ "toShard"_attr = _toShard,
+ "updatedDbVersion"_attr = updatedDbType.getVersion());
- newDbType.setVersion(currentDatabaseVersion.makeUpdated());
+ return Status::OK();
+}
- auto const updateQuery = [&] {
- BSONObjBuilder queryBuilder;
- queryBuilder.append(DatabaseType::kNameFieldName, _dbname);
+Status MovePrimarySourceManager::_fallbackCommitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion) {
+ const auto query = [&] {
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kNameFieldName, _dbname);
// Include the version in the update filter to be resilient to potential network retries and
// delayed messages.
- for (auto [fieldName, elem] : currentDatabaseVersion.toBSON()) {
- auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;
- queryBuilder.appendAs(elem, dottedFieldName);
+ for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) {
+ const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;
+ bsonBuilder.appendAs(fieldValue, dottedFieldName);
}
- return queryBuilder.obj();
+ return bsonBuilder.obj();
}();
- auto updateStatus = Grid::get(opCtx)->catalogClient()->updateConfigDocument(
- opCtx,
- NamespaceString::kConfigDatabasesNamespace,
- updateQuery,
- newDbType.toBSON(),
- false,
- ShardingCatalogClient::kMajorityWriteConcern);
-
- if (!updateStatus.isOK()) {
- LOGV2(5448803,
- "Error committing movePrimary for {db}: {error}",
- "Error committing movePrimary",
- "db"_attr = _dbname,
- "error"_attr = redact(updateStatus.getStatus()));
- return updateStatus.getStatus();
- }
+ const auto update = [&] {
+ const auto newDbVersion = expectedDbVersion.makeUpdated();
- const auto updatedDbType = getDatabaseEntry();
- tassert(6851100,
- "Error committing movePrimary: database version went backwards",
- updatedDbType.getVersion() > currentDatabaseVersion);
- uassert(6851101,
- "Error committing movePrimary: update of `config.databases` failed",
- updatedDbType.getPrimary() != _fromShard);
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kPrimaryFieldName, _toShard);
+ bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON());
+ return BSON("$set" << bsonBuilder.obj());
+ }();
- return Status::OK();
+ return Grid::get(opCtx)
+ ->catalogClient()
+ ->updateConfigDocument(opCtx,
+ NamespaceString::kConfigDatabasesNamespace,
+ query,
+ update,
+ false,
+ ShardingCatalogClient::kMajorityWriteConcern)
+ .getStatus();
}
Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) {
@@ -429,7 +471,6 @@ Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) {
return Status::OK();
}
-
void MovePrimarySourceManager::cleanupOnError(OperationContext* opCtx) {
if (_state == kDone) {
return;
diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h
index dd6d3d0d03f..8f802d5da9e 100644
--- a/src/mongo/db/s/move_primary_source_manager.h
+++ b/src/mongo/db/s/move_primary_source_manager.h
@@ -145,10 +145,20 @@ private:
}
/**
- * Updates CSRS metadata in config.databases collection to move the given primary database on
- * its new shard.
+ * Invokes the _configsvrCommitMovePrimary command of the config server to reassign the primary
+ * shard of the database.
*/
- Status _commitOnConfig(OperationContext* opCtx);
+ Status _commitOnConfig(OperationContext* opCtx, const DatabaseVersion& expectedDbVersion);
+
+ /**
+ * Updates the config server's metadata in config.databases collection to reassign the primary
+ * shard of the database.
+ *
+ * This logic is not synchronized with the removeShard command and simultaneous invocations of
+ * movePrimary and removeShard can lead to data loss.
+ */
+ Status _fallbackCommitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion);
// Used to track the current state of the source manager. See the methods above, which have
// comments explaining the various state transitions.
diff --git a/src/mongo/s/request_types/move_primary.idl b/src/mongo/s/request_types/move_primary.idl
index 51d4a095b8a..c63ed43a558 100644
--- a/src/mongo/s/request_types/move_primary.idl
+++ b/src/mongo/s/request_types/move_primary.idl
@@ -33,6 +33,7 @@ global:
imports:
- "mongo/idl/basic_types.idl"
+ - "mongo/s/sharding_types.idl"
structs:
movePrimary:
@@ -62,3 +63,20 @@ structs:
to:
type: string
description: "The shard serving as the destination for un-sharded collections."
+
+commands:
+ _configsvrCommitMovePrimary:
+ command_name: _configsvrCommitMovePrimary
+ description: Reassign a new primary shard for the given database on the config server
+ cpp_name: ConfigsvrCommitMovePrimary
+ namespace: type
+ type: database_name
+ api_version:
+ strict: false
+ fields:
+ expectedDatabaseVersion:
+ type: database_version
+ description: Database version known by the current primary shard
+ to:
+ type: shard_id
+ description: Shard serving as the destination for un-sharded collections