-rw-r--r--  jstests/core/views/views_all_commands.js | 1
-rw-r--r--  jstests/replsets/db_reads_while_recovering_all_commands.js | 1
-rw-r--r--  jstests/sharding/move_chunk_allowMigrations.js | 60
-rw-r--r--  jstests/sharding/read_write_concern_defaults_application.js | 1
-rw-r--r--  jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js | 1
-rw-r--r--  jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js | 1
-rw-r--r--  src/mongo/db/s/SConscript | 4
-rw-r--r--  src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp | 105
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h | 8
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp | 52
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.cpp | 17
-rw-r--r--  src/mongo/db/s/drop_collection_coordinator.h | 1
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 17
-rw-r--r--  src/mongo/db/s/resharding_util.cpp | 58
-rw-r--r--  src/mongo/db/s/resharding_util.h | 15
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.cpp | 21
-rw-r--r--  src/mongo/db/s/sharding_ddl_util.h | 5
-rw-r--r--  src/mongo/db/s/sharding_util.cpp | 103
-rw-r--r--  src/mongo/db/s/sharding_util.h | 58
-rw-r--r--  src/mongo/db/s/sharding_util_refresh_test.cpp (renamed from src/mongo/db/s/resharding/resharding_util_refresh_test.cpp) | 28
-rw-r--r--  src/mongo/s/SConscript | 1
-rw-r--r--  src/mongo/s/request_types/set_allow_migrations.idl | 50
22 files changed, 496 insertions, 112 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index ba170e033a7..0f43a62ab46 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -97,6 +97,7 @@ let viewsCommandTests = {
_configsvrRemoveShard: {skip: isAnInternalCommand},
_configsvrRemoveShardFromZone: {skip: isAnInternalCommand},
_configsvrReshardCollection: {skip: isAnInternalCommand},
+ _configsvrSetAllowMigrations: {skip: isAnInternalCommand},
_configsvrShardCollection: {skip: isAnInternalCommand},
_configsvrUpdateZoneKeyRange: {skip: isAnInternalCommand},
_flushDatabaseCacheUpdates: {skip: isUnrelated},
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 073242812cd..a4e76bcbdcc 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -47,6 +47,7 @@ const allCommands = {
_configsvrRemoveShard: {skip: isPrimaryOnly},
_configsvrRemoveShardFromZone: {skip: isPrimaryOnly},
_configsvrReshardCollection: {skip: isPrimaryOnly},
+ _configsvrSetAllowMigrations: {skip: isPrimaryOnly},
_configsvrShardCollection: {skip: isPrimaryOnly},
_configsvrUpdateZoneKeyRange: {skip: isPrimaryOnly},
_flushDatabaseCacheUpdates: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/move_chunk_allowMigrations.js b/jstests/sharding/move_chunk_allowMigrations.js
index e9ef4fe1a40..33b4067a96f 100644
--- a/jstests/sharding/move_chunk_allowMigrations.js
+++ b/jstests/sharding/move_chunk_allowMigrations.js
@@ -1,6 +1,8 @@
/**
- * Tests that a collection with alloMigrations: false in config.collections prohibits committing a
+ * Tests that a collection with allowMigrations: false in config.collections prohibits committing a
* moveChunk and disables the balancer.
+ * Also tests that the _configsvrSetAllowMigrations command updates the 'allowMigrations' field and
+ * bumps the collection version.
*
* @tags: [
* requires_fcv_47,
@@ -12,6 +14,7 @@
load('jstests/libs/fail_point_util.js');
load('jstests/libs/parallel_shell_helpers.js');
load("jstests/sharding/libs/find_chunks_util.js");
+load("jstests/sharding/libs/shard_versioning_util.js");
const st = new ShardingTest({config: 1, shards: 2});
const configDB = st.s.getDB("config");
@@ -63,7 +66,8 @@ const setUpDb = function setUpDatabaseAndEnableSharding() {
// for collA.
//
// collBSetParams specify the field(s) that will be set on the collB in config.collections.
-const testBalancer = function testAllowMigrationsFalseDisablesBalancer(collBSetParams) {
+const testBalancer = function testAllowMigrationsFalseDisablesBalancer(allowMigrations,
+ collBSetNoBalanceParam) {
setUpDb();
const collAName = "collA";
@@ -97,10 +101,15 @@ const testBalancer = function testAllowMigrationsFalseDisablesBalancer(collBSetP
.count());
}
- jsTestLog(
- `Disabling balancing of ${collB.getFullName()} with parameters ${tojson(collBSetParams)}`);
+ jsTestLog(`Disabling balancing of ${collB.getFullName()} with allowMigrations ${
+ allowMigrations} and parameters ${tojson(collBSetNoBalanceParam)}`);
assert.commandWorked(
- configDB.collections.update({_id: collB.getFullName()}, {$set: collBSetParams}));
+ configDB.collections.update({_id: collB.getFullName()}, {$set: collBSetNoBalanceParam}));
+ assert.commandWorked(st.configRS.getPrimary().adminCommand({
+ _configsvrSetAllowMigrations: collB.getFullName(),
+ allowMigrations: allowMigrations,
+ writeConcern: {w: "majority"}
+ }));
st.startBalancer();
assert.soon(() => {
@@ -133,10 +142,45 @@ const testBalancer = function testAllowMigrationsFalseDisablesBalancer(collBSetP
.count());
};
+const testConfigsvrSetAllowMigrationsCommand = function() {
+ setUpDb();
+
+ const collName = "foo";
+ const ns = dbName + "." + collName;
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
+
+ ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(1, 0));
+
+ // Use _configsvrSetAllowMigrations to forbid migrations from happening
+ assert.commandWorked(st.configRS.getPrimary().adminCommand(
+ {_configsvrSetAllowMigrations: ns, allowMigrations: false, writeConcern: {w: "majority"}}));
+
+ // Check that allowMigrations has been set to 'false' on the configsvr config.collections.
+ assert.eq(false, configDB.collections.findOne({_id: ns}).allowMigrations);
+
+ // Check that the collection version has been bumped and the shard has refreshed.
+ ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(2, 0));
+
+ // Use _configsvrSetAllowMigrations to allow migrations to happen
+ assert.commandWorked(st.configRS.getPrimary().adminCommand(
+ {_configsvrSetAllowMigrations: ns, allowMigrations: true, writeConcern: {w: "majority"}}));
+
+ // Check that allowMigrations has been unset (that implies migrations are allowed) on the
+ // configsvr config.collections.
+ assert.eq(undefined, configDB.collections.findOne({_id: ns}).allowMigrations);
+
+ // Check that the collection version has been bumped and the shard has refreshed.
+ ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(3, 0));
+};
+
// Test cases that should disable the balancer.
-testBalancer({allowMigrations: false});
-testBalancer({allowMigrations: false, noBalance: false});
-testBalancer({allowMigrations: false, noBalance: true});
+testBalancer(false /* allowMigrations */, {});
+testBalancer(false /* allowMigrations */, {noBalance: false});
+testBalancer(false /* allowMigrations */, {noBalance: true});
+
+// Test the _configsvrSetAllowMigrations internal command
+testConfigsvrSetAllowMigrationsCommand();
st.stop();
})();
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 09290ecbea8..c1836e609e9 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -97,6 +97,7 @@ let testCases = {
_configsvrRemoveShardFromZone: {skip: "internal command"},
_configsvrRenameCollection: {skip: "internal command"},
_configsvrReshardCollection: {skip: "internal command"},
+ _configsvrSetAllowMigrations: {skip: "internal command"},
_configsvrShardCollection: {skip: "internal command"},
_configsvrUpdateZoneKeyRange: {skip: "internal command"},
_flushDatabaseCacheUpdates: {skip: "internal command"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 7362ed671ad..dbbb71103da 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -65,6 +65,7 @@ let testCases = {
_configsvrMovePrimary: {skip: "primary only"},
_configsvrRemoveShardFromZone: {skip: "primary only"},
_configsvrReshardCollection: {skip: "primary only"},
+ _configsvrSetAllowMigrations: {skip: "primary only"},
_configsvrShardCollection: {skip: "primary only"},
_configsvrUpdateZoneKeyRange: {skip: "primary only"},
_flushRoutingTableCacheUpdates: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index 78c60030e1f..d4901ae4ad5 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -56,6 +56,7 @@ let testCases = {
_configsvrMovePrimary: {skip: "primary only"},
_configsvrRemoveShardFromZone: {skip: "primary only"},
_configsvrReshardCollection: {skip: "primary only"},
+ _configsvrSetAllowMigrations: {skip: "primary only"},
_configsvrShardCollection: {skip: "primary only"},
_configsvrUpdateZoneKeyRange: {skip: "primary only"},
_flushRoutingTableCacheUpdates: {skip: "does not return user data"},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 00f970bb141..a855f50c787 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -266,6 +266,7 @@ env.Library(
'drop_database_legacy.cpp',
'type_lockpings.cpp',
'type_locks.cpp',
+ 'sharding_util.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/catalog_raii',
@@ -327,6 +328,7 @@ env.Library(
'config/configsvr_remove_shard_command.cpp',
'config/configsvr_remove_shard_from_zone_command.cpp',
'config/configsvr_reshard_collection_cmd.cpp',
+ 'config/configsvr_set_allow_migrations_command.cpp',
'config/configsvr_shard_collection_command.cpp',
'config/configsvr_split_chunk_command.cpp',
'config/configsvr_update_zone_key_range_command.cpp',
@@ -561,9 +563,9 @@ env.CppUnitTest(
'config/sharding_catalog_manager_split_chunk_test.cpp',
'resharding/resharding_coordinator_observer_test.cpp',
'resharding/resharding_coordinator_test.cpp',
- 'resharding/resharding_util_refresh_test.cpp',
'resharding/resharding_util_test.cpp',
'sharding_ddl_util_test.cpp',
+ 'sharding_util_refresh_test.cpp',
'type_lockpings_test.cpp',
'type_locks_test.cpp',
'vector_clock_config_server_test.cpp',
diff --git a/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp
new file mode 100644
index 00000000000..8ab105f72e6
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_set_allow_migrations_command.cpp
@@ -0,0 +1,105 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/s/request_types/set_allow_migrations_gen.h"
+
+namespace mongo {
+namespace {
+
+class ConfigsvrSetAllowMigrationsCommand final
+ : public TypedCommand<ConfigsvrSetAllowMigrationsCommand> {
+public:
+ using Request = ConfigsvrSetAllowMigrations;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ const NamespaceString& nss = ns();
+
+ uassert(ErrorCodes::IllegalOperation,
+ "_configsvrSetAllowMigrations can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+ uassert(ErrorCodes::InvalidOptions,
+ "_configsvrSetAllowMigrations must be called with majority writeConcern",
+ opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+ // Set the operation context read concern level to local for reads into the config
+ // database.
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ const auto allowMigrations = request().getAllowMigrations();
+
+ ShardingCatalogManager::get(opCtx)->setAllowMigrationsAndBumpOneChunk(
+ opCtx, nss, allowMigrations);
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return request().getCommandParameter();
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+ std::string help() const override {
+ return "Internal command, which is exported by the sharding config server. Do not call "
+ "directly. Sets the allowMigrations flag on the specified collection.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+} configsvrSetAllowMigrationsCmd;
+
+} // namespace
+} // namespace mongo
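
For reference, a minimal mongo shell sketch of how this internal command is invoked, mirroring the
move_chunk_allowMigrations.js test above (assumes a ShardingTest `st` and an already sharded
namespace "test.foo"; this is an internal command, not a public API):

    // Disable migrations for the collection via the config server primary.
    // Majority write concern is required, as enforced in typedRun() above.
    assert.commandWorked(st.configRS.getPrimary().adminCommand({
        _configsvrSetAllowMigrations: "test.foo",
        allowMigrations: false,
        writeConcern: {w: "majority"}
    }));
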
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index 9de828a0b46..7413e595e0d 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -301,6 +301,14 @@ public:
const NamespaceString& nss,
const BSONObj& minKey);
+ /**
+ * In a transaction, sets the 'allowMigrations' to the requested state and bumps the collection
+ * version.
+ */
+ void setAllowMigrationsAndBumpOneChunk(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool allowMigrations);
+
//
// Database Operations
//
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 811d9e9cc66..f4a1d7c5349 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/sharding_logging.h"
+#include "mongo/db/s/sharding_util.h"
#include "mongo/db/server_options.h"
#include "mongo/db/snapshot_window_options_gen.h"
#include "mongo/db/transaction_participant_gen.h"
@@ -1453,4 +1454,55 @@ void ShardingCatalogManager::splitOrMarkJumbo(OperationContext* opCtx,
}
}
+void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(OperationContext* opCtx,
+ const NamespaceString& nss,
+ bool allowMigrations) {
+ std::set<ShardId> shardsIds;
+ {
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+ const auto cm = uassertStatusOK(
+ Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx,
+ nss));
+ cm.getAllShardIds(&shardsIds);
+ withTransaction(
+ opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the 'allowMigrations' field. An unset 'allowMigrations' field implies
+ // 'true'. To ease backwards compatibility we omit 'allowMigrations' instead of
+ // setting it explicitly to 'true'.
+ const auto update = allowMigrations
+ ? BSON("$unset" << BSON(CollectionType::kAllowMigrationsFieldName << ""))
+ : BSON("$set" << BSON(CollectionType::kAllowMigrationsFieldName << false));
+
+ writeToConfigDocumentInTxn(
+ opCtx,
+ CollectionType::ConfigNS,
+ BatchedCommandRequest::buildUpdateOp(
+ CollectionType::ConfigNS,
+ BSON(CollectionType::kNssFieldName << nss.ns()) /* query */,
+ update /* update */,
+ false /* upsert */,
+ false /* multi */),
+ txnNumber);
+
+ // Bump the chunk version for one single chunk
+ invariant(!shardsIds.empty());
+ bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, {*shardsIds.begin()});
+ });
+
+ // From this point on, migrations are no longer allowed, so it is not possible for new
+ // shards to start owning chunks for this collection.
+ }
+
+ // Trigger a refresh on each shard containing chunks for this collection.
+ const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ sharding_util::tellShardsToRefreshCollection(
+ opCtx,
+ {std::make_move_iterator(shardsIds.begin()), std::make_move_iterator(shardsIds.end())},
+ nss,
+ executor);
+}
+
} // namespace mongo
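
As a sketch of the effect on the config metadata (field values illustrative, matching what the
jstest above asserts): after setAllowMigrationsAndBumpOneChunk(opCtx, nss, false) the
config.collections entry carries an explicit 'allowMigrations: false', and re-enabling removes the
field again, since an absent 'allowMigrations' implies 'true':

    // On the config server, after disabling migrations for "test.foo":
    db.getSiblingDB("config").collections.findOne({_id: "test.foo"});
    // -> { _id: "test.foo", ..., allowMigrations: false }
    // After re-enabling, the same query returns a document with no 'allowMigrations' field.
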
diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp
index 49e9ef287b2..a824e2f916d 100644
--- a/src/mongo/db/s/drop_collection_coordinator.cpp
+++ b/src/mongo/db/s/drop_collection_coordinator.cpp
@@ -77,17 +77,6 @@ void DropCollectionCoordinator::_sendDropCollToParticipants(OperationContext* op
}
}
-void DropCollectionCoordinator::_stopMigrations(OperationContext* opCtx) {
- // TODO SERVER-53861 this will not stop current ongoing migrations
- uassertStatusOK(Grid::get(opCtx)->catalogClient()->updateConfigDocument(
- opCtx,
- CollectionType::ConfigNS,
- BSON(CollectionType::kNssFieldName << _nss.ns()),
- BSON("$set" << BSON(CollectionType::kAllowMigrationsFieldName << false)),
- false /* upsert */,
- ShardingCatalogClient::kMajorityWriteConcern));
-}
-
SemiFuture<void> DropCollectionCoordinator::runImpl(
std::shared_ptr<executor::TaskExecutor> executor) {
return ExecutorFuture<void>(executor, Status::OK())
@@ -103,7 +92,11 @@ SemiFuture<void> DropCollectionCoordinator::runImpl(
const auto collDistLock = uassertStatusOK(distLockManager->lock(
opCtx, _nss.ns(), "DropCollection", DistLockManager::kDefaultLockTimeout));
- _stopMigrations(opCtx);
+ try {
+ sharding_ddl_util::stopMigrations(opCtx, _nss);
+ } catch (ExceptionFor<ErrorCodes::NamespaceNotSharded>&) {
+ // The collection is not sharded or doesn't exist.
+ }
const auto catalogClient = Grid::get(opCtx)->catalogClient();
diff --git a/src/mongo/db/s/drop_collection_coordinator.h b/src/mongo/db/s/drop_collection_coordinator.h
index b462fd04e04..8ddb85dfcad 100644
--- a/src/mongo/db/s/drop_collection_coordinator.h
+++ b/src/mongo/db/s/drop_collection_coordinator.h
@@ -43,7 +43,6 @@ public:
private:
SemiFuture<void> runImpl(std::shared_ptr<executor::TaskExecutor> executor) override;
- void _stopMigrations(OperationContext* opCtx);
void _sendDropCollToParticipants(OperationContext* opCtx);
ServiceContext* _serviceContext;
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index a51bddbef22..0f451f28a9d 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_util.h"
#include "mongo/db/vector_clock.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -1287,14 +1288,16 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe
nssToRefresh = _coordinatorDoc.getNss();
}
- tellShardsToRefresh(opCtx.get(), recipientIds, nssToRefresh, **executor);
+ sharding_util::tellShardsToRefreshCollection(
+ opCtx.get(), recipientIds, nssToRefresh, **executor);
}
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
auto opCtx = cc().makeOperationContext();
auto donorIds = extractShardIds(_coordinatorDoc.getDonorShards());
- tellShardsToRefresh(opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor);
+ sharding_util::tellShardsToRefreshCollection(
+ opCtx.get(), donorIds, _coordinatorDoc.getNss(), **executor);
}
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh(
@@ -1306,11 +1309,11 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsTo
std::set<ShardId> participantShardIds{donorShardIds.begin(), donorShardIds.end()};
participantShardIds.insert(recipientShardIds.begin(), recipientShardIds.end());
- sendCommandToShards(opCtx.get(),
- refreshCmd,
- {participantShardIds.begin(), participantShardIds.end()},
- _coordinatorDoc.getNss(),
- **executor);
+ sharding_util::sendCommandToShards(opCtx.get(),
+ refreshCmd,
+ {participantShardIds.begin(), participantShardIds.end()},
+ _coordinatorDoc.getNss(),
+ **executor);
}
} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 61035669c3f..858c1fe2c90 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -140,64 +140,6 @@ std::set<ShardId> getRecipientShards(OperationContext* opCtx,
return recipients;
}
-void tellShardsToRefresh(OperationContext* opCtx,
- const std::vector<ShardId>& shardIds,
- const NamespaceString& nss,
- const std::shared_ptr<executor::TaskExecutor>& executor) {
- auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss);
- cmd.setSyncFromConfig(true);
- cmd.setDbName(nss.db());
- auto cmdObj =
- cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
-
- sendCommandToShards(opCtx, cmdObj, shardIds, nss, executor);
-}
-
-void sendCommandToShards(OperationContext* opCtx,
- const BSONObj& command,
- const std::vector<ShardId>& shardIds,
- const NamespaceString& nss,
- const std::shared_ptr<executor::TaskExecutor>& executor) {
- std::vector<AsyncRequestsSender::Request> requests;
- for (const auto& shardId : shardIds) {
- requests.emplace_back(shardId, command);
- }
-
- if (!requests.empty()) {
- // The _flushRoutingTableCacheUpdatesWithWriteConcern command will fail with a
- // QueryPlanKilled error response if the config.cache.chunks collection is dropped
- // concurrently. The config.cache.chunks collection is dropped by the shard when it detects
- // the sharded collection's epoch having changed. We use kIdempotentOrCursorInvalidated so
- // the ARS automatically retries in that situation.
- AsyncRequestsSender ars(opCtx,
- executor,
- "admin",
- requests,
- ReadPreferenceSetting(ReadPreference::PrimaryOnly),
- Shard::RetryPolicy::kIdempotentOrCursorInvalidated);
-
- while (!ars.done()) {
- // Retrieve the responses and throw at the first failure.
- auto response = ars.next();
-
- auto generateErrorContext = [&]() -> std::string {
- return str::stream()
- << "Unable to _flushRoutingTableCacheUpdatesWithWriteConcern for namespace "
- << nss.ns() << " on " << response.shardId;
- };
-
- auto shardResponse =
- uassertStatusOKWithContext(std::move(response.swResponse), generateErrorContext());
-
- auto status = getStatusFromCommandResult(shardResponse.data);
- uassertStatusOKWithContext(status, generateErrorContext());
-
- auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
- uassertStatusOKWithContext(wcStatus, generateErrorContext());
- }
- }
-}
-
void checkForHolesAndOverlapsInChunks(std::vector<ReshardedChunk>& chunks,
const KeyPattern& keyPattern) {
std::sort(chunks.begin(), chunks.end(), [](const ReshardedChunk& a, const ReshardedChunk& b) {
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index bbc36407ab1..007202ca4ab 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -202,21 +202,6 @@ std::set<ShardId> getRecipientShards(OperationContext* opCtx,
const UUID& reshardingUUID);
/**
- * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the
- * shards fails to refresh.
- */
-void tellShardsToRefresh(OperationContext* opCtx,
- const std::vector<ShardId>& shardIds,
- const NamespaceString& nss,
- const std::shared_ptr<executor::TaskExecutor>& executor);
-
-void sendCommandToShards(OperationContext* opCtx,
- const BSONObj& command,
- const std::vector<ShardId>& shardIds,
- const NamespaceString& nss,
- const std::shared_ptr<executor::TaskExecutor>& executor);
-
-/**
* Asserts that there is not a hole or overlap in the chunks.
*/
void checkForHolesAndOverlapsInChunks(std::vector<ReshardedChunk>& chunks,
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index 715d8cda56d..90a9cd70cad 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -41,6 +41,7 @@
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/set_allow_migrations_gen.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -258,5 +259,25 @@ void releaseCriticalSection(OperationContext* opCtx, const NamespaceString& nss)
csr->clearFilteringMetadata(opCtx);
}
+void stopMigrations(OperationContext* opCtx, const NamespaceString& nss) {
+ const ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss,
+ false /* allowMigrations */);
+ const auto swSetAllowMigrationsResult =
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(configsvrSetAllowMigrationsCmd.toBSON({})),
+ Shard::RetryPolicy::kIdempotent // Although ConfigsvrSetAllowMigrations is not really
+ // idempotent (because it will cause the collection
+ // version to be bumped), it is safe to retry.
+ );
+
+ uassertStatusOKWithContext(
+ Shard::CommandResponse::getEffectiveStatus(std::move(swSetAllowMigrationsResult)),
+ str::stream() << "Error setting allowMigrations to false for collection "
+ << nss.toString());
+}
+
} // namespace sharding_ddl_util
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_ddl_util.h b/src/mongo/db/s/sharding_ddl_util.h
index 0fb804781b6..9f0ae72e67c 100644
--- a/src/mongo/db/s/sharding_ddl_util.h
+++ b/src/mongo/db/s/sharding_ddl_util.h
@@ -91,5 +91,10 @@ void acquireCriticalSection(OperationContext* opCtx, const NamespaceString& nss)
*/
void releaseCriticalSection(OperationContext* opCtx, const NamespaceString& nss);
+/**
+ * Stops ongoing migrations and prevents future ones from starting for the given nss.
+ */
+void stopMigrations(OperationContext* opCtx, const NamespaceString& nss);
+
} // namespace sharding_ddl_util
} // namespace mongo
diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp
new file mode 100644
index 00000000000..7f0d5080bdb
--- /dev/null
+++ b/src/mongo/db/s/sharding_util.cpp
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_util.h"
+
+#include "mongo/db/commands.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h"
+
+namespace mongo {
+
+namespace sharding_util {
+
+void tellShardsToRefreshCollection(OperationContext* opCtx,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor) {
+ auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss);
+ cmd.setSyncFromConfig(true);
+ cmd.setDbName(nss.db());
+ auto cmdObj = CommandHelpers::appendMajorityWriteConcern(cmd.toBSON({}));
+
+ sendCommandToShards(opCtx, cmdObj, shardIds, nss, executor);
+}
+
+void sendCommandToShards(OperationContext* opCtx,
+ const BSONObj& command,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (const auto& shardId : shardIds) {
+ requests.emplace_back(shardId, command);
+ }
+
+ if (!requests.empty()) {
+ // The _flushRoutingTableCacheUpdatesWithWriteConcern command will fail with a
+ // QueryPlanKilled error response if the config.cache.chunks collection is dropped
+ // concurrently. The config.cache.chunks collection is dropped by the shard when it detects
+ // the sharded collection's epoch having changed. We use kIdempotentOrCursorInvalidated so
+ // the ARS automatically retries in that situation.
+ AsyncRequestsSender ars(opCtx,
+ executor,
+ "admin",
+ requests,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotentOrCursorInvalidated);
+
+ while (!ars.done()) {
+ // Retrieve the responses and throw at the first failure.
+ auto response = ars.next();
+
+ auto generateErrorContext = [&]() -> std::string {
+ return str::stream()
+ << "Unable to _flushRoutingTableCacheUpdatesWithWriteConcern for namespace "
+ << nss.ns() << " on " << response.shardId;
+ };
+
+ auto shardResponse =
+ uassertStatusOKWithContext(std::move(response.swResponse), generateErrorContext());
+
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(status, generateErrorContext());
+
+ auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(wcStatus, generateErrorContext());
+ }
+ }
+}
+
+} // namespace sharding_util
+} // namespace mongo
diff --git a/src/mongo/db/s/sharding_util.h b/src/mongo/db/s/sharding_util.h
new file mode 100644
index 00000000000..85946fd5cea
--- /dev/null
+++ b/src/mongo/db/s/sharding_util.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+namespace sharding_util {
+
+/**
+ * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the
+ * shards fails to refresh.
+ */
+void tellShardsToRefreshCollection(OperationContext* opCtx,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor);
+
+void sendCommandToShards(OperationContext* opCtx,
+ const BSONObj& command,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor);
+
+} // namespace sharding_util
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_util_refresh_test.cpp b/src/mongo/db/s/sharding_util_refresh_test.cpp
index f13a484f857..90850ecc206 100644
--- a/src/mongo/db/s/resharding/resharding_util_refresh_test.cpp
+++ b/src/mongo/db/s/sharding_util_refresh_test.cpp
@@ -33,7 +33,7 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
-#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_util.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_shard.h"
@@ -55,7 +55,7 @@ const BSONObj kMockResWithWriteConcernError =
const Status kRetryableError{ErrorCodes::HostUnreachable, "RetryableError for test"};
-class ReshardingRefresherTest : public ConfigServerTestFixture {
+class ShardingRefresherTest : public ConfigServerTestFixture {
protected:
void setUp() {
ConfigServerTestFixture::setUp();
@@ -74,10 +74,12 @@ protected:
}
};
-TEST_F(ReshardingRefresherTest, refresherTwoShardsSucceed) {
+TEST_F(ShardingRefresherTest, refresherTwoShardsSucceed) {
auto opCtx = operationContext();
auto nss = NamespaceString("mydb", "mycoll");
- auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+ auto future = launchAsync([&] {
+ sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor());
+ });
onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); });
onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); });
@@ -85,20 +87,24 @@ TEST_F(ReshardingRefresherTest, refresherTwoShardsSucceed) {
future.default_timed_get();
}
-TEST_F(ReshardingRefresherTest, refresherTwoShardsFirstErrors) {
+TEST_F(ShardingRefresherTest, refresherTwoShardsFirstErrors) {
auto opCtx = operationContext();
auto nss = NamespaceString("mydb", "mycoll");
- auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+ auto future = launchAsync([&] {
+ sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor());
+ });
onCommand([&](const executor::RemoteCommandRequest& request) { return kMockErrorRes; });
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, kMockStatus.code());
}
-TEST_F(ReshardingRefresherTest, refresherTwoShardsSecondErrors) {
+TEST_F(ShardingRefresherTest, refresherTwoShardsSecondErrors) {
auto opCtx = operationContext();
auto nss = NamespaceString("mydb", "mycoll");
- auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+ auto future = launchAsync([&] {
+ sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor());
+ });
onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); });
onCommand([&](const executor::RemoteCommandRequest& request) { return kMockErrorRes; });
@@ -106,10 +112,12 @@ TEST_F(ReshardingRefresherTest, refresherTwoShardsSecondErrors) {
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, kMockStatus.code());
}
-TEST_F(ReshardingRefresherTest, refresherTwoShardsWriteConcernFailed) {
+TEST_F(ShardingRefresherTest, refresherTwoShardsWriteConcernFailed) {
auto opCtx = operationContext();
auto nss = NamespaceString("mydb", "mycoll");
- auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+ auto future = launchAsync([&] {
+ sharding_util::tellShardsToRefreshCollection(opCtx, kShardIdList, nss, executor());
+ });
onCommand([&](const executor::RemoteCommandRequest& request) {
return kMockResWithWriteConcernError;
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index c1b13f8a89e..f78c1273d24 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -168,6 +168,7 @@ env.Library(
'request_types/refine_collection_shard_key.idl',
'request_types/remove_shard_from_zone_request_type.cpp',
'request_types/reshard_collection.idl',
+ 'request_types/set_allow_migrations.idl',
'request_types/set_shard_version_request.cpp',
'request_types/shard_collection.idl',
'request_types/sharded_ddl_commands.idl',
diff --git a/src/mongo/s/request_types/set_allow_migrations.idl b/src/mongo/s/request_types/set_allow_migrations.idl
new file mode 100644
index 00000000000..5c1829218de
--- /dev/null
+++ b/src/mongo/s/request_types/set_allow_migrations.idl
@@ -0,0 +1,50 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# configsvrSetAllowMigrations IDL File
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+commands:
+ _configsvrSetAllowMigrations:
+ command_name: _configsvrSetAllowMigrations
+ cpp_name: ConfigsvrSetAllowMigrations
+ description: "internal setAllowMigrations command for config server"
+ namespace: type
+ api_version: ""
+ type: namespacestring
+ strict: false
+ fields:
+ allowMigrations:
+ type: bool
+ description: "The new allowMigrations flag state to be set."
+ optional: false
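
For illustration, the request document described by this IDL would look roughly as follows (values
hypothetical; the majority writeConcern requirement comes from the command implementation above,
not from the IDL itself):

    // _configsvrSetAllowMigrations request (sketch):
    {
        _configsvrSetAllowMigrations: "test.foo",  // namespacestring command parameter
        allowMigrations: false,                    // required bool field
        writeConcern: {w: "majority"}              // enforced by the command's typedRun()
    }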