summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorAntonio Fuschetto <antonio.fuschetto@mongodb.com>2022-11-08 17:43:18 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-11-08 18:31:34 +0000
commit50e767b1dbbd5104959989e056d3bd04b6119748 (patch)
treee7dda3367db2bd38d9145b8d23385312f03f7b22 /src/mongo/db
parent7e3381f8a26e6bb01d4715961f6babadff651de0 (diff)
downloadmongo-50e767b1dbbd5104959989e056d3bd04b6119748.tar.gz
SERVER-68541 Serialize the removeShard and commitMovePrimary commands to prevent the loss of moved collections
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp102
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.h9
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp46
-rw-r--r--src/mongo/db/s/move_primary_source_manager.cpp169
-rw-r--r--src/mongo/db/s/move_primary_source_manager.h16
6 files changed, 276 insertions, 67 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 7ded31465e1..eca1b1a2a0c 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -352,6 +352,7 @@ env.Library(
'config/configsvr_clear_jumbo_flag_command.cpp',
'config/configsvr_collmod_command.cpp',
'config/configsvr_commit_chunk_migration_command.cpp',
+ 'config/configsvr_commit_move_primary_command.cpp',
'config/configsvr_commit_reshard_collection_command.cpp',
'config/configsvr_configure_collection_balancing.cpp',
'config/configsvr_control_balancer_command.cpp',
diff --git a/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp b/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp
new file mode 100644
index 00000000000..76fc3f7b5c9
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_commit_move_primary_command.cpp
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/request_types/move_primary_gen.h"
+
+namespace mongo {
+namespace {
+
+class ConfigsvrCommitMovePrimaryCommand final  // Internal command: commits a movePrimary through ShardingCatalogManager.
+ : public TypedCommand<ConfigsvrCommitMovePrimaryCommand> {
+public:
+ using Request = ConfigsvrCommitMovePrimary;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {  // Checks the node role, sets local read concern, then delegates the commit.
+ uassert(ErrorCodes::IllegalOperation,  // Rejected unless this node's cluster role is ConfigServer.
+ str::stream() << Request::kCommandName << " can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+
+ // Set the operation context read concern level to local for reads into the config
+ // database.
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ ShardingCatalogManager::get(opCtx)->commitMovePrimary(  // Arguments map to (opCtx, dbName, expectedDbVersion, toShard).
+ opCtx,
+ request().getCommandParameter(),
+ request().getExpectedDatabaseVersion(),
+ request().getTo());
+ }
+
+ private:
+ NamespaceString ns() const override {  // Namespace built from the request's database name.
+ return NamespaceString(request().getDbName());
+ }
+
+ bool supportsWriteConcern() const override {  // Reports that the command supports a write concern.
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {  // Throws Unauthorized unless the client holds the 'internal' action on the cluster resource.
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+private:
+ std::string help() const override {  // Help text; states that the command is internal.
+ return "Reassign a new primary shard for the given database on the config server. This is "
+ "an internal command only invokable on the config server, therefore do not call "
+ "directly.";
+ }
+
+ bool adminOnly() const override {  // Reports the command as admin-only.
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext* context) const override {  // Never allowed on secondaries.
+ return AllowedOnSecondary::kNever;
+ }
+} configsvrCommitMovePrimaryCommand;
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 070c1d3d78f..02f4ee3024a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -396,6 +396,15 @@ public:
// # TODO SERVER-63983: remove enableSharding parameter when 6.0 becomes lastLTS
bool enableSharding = false);
+ /**
+ * Updates the metadata in config.databases collection with the new primary shard for the given
+ * database, matching its entry by name and `expectedDbVersion`, and advances that version.
+ */
+ void commitMovePrimary(OperationContext* opCtx,
+ const StringData& dbName,  // database whose primary shard is reassigned
+ const DatabaseVersion& expectedDbVersion,  // version the entry must still carry to be updated
+ const ShardId& toShard);  // shard written as the new primary
+
//
// Collection Operations
//
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
index 934eca19e8e..6e7bf7215c0 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/persistent_task_store.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/dist_lock_manager.h"
#include "mongo/db/server_options.h"
@@ -268,4 +269,49 @@ DatabaseType ShardingCatalogManager::createDatabase(OperationContext* opCtx,
return database;
}
+void ShardingCatalogManager::commitMovePrimary(OperationContext* opCtx,
+ const StringData& dbName,
+ const DatabaseVersion& expectedDbVersion,
+ const ShardId& toShard) {  // Conditionally rewrites the config.databases entry of dbName so that toShard is its primary.
+ // Hold the shard lock until the entire commit finishes to serialize with removeShard.
+ Lock::SharedLock shardLock(opCtx->lockState(), _kShardMembershipLock);
+
+ const auto updateOp = [&] {  // Builds the update request: filter on name + expected version, $set primary + new version.
+ const auto query = [&] {  // Filter document.
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kNameFieldName, dbName);
+ // Include the version in the update filter to be resilient to potential network retries
+ // and delayed messages.
+ for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) {
+ const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;  // "<version field>.<sub-field>"
+ bsonBuilder.appendAs(fieldValue, dottedFieldName);
+ }
+ return bsonBuilder.obj();
+ }();
+
+ const auto update = [&] {  // Modification document.
+ const auto newDbVersion = expectedDbVersion.makeUpdated();  // version written back alongside the new primary
+
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kPrimaryFieldName, toShard);
+ bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON());
+ return BSON("$set" << bsonBuilder.obj());
+ }();
+
+ write_ops::UpdateCommandRequest updateOp(NamespaceString::kConfigDatabasesNamespace);  // inner local; its value initializes the outer updateOp
+ updateOp.setUpdates({[&] {  // single update entry
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(query);
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
+ return entry;
+ }()});
+
+ return updateOp;
+ }();
+
+ DBDirectClient dbClient(opCtx);  // the update is issued through a direct client on this operation context
+ const auto commandResponse = dbClient.runCommand(updateOp.serialize({}));
+ uassertStatusOK(getStatusFromWriteCommandReply(commandResponse->getCommandReply()));  // only the write command status is checked here
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/move_primary_source_manager.cpp b/src/mongo/db/s/move_primary_source_manager.cpp
index 6a65d4ab29b..d0ede20b420 100644
--- a/src/mongo/db/s/move_primary_source_manager.cpp
+++ b/src/mongo/db/s/move_primary_source_manager.cpp
@@ -46,6 +46,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/move_primary_gen.h"
#include "mongo/util/exit.h"
#include "mongo/util/scopeguard.h"
@@ -215,6 +216,8 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
invariant(_state == kCriticalSection);
ScopeGuard scopedGuard([&] { cleanupOnError(opCtx); });
+ boost::optional<DatabaseVersion> expectedDbVersion;
+
{
AutoGetDb autoDb(opCtx, getNss().toString(), MODE_X);
@@ -230,13 +233,13 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
// Read operations must begin to wait on the critical section just before we send the
// commit operation to the config server
dss->enterCriticalSectionCommitPhase(opCtx, dssLock, _critSecReason);
- }
- auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ expectedDbVersion = dss->getDbVersion(opCtx, dssLock);
+ }
auto commitStatus = [&]() {
try {
- return _commitOnConfig(opCtx);
+ return _commitOnConfig(opCtx, *expectedDbVersion);
} catch (const DBException& ex) {
return ex.toStatus();
}
@@ -244,8 +247,7 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
if (!commitStatus.isOK()) {
// Need to get the latest optime in case the refresh request goes to a secondary --
- // otherwise the read won't wait for the write that _commitOnConfig may have
- // done
+ // otherwise the read won't wait for the write that commit on config server may have done.
LOGV2(22044,
"Error occurred while committing the movePrimary. Performing a majority write "
"against the config server to obtain its latest optime: {error}",
@@ -329,79 +331,119 @@ Status MovePrimarySourceManager::commitOnConfig(OperationContext* opCtx) {
return Status::OK();
}
-Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx) {
- auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
-
- auto getDatabaseEntry = [&]() {
- auto findResponse = uassertStatusOK(
- configShard->exhaustiveFindOnConfig(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kMajorityReadConcern,
- NamespaceString::kConfigDatabasesNamespace,
- BSON(DatabaseType::kNameFieldName << _dbname),
- BSON(DatabaseType::kNameFieldName << -1),
- 1));
-
- const auto databasesVector = std::move(findResponse.docs);
- uassert(ErrorCodes::IncompatibleShardingMetadata,
- str::stream() << "Tried to find max database version for database '" << _dbname
- << "', but found no databases",
- !databasesVector.empty());
+Status MovePrimarySourceManager::_commitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion) {  // Commits the movePrimary on the config server, then re-reads config.databases to validate it.
+ LOGV2_DEBUG(6854100,
+ 3,
+ "Committing movePrimary",
+ "db"_attr = _dbname,
+ "fromShard"_attr = _fromShard,
+ "toShard"_attr = _toShard,
+ "expectedDbVersion"_attr = expectedDbVersion);
+
+ const auto commitStatus = [&] {  // Try the dedicated config server command first.
+ ConfigsvrCommitMovePrimary commitRequest(_dbname.toString(), expectedDbVersion, _toShard);
+ commitRequest.setDbName(NamespaceString::kAdminDb);
+
+ const auto commitResponse =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(commitRequest.toBSON({})),
+ Shard::RetryPolicy::kIdempotent);
+
+ const auto status = Shard::CommandResponse::getEffectiveStatus(commitResponse);
+ if (status != ErrorCodes::CommandNotFound) {  // any other outcome, success included, is final
+ return status;
+ }
- return DatabaseType::parse(IDLParserErrorContext("DatabaseType"), databasesVector.front());
- };
+ LOGV2(6854101,
+ "_configsvrCommitMovePrimary command not found on config server, so try to update "
+ "the metadata document directly",
+ "db"_attr = _dbname);
- const auto dbType = getDatabaseEntry();
+ // The fallback logic is not synchronized with the removeShard command and simultaneous
+ // invocations of movePrimary and removeShard can lead to data loss.
+ return _fallbackCommitOnConfig(opCtx, expectedDbVersion);
+ }();
- if (dbType.getPrimary() == _toShard) {
- return Status::OK();
+ if (!commitStatus.isOK()) {
+ LOGV2(6854102,
+ "Error committing movePrimary",
+ "db"_attr = _dbname,
+ "error"_attr = redact(commitStatus));
+ return commitStatus;
}
- auto newDbType = dbType;
- newDbType.setPrimary(_toShard);
+ const auto updatedDbType = [&]() {  // Re-read the database entry from the config server.
+ auto findResponse = uassertStatusOK(  // non-const so that `docs` is really moved out below, not copied
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kMajorityReadConcern,
+ NamespaceString::kConfigDatabasesNamespace,
+ BSON(DatabaseType::kNameFieldName << _dbname),
+ BSON(DatabaseType::kNameFieldName << -1),
+ 1));
+
+ const auto databases = std::move(findResponse.docs);
+ uassert(ErrorCodes::IncompatibleShardingMetadata,
+ "Tried to find version for database {}, but found no databases"_format(_dbname),
+ !databases.empty());
- auto const currentDatabaseVersion = dbType.getVersion();
+ return DatabaseType::parse(IDLParserErrorContext("DatabaseType"), databases.front());
+ }();
+ tassert(6851100,  // the stored version must now be greater than the one committed against
+ "Error committing movePrimary: database version went backwards",
+ updatedDbType.getVersion() > expectedDbVersion);
+ uassert(6851101,  // the stored primary must no longer be _fromShard
+ "Error committing movePrimary: update of config.databases failed",
+ updatedDbType.getPrimary() != _fromShard);
- newDbType.setVersion(currentDatabaseVersion.makeUpdated());
+ LOGV2_DEBUG(6854103,
+ 3,
+ "Committed movePrimary",
+ "db"_attr = _dbname,
+ "fromShard"_attr = _fromShard,
+ "toShard"_attr = _toShard,
+ "updatedDbVersion"_attr = updatedDbType.getVersion());
- auto const updateQuery = [&] {
- BSONObjBuilder queryBuilder;
- queryBuilder.append(DatabaseType::kNameFieldName, _dbname);
+ return Status::OK();
+}
+
+Status MovePrimarySourceManager::_fallbackCommitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion) {  // Updates the database's config.databases entry through the catalog client and returns the update status.
+ const auto query = [&] {  // Filter: database name plus each field of expectedDbVersion.
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kNameFieldName, _dbname);
// Include the version in the update filter to be resilient to potential network retries and
// delayed messages.
- for (auto [fieldName, elem] : currentDatabaseVersion.toBSON()) {
- auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;
- queryBuilder.appendAs(elem, dottedFieldName);
+ for (const auto [fieldName, fieldValue] : expectedDbVersion.toBSON()) {
+ const auto dottedFieldName = DatabaseType::kVersionFieldName + "." + fieldName;  // "<version field>.<sub-field>"
+ bsonBuilder.appendAs(fieldValue, dottedFieldName);
}
- return queryBuilder.obj();
+ return bsonBuilder.obj();
}();
- auto updateStatus = Grid::get(opCtx)->catalogClient()->updateConfigDocument(
- opCtx,
- NamespaceString::kConfigDatabasesNamespace,
- updateQuery,
- newDbType.toBSON(),
- false,
- ShardingCatalogClient::kMajorityWriteConcern);
+ const auto update = [&] {  // Modification: $set the new primary shard and the updated version.
+ const auto newDbVersion = expectedDbVersion.makeUpdated();
- if (!updateStatus.isOK()) {
- LOGV2(5448803,
- "Error committing movePrimary for {db}: {error}",
- "Error committing movePrimary",
- "db"_attr = _dbname,
- "error"_attr = redact(updateStatus.getStatus()));
- return updateStatus.getStatus();
- }
-
- const auto updatedDbType = getDatabaseEntry();
- tassert(6851100,
- "Error committing movePrimary: database version went backwards",
- updatedDbType.getVersion() > currentDatabaseVersion);
- uassert(6851101,
- "Error committing movePrimary: update of `config.databases` failed",
- updatedDbType.getPrimary() != _fromShard);
+ BSONObjBuilder bsonBuilder;
+ bsonBuilder.append(DatabaseType::kPrimaryFieldName, _toShard);
+ bsonBuilder.append(DatabaseType::kVersionFieldName, newDbVersion.toBSON());
+ return BSON("$set" << bsonBuilder.obj());
+ }();
- return Status::OK();
+ return Grid::get(opCtx)  // only the Status of the update result is returned to the caller
+ ->catalogClient()
+ ->updateConfigDocument(opCtx,
+ NamespaceString::kConfigDatabasesNamespace,
+ query,
+ update,
+ false,  // presumably the upsert flag — confirm against updateConfigDocument
+ ShardingCatalogClient::kMajorityWriteConcern)
+ .getStatus();
}
Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) {
@@ -432,7 +474,6 @@ Status MovePrimarySourceManager::cleanStaleData(OperationContext* opCtx) {
return Status::OK();
}
-
void MovePrimarySourceManager::cleanupOnError(OperationContext* opCtx) {
if (_state == kDone) {
return;
diff --git a/src/mongo/db/s/move_primary_source_manager.h b/src/mongo/db/s/move_primary_source_manager.h
index cd678c95b19..45bc26a6b7a 100644
--- a/src/mongo/db/s/move_primary_source_manager.h
+++ b/src/mongo/db/s/move_primary_source_manager.h
@@ -145,10 +145,20 @@ private:
}
/**
- * Updates CSRS metadata in config.databases collection to move the given primary database on
- * its new shard.
+ * Invokes the _configsvrCommitMovePrimary command of the config server to reassign the primary
+ * shard of the database; on CommandNotFound it falls back to _fallbackCommitOnConfig().
*/
- Status _commitOnConfig(OperationContext* opCtx);
+ Status _commitOnConfig(OperationContext* opCtx, const DatabaseVersion& expectedDbVersion);
+
+ /**
+ * Updates the config server's metadata in config.databases collection to reassign the primary
+ * shard of the database; the entry is matched by name and by `expectedDbVersion`.
+ *
+ * This logic is not synchronized with the removeShard command and simultaneous invocations of
+ * movePrimary and removeShard can lead to data loss.
+ */
+ Status _fallbackCommitOnConfig(OperationContext* opCtx,
+ const DatabaseVersion& expectedDbVersion);  // version the entry must still carry to be updated
// Used to track the current state of the source manager. See the methods above, which have
// comments explaining the various state transitions.