author     Jordi Serra Torrens <jordi.serra-torrens@mongodb.com>    2021-07-06 13:20:06 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2021-07-07 08:19:43 +0000
commit     ba21097a0ccdbc3d7a9caad6eeb18ad172287937 (patch)
tree       f67f222262eed0536dda2abc20e37b7e31e67907
parent     4aa92d432b4fe186e051715ff6e910eb92cc544a (diff)
download   mongo-ba21097a0ccdbc3d7a9caad6eeb18ad172287937.tar.gz
SERVER-56650 Make createCollection resilient to network partitions
-rw-r--r--  jstests/core/views/views_all_commands.js                                            1
-rw-r--r--  jstests/noPassthrough/transaction_reaper.js                                          3
-rw-r--r--  jstests/replsets/db_reads_while_recovering_all_commands.js                           1
-rw-r--r--  jstests/sharding/configsvr_remove_chunks.js                                         92
-rw-r--r--  jstests/sharding/read_write_concern_defaults_application.js                          1
-rw-r--r--  jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js     1
-rw-r--r--  jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js              1
-rw-r--r--  src/mongo/db/s/SConscript                                                            2
-rw-r--r--  src/mongo/db/s/config/configsvr_remove_chunks_command.cpp                          149
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.cpp                                   159
-rw-r--r--  src/mongo/db/s/create_collection_coordinator.h                                       7
-rw-r--r--  src/mongo/db/s/remove_chunks.idl                                                    46
-rw-r--r--  src/mongo/db/s/shardsvr_create_collection_participant_command.cpp                    1
-rw-r--r--  src/mongo/db/transaction_validation.cpp                                              2
14 files changed, 417 insertions, 49 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index e7db4bbae74..5f9ef98463a 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -100,6 +100,7 @@ let viewsCommandTests = {
_configsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
_configsvrRenameCollection: {skip: isAnInternalCommand},
_configsvrRenameCollectionMetadata: {skip: isAnInternalCommand},
+ _configsvrRemoveChunks: {skip: isAnInternalCommand},
_configsvrRemoveShard: {skip: isAnInternalCommand},
_configsvrRemoveShardFromZone: {skip: isAnInternalCommand},
_configsvrRemoveTags: {skip: isAnInternalCommand},
diff --git a/jstests/noPassthrough/transaction_reaper.js b/jstests/noPassthrough/transaction_reaper.js
index 43708d85c95..aead3cddb92 100644
--- a/jstests/noPassthrough/transaction_reaper.js
+++ b/jstests/noPassthrough/transaction_reaper.js
@@ -56,6 +56,9 @@ function Sharding(lifetime) {
assert.commandWorked(this.st.c0.getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}));
assert.commandWorked(
this.st.rs0.getPrimary().getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}));
+
+ // Remove the session created by the above shardCollection
+ this.st.s.getDB("config").system.sessions.remove({});
}
Sharding.prototype.stop = function() {
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 48ac6950557..bd3de000ec5 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -48,6 +48,7 @@ const allCommands = {
_configsvrMoveChunk: {skip: isPrimaryOnly},
_configsvrMovePrimary: {skip: isPrimaryOnly},
_configsvrRefineCollectionShardKey: {skip: isPrimaryOnly},
+ _configsvrRemoveChunks: {skip: isPrimaryOnly},
_configsvrRemoveShard: {skip: isPrimaryOnly},
_configsvrRemoveShardFromZone: {skip: isPrimaryOnly},
_configsvrRemoveTags: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/configsvr_remove_chunks.js b/jstests/sharding/configsvr_remove_chunks.js
new file mode 100644
index 00000000000..9270d672c20
--- /dev/null
+++ b/jstests/sharding/configsvr_remove_chunks.js
@@ -0,0 +1,92 @@
+/*
+ * Test the _configsvrRemoveChunks internal command.
+ * @tags: [
+ * requires_fcv_50,
+ * ]
+ */
+
+(function() {
+'use strict';
+
+load("jstests/libs/retryable_writes_util.js");
+load("jstests/sharding/libs/find_chunks_util.js");
+
+function runConfigsvrRemoveChunksWithRetries(conn, uuid, lsid, txnNumber) {
+ var res;
+ assert.soon(() => {
+ res = st.configRS.getPrimary().adminCommand({
+ _configsvrRemoveChunks: 1,
+ collectionUUID: uuid,
+ lsid: lsid,
+ txnNumber: txnNumber,
+ writeConcern: {w: "majority"}
+ });
+
+ if (RetryableWritesUtil.isRetryableCode(res.code) ||
+ RetryableWritesUtil.errmsgContainsRetryableCodeName(res.errmsg) ||
+ (res.writeConcernError &&
+ RetryableWritesUtil.isRetryableCode(res.writeConcernError.code))) {
+ return false; // Retry
+ }
+
+ return true;
+ });
+
+ return res;
+}
+
+function insertLeftoverChunks(configDB, uuid) {
+ const chunkDocsForNs = findChunksUtil.findChunksByNs(configDB, ns).toArray();
+ var chunksToInsert = [];
+ chunkDocsForNs.forEach(originalChunk => {
+ var newChunk = originalChunk;
+ newChunk._id = ObjectId();
+ newChunk.uuid = otherCollectionUUID;
+ chunksToInsert.push(newChunk);
+ });
+ assert.commandWorked(configDB.getCollection("chunks").insertMany(chunksToInsert));
+}
+
+let st = new ShardingTest({mongos: 1, shards: 1});
+
+const configDB = st.s.getDB('config');
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+let lsid = assert.commandWorked(st.s.getDB("admin").runCommand({startSession: 1})).id;
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({shardcollection: ns, key: {_id: 1}}));
+
+// Insert some chunks not associated with any collection to simulate leftovers from a failed
+// shardCollection
+const otherCollectionUUID = UUID();
+insertLeftoverChunks(configDB, otherCollectionUUID);
+assert.eq(1, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Remove the leftover chunks matching 'otherCollectionUUID'
+assert.commandWorked(runConfigsvrRemoveChunksWithRetries(
+ st.configRS.getPrimary(), otherCollectionUUID, lsid, NumberLong(1)));
+
+assert.eq(0, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Insert new leftover chunks
+insertLeftoverChunks(configDB, otherCollectionUUID);
+assert.eq(1, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Check that _configsvrRemoveChunks with a txnNumber less than the previous one won't remove the
+// chunk documents
+assert.commandFailedWithCode(
+ runConfigsvrRemoveChunksWithRetries(
+ st.configRS.getPrimary(), otherCollectionUUID, lsid, NumberLong(0)),
+ ErrorCodes.TransactionTooOld);
+
+assert.eq(1, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Clean up the leftover chunks before finishing
+configDB.getCollection("chunks").remove({uuid: otherCollectionUUID});
+
+st.stop();
+})();
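The new test above exercises _configsvrRemoveChunks as a retryable write: retrying with the same lsid/txnNumber is safe, while reusing an older txnNumber is rejected. A minimal shell sketch of that behavior (assuming a running ShardingTest `st`, a session id `lsid`, and a leftover collection UUID `leftoverUUID`; these names are hypothetical, not part of the patch):

    // Repeating the command with the same lsid/txnNumber replays the stored result
    // instead of re-executing, so retries after a network partition are safe.
    const removeChunks = (txnNum) => st.configRS.getPrimary().adminCommand({
        _configsvrRemoveChunks: 1,
        collectionUUID: leftoverUUID,
        lsid: lsid,
        txnNumber: NumberLong(txnNum),
        writeConcern: {w: "majority"}
    });
    assert.commandWorked(removeChunks(2));  // first execution removes the chunks
    assert.commandWorked(removeChunks(2));  // retry with the same txnNumber is a no-op replay
    // A stale (lower) txnNumber can no longer execute.
    assert.commandFailedWithCode(removeChunks(1), ErrorCodes.TransactionTooOld);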
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 709ee968f32..c2cf075513a 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -97,6 +97,7 @@ let testCases = {
_configsvrMoveChunk: {skip: "internal command"},
_configsvrMovePrimary: {skip: "internal command"},
_configsvrRefineCollectionShardKey: {skip: "internal command"},
+ _configsvrRemoveChunks: {skip: "internal command"},
_configsvrRemoveShard: {skip: "internal command"},
_configsvrRemoveShardFromZone: {skip: "internal command"},
_configsvrRemoveTags: {skip: "internal command"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 4fe5768bdc0..5bc62d84c9a 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -65,6 +65,7 @@ let testCases = {
_configsvrDropDatabase: {skip: "primary only"},
_configsvrMoveChunk: {skip: "primary only"},
_configsvrMovePrimary: {skip: "primary only"},
+ _configsvrRemoveChunks: {skip: "primary only"},
_configsvrRemoveShardFromZone: {skip: "primary only"},
_configsvrRemoveTags: {skip: "primary only"},
_configsvrReshardCollection: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index 321d4cede33..e38bfe8d94e 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -55,6 +55,7 @@ let testCases = {
_configsvrDropDatabase: {skip: "primary only"},
_configsvrMoveChunk: {skip: "primary only"},
_configsvrMovePrimary: {skip: "primary only"},
+ _configsvrRemoveChunks: {skip: "primary only"},
_configsvrRemoveShardFromZone: {skip: "primary only"},
_configsvrRemoveTags: {skip: "primary only"},
_configsvrReshardCollection: {skip: "primary only"},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 4e435492cb5..145afaa76c3 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -316,6 +316,7 @@ env.Library(
'config/configsvr_move_chunk_command.cpp',
'config/configsvr_move_primary_command.cpp',
'config/configsvr_refine_collection_shard_key_command.cpp',
+ 'config/configsvr_remove_chunks_command.cpp',
'config/configsvr_remove_shard_command.cpp',
'config/configsvr_remove_shard_from_zone_command.cpp',
'config/configsvr_remove_tags_command.cpp',
@@ -342,6 +343,7 @@ env.Library(
'migration_destination_manager_legacy_commands.cpp',
'move_chunk_command.cpp',
'move_primary_coordinator.cpp',
+ 'remove_chunks.idl',
'rename_collection_coordinator.cpp',
'sharded_rename_collection.idl',
'rename_collection_participant_service.cpp',
diff --git a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
new file mode 100644
index 00000000000..e0b40d08a64
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/remove_chunks_gen.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+class ConfigsvrRemoveChunksCommand final : public TypedCommand<ConfigsvrRemoveChunksCommand> {
+public:
+ using Request = ConfigsvrRemoveChunks;
+
+ class Invocation final : public InvocationBase {
+ public:
+ using InvocationBase::InvocationBase;
+
+ void typedRun(OperationContext* opCtx) {
+ const UUID& collectionUUID = request().getCollectionUUID();
+
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+ uassert(ErrorCodes::IllegalOperation,
+ "_configsvrRemoveChunks can only be run on config servers",
+ serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+ uassert(ErrorCodes::InvalidOptions,
+ "_configsvrRemoveChunks must be called with majority writeConcern",
+ opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+ // Set the operation context read concern level to local for reads into the config
+ // database.
+ repl::ReadConcernArgs::get(opCtx) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ uassert(
+ 5665000, "_configsvrRemoveChunks must be run as a retryable write", txnParticipant);
+
+ {
+ // Use an ACR because we will perform a {multi: true} delete, which is otherwise not
+ // supported on a session.
+ auto newClient = opCtx->getServiceContext()->makeClient("RemoveChunksMetadata");
+ {
+ stdx::lock_guard<Client> lk(*newClient.get());
+ newClient->setSystemOperationKillableByStepdown(lk);
+ }
+
+ AlternativeClientRegion acr(newClient);
+ auto executor =
+ Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+ auto newOpCtxPtr = CancelableOperationContext(
+ cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
+
+ // Write with localWriteConcern because we cannot wait for replication with a
+ // session checked out. The command will wait for majority WC on the epilogue after
+ // the session has been checked in.
+ uassertStatusOK(
+ Grid::get(newOpCtxPtr.get())
+ ->catalogClient()
+ ->removeConfigDocuments(newOpCtxPtr.get(),
+ ChunkType::ConfigNS,
+ BSON(ChunkType::collectionUUID << collectionUUID),
+ ShardingCatalogClient::kLocalWriteConcern));
+ }
+
+ // Since no write happened on this txnNumber yet, we need to make a dummy write so that
+ // secondaries can be aware of this txn.
+ DBDirectClient client(opCtx);
+ client.update(NamespaceString::kServerConfigurationNamespace.ns(),
+ BSON("_id"
+ << "RemoveChunksMetadataStats"),
+ BSON("$inc" << BSON("count" << 1)),
+ true /* upsert */,
+ false /* multi */);
+ }
+
+ private:
+ NamespaceString ns() const override {
+ return NamespaceString(request().getDbName(), "");
+ }
+
+ bool supportsWriteConcern() const override {
+ return true;
+ }
+
+ void doCheckAuthorization(OperationContext* opCtx) const override {
+ uassert(ErrorCodes::Unauthorized,
+ "Unauthorized",
+ AuthorizationSession::get(opCtx->getClient())
+ ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+ ActionType::internal));
+ }
+ };
+
+ std::string help() const override {
+ return "Internal command, which is exported by the sharding config server. Do not call "
+ "directly. Removes the chunks for the specified collectionUUID.";
+ }
+
+ bool adminOnly() const override {
+ return true;
+ }
+
+ AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+ return AllowedOnSecondary::kNever;
+ }
+} configsvrRemoveChunksCmd;
+
+} // namespace
+} // namespace mongo
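The command above insists on being invoked as a retryable write with majority write concern before it deletes anything. A minimal shell sketch of those checks (assuming a ShardingTest `st`, a session id `lsid`, and a collection UUID `uuid`; these names are hypothetical, and error code 5665000 is the retryable-write assertion from the command body above):

    const configPrimary = st.configRS.getPrimary();
    // Without lsid/txnNumber there is no TransactionParticipant, so the command refuses to run.
    assert.commandFailedWithCode(
        configPrimary.adminCommand(
            {_configsvrRemoveChunks: 1, collectionUUID: uuid, writeConcern: {w: "majority"}}),
        5665000);
    // A non-majority write concern is rejected before any delete is attempted.
    assert.commandFailedWithCode(configPrimary.adminCommand({
        _configsvrRemoveChunks: 1,
        collectionUUID: uuid,
        lsid: lsid,
        txnNumber: NumberLong(1),
        writeConcern: {w: 1}
    }), ErrorCodes.InvalidOptions);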
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 3b2b83862bb..02f43ee953e 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/create_collection_coordinator.h"
#include "mongo/db/s/recoverable_critical_section_service.h"
+#include "mongo/db/s/remove_chunks_gen.h"
#include "mongo/db/s/shard_key_util.h"
#include "mongo/db/s/sharding_ddl_util.h"
#include "mongo/db/s/sharding_logging.h"
@@ -213,33 +214,30 @@ BSONObj getCollation(OperationContext* opCtx,
return actualCollatorBSON;
}
-void removeChunks(OperationContext* opCtx, const UUID& uuid) {
- BatchWriteExecStats stats;
- BatchedCommandResponse response;
-
- BatchedCommandRequest deleteRequest([&]() {
- write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS);
- deleteOp.setWriteCommandRequestBase([] {
- write_ops::WriteCommandRequestBase writeCommandBase;
- writeCommandBase.setOrdered(false);
- return writeCommandBase;
- }());
- deleteOp.setDeletes(std::vector{write_ops::DeleteOpEntry(
- BSON(ChunkType::collectionUUID << uuid), true /* multi: true */)});
- return deleteOp;
- }());
-
- deleteRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
+ const UUID& uuid,
+ const OperationSessionInfo& osi) {
+ auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
- cluster::write(opCtx, deleteRequest, &stats, &response, boost::none);
+ // Remove the chunks matching uuid
+ ConfigsvrRemoveChunks configsvrRemoveChunksCmd(uuid);
+ configsvrRemoveChunksCmd.setDbName(NamespaceString::kAdminDb);
- uassertStatusOK(response.toStatus());
+ const auto swRemoveChunksResult = configShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ NamespaceString::kAdminDb.toString(),
+ CommandHelpers::appendMajorityWriteConcern(configsvrRemoveChunksCmd.toBSON(osi.toBSON())),
+ Shard::RetryPolicy::kIdempotent);
+
+ uassertStatusOKWithContext(
+ Shard::CommandResponse::getEffectiveStatus(std::move(swRemoveChunksResult)),
+ str::stream() << "Error removing chunks matching uuid " << uuid);
}
-void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
- BatchWriteExecStats stats;
- BatchedCommandResponse response;
-
+void upsertChunks(OperationContext* opCtx,
+ std::vector<ChunkType>& chunks,
+ const OperationSessionInfo& osi) {
BatchedCommandRequest updateRequest([&]() {
write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
std::vector<write_ops::UpdateOpEntry> entries;
@@ -260,14 +258,31 @@ void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
- cluster::write(opCtx, updateRequest, &stats, &response, boost::none);
+ const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
+
+ BatchedCommandResponse batchResponse;
+ const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+ const auto response =
+ configShard->runCommand(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ ChunkType::ConfigNS.db().toString(),
+ cmdObj,
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
- uassertStatusOK(response.toStatus());
+ const auto writeStatus =
+ Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+ uassertStatusOK(batchResponse.toStatus());
+ uassertStatusOK(writeStatus);
}
-void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, CollectionType& coll) {
- BatchWriteExecStats stats;
- BatchedCommandResponse response;
+void updateCatalogEntry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ CollectionType& coll,
+ const OperationSessionInfo& osi) {
+ const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
BatchedCommandRequest updateRequest([&]() {
write_ops::UpdateCommandRequest updateOp(CollectionType::ConfigNS);
updateOp.setUpdates({[&] {
@@ -282,13 +297,26 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
}());
updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
- try {
- cluster::write(opCtx, updateRequest, &stats, &response, boost::none);
+ const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
- uassertStatusOK(response.toStatus());
+ try {
+ BatchedCommandResponse batchResponse;
+ const auto response =
+ configShard->runCommand(opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ CollectionType::ConfigNS.db().toString(),
+ cmdObj,
+ Shard::kDefaultConfigCommandTimeout,
+ Shard::RetryPolicy::kIdempotent);
+
+ const auto writeStatus =
+ Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+ uassertStatusOK(batchResponse.toStatus());
+ uassertStatusOK(writeStatus);
} catch (const DBException&) {
// If an error happens when contacting the config server, we don't know if the update
- // succeded or not, which might cause the local shard version to differ from the config
+ // succeeded or not, which might cause the local shard version to differ from the config
// server, so we clear the metadata to allow another operation to refresh it.
UninterruptibleLockGuard noInterrupt(opCtx->lockState());
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
@@ -299,21 +327,18 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
void broadcastDropCollection(OperationContext* opCtx,
const NamespaceString& nss,
- const std::shared_ptr<executor::TaskExecutor>& executor) {
+ const std::shared_ptr<executor::TaskExecutor>& executor,
+ const OperationSessionInfo& osi) {
const auto primaryShardId = ShardingState::get(opCtx)->shardId();
const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss);
auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
- // Remove prymary shard from participants
+ // Remove primary shard from participants
participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId),
participants.end());
- sharding_ddl_util::sendAuthenticatedCommandToShards(
- opCtx,
- nss.db(),
- CommandHelpers::appendMajorityWriteConcern(dropCollectionParticipant.toBSON({})),
- participants,
- executor);
+ sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
+ opCtx, nss, participants, executor, osi);
}
} // namespace
@@ -382,6 +407,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);
+ if (!_firstExecution) {
+ // Perform a noop write on the participants in order to advance the txnNumber
+ // for this coordinator's lsid so that requests with older txnNumbers can no
+ // longer execute.
+ _performNoopRetryableWriteOnParticipants(opCtx, **executor);
+ }
+
if (_recoveredFromDisk) {
// If a stepdown happened it could've occurred while waiting for majority when
// writing config.collections. If the refresh happens before this write is
@@ -431,8 +463,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
1,
"Removing partial changes from previous run",
"namespace"_attr = nss());
- removeChunks(opCtx, *uuid);
- broadcastDropCollection(opCtx, nss(), **executor);
+
+ _doc = _updateSession(opCtx, _doc);
+ cleanupPartialChunksFromPreviousAttempt(
+ opCtx, *uuid, getCurrentSession(_doc));
+
+ _doc = _updateSession(opCtx, _doc);
+ broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession(_doc));
}
}
@@ -458,7 +495,16 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
nss(),
_critSecReason,
ShardingCatalogClient::kMajorityWriteConcern);
- _createCollectionOnNonPrimaryShards(opCtx);
+
+ _doc = _updateSession(opCtx, _doc);
+ try {
+ _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession(_doc));
+ } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
+ // Older 5.0 binaries don't support running the
+ // _shardsvrCreateCollectionParticipant command as a retryable write yet. In
+ // that case, retry without attaching session info.
+ _createCollectionOnNonPrimaryShards(opCtx, boost::none);
+ }
_commit(opCtx);
}
@@ -641,7 +687,8 @@ void CreateCollectionCoordinator::_createPolicyAndChunks(OperationContext* opCtx
_numChunks = _initialChunks.chunks.size();
}
-void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
+ OperationContext* opCtx, const boost::optional<OperationSessionInfo>& osi) {
LOGV2_DEBUG(5277905,
2,
"Create collection _createCollectionOnNonPrimaryShards",
@@ -670,8 +717,8 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
requests.emplace_back(
chunkShardId,
- createCollectionParticipantRequest.toBSON(
- BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON())));
+ CommandHelpers::appendMajorityWriteConcern(
+ createCollectionParticipantRequest.toBSON(osi ? osi->toBSON() : BSONObj())));
initializedShards.emplace(chunkShardId);
}
@@ -707,7 +754,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
// Upsert Chunks.
- upsertChunks(opCtx, _initialChunks.chunks);
+ _doc = _updateSession(opCtx, _doc);
+ upsertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
CollectionType coll(nss(),
_initialChunks.collVersion().epoch(),
@@ -731,7 +779,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
coll.setUnique(*_doc.getUnique());
}
- updateCatalogEntry(opCtx, nss(), coll);
+ _doc = _updateSession(opCtx, _doc);
+ updateCatalogEntry(opCtx, nss(), coll, getCurrentSession(_doc));
}
void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) {
@@ -808,6 +857,20 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
}
+void CreateCollectionCoordinator::_performNoopRetryableWriteOnParticipants(
+ OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor) {
+ auto shardsAndConfigsvr = [&] {
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto participants = shardRegistry->getAllShardIds(opCtx);
+ participants.emplace_back(shardRegistry->getConfigShard()->getId());
+ return participants;
+ }();
+
+ _doc = _updateSession(opCtx, _doc);
+ sharding_ddl_util::performNoopRetryableWriteOnShards(
+ opCtx, shardsAndConfigsvr, getCurrentSession(_doc), executor);
+}
+
// Phase change API.
void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index bb1f303e3c3..8d57374d640 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -111,7 +111,8 @@ private:
* If the optimized path can be taken, ensure the collection is already created in all the
* participant shards.
*/
- void _createCollectionOnNonPrimaryShards(OperationContext* opCtx);
+ void _createCollectionOnNonPrimaryShards(OperationContext* opCtx,
+ const boost::optional<OperationSessionInfo>& osi);
/**
* Does the following writes:
@@ -135,6 +136,10 @@ private:
*/
void _logEndCreateCollection(OperationContext* opCtx);
+ void _performNoopRetryableWriteOnParticipants(
+ OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor);
+
+
CreateCollectionCoordinatorDocument _doc;
const BSONObj _critSecReason;
diff --git a/src/mongo/db/s/remove_chunks.idl b/src/mongo/db/s/remove_chunks.idl
new file mode 100644
index 00000000000..a997b8c282e
--- /dev/null
+++ b/src/mongo/db/s/remove_chunks.idl
@@ -0,0 +1,46 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+commands:
+ _configsvrRemoveChunks:
+ command_name: _configsvrRemoveChunks
+ cpp_name: ConfigsvrRemoveChunks
+ description: "internal _configsvrRemoveChunks command for config server"
+ namespace: ignored
+ api_version: ""
+ strict: false
+ fields:
+ collectionUUID:
+ type: uuid
+ description: "The collection uuid of the chunks to be removed"
diff --git a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
index e4708a1ce94..e49c2d78aac 100644
--- a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
@@ -70,6 +70,7 @@ public:
<< request().toBSON(BSONObj()),
opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
MigrationDestinationManager::cloneCollectionIndexesAndOptions(
opCtx,
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index 574af8ba82d..f30d64eb205 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -51,7 +51,9 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1},
{"insert", 1},
{"update", 1},
{"_recvChunkStart", 1},
+ {"_configsvrRemoveChunks", 1},
{"_configsvrRemoveTags", 1},
+ {"_shardsvrCreateCollectionParticipant", 1},
{"_shardsvrDropCollectionParticipant", 1},
{"_shardsvrRenameCollectionParticipant", 1},
{"_shardsvrRenameCollectionParticipantUnblock", 1},