author | Jordi Serra Torrens <jordi.serra-torrens@mongodb.com> | 2021-07-06 13:20:06 +0000
---|---|---
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-07 08:19:43 +0000
commit | ba21097a0ccdbc3d7a9caad6eeb18ad172287937 |
tree | f67f222262eed0536dda2abc20e37b7e31e67907 |
parent | 4aa92d432b4fe186e051715ff6e910eb92cc544a |
download | mongo-ba21097a0ccdbc3d7a9caad6eeb18ad172287937.tar.gz |
SERVER-56650 Make createCollection resilient to network partitions
14 files changed, 417 insertions, 49 deletions
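
The diff below routes the chunk cleanup performed by a failed or retried shardCollection through a new config server command, _configsvrRemoveChunks, which runs as a retryable write so that a retry after a network partition cannot be interleaved with a stale earlier attempt. A minimal invocation sketch, distilled from the new jstests/sharding/configsvr_remove_chunks.js test added in this commit (the ShardingTest handle "st" and the "leftoverUUID" variable are illustrative, not part of the commit):

    // Issue the internal command against the config server primary. The lsid/txnNumber pair makes
    // the delete retryable; a later call with a lower txnNumber fails with TransactionTooOld.
    const lsid = assert.commandWorked(st.s.getDB("admin").runCommand({startSession: 1})).id;
    assert.commandWorked(st.configRS.getPrimary().adminCommand({
        _configsvrRemoveChunks: 1,
        collectionUUID: leftoverUUID,  // UUID whose chunk documents should be deleted from config.chunks
        lsid: lsid,
        txnNumber: NumberLong(1),
        writeConcern: {w: "majority"}
    }));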
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index e7db4bbae74..5f9ef98463a 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -100,6 +100,7 @@ let viewsCommandTests = {
     _configsvrRefineCollectionShardKey: {skip: isAnInternalCommand},
     _configsvrRenameCollection: {skip: isAnInternalCommand},
     _configsvrRenameCollectionMetadata: {skip: isAnInternalCommand},
+    _configsvrRemoveChunks: {skip: isAnInternalCommand},
     _configsvrRemoveShard: {skip: isAnInternalCommand},
     _configsvrRemoveShardFromZone: {skip: isAnInternalCommand},
     _configsvrRemoveTags: {skip: isAnInternalCommand},
diff --git a/jstests/noPassthrough/transaction_reaper.js b/jstests/noPassthrough/transaction_reaper.js
index 43708d85c95..aead3cddb92 100644
--- a/jstests/noPassthrough/transaction_reaper.js
+++ b/jstests/noPassthrough/transaction_reaper.js
@@ -56,6 +56,9 @@ function Sharding(lifetime) {
     assert.commandWorked(this.st.c0.getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}));
     assert.commandWorked(
         this.st.rs0.getPrimary().getDB("admin").runCommand({refreshLogicalSessionCacheNow: 1}));
+
+    // Remove the session created by the above shardCollection
+    this.st.s.getDB("config").system.sessions.remove({});
 }
 
 Sharding.prototype.stop = function() {
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index 48ac6950557..bd3de000ec5 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -48,6 +48,7 @@ const allCommands = {
     _configsvrMoveChunk: {skip: isPrimaryOnly},
     _configsvrMovePrimary: {skip: isPrimaryOnly},
     _configsvrRefineCollectionShardKey: {skip: isPrimaryOnly},
+    _configsvrRemoveChunks: {skip: isPrimaryOnly},
     _configsvrRemoveShard: {skip: isPrimaryOnly},
     _configsvrRemoveShardFromZone: {skip: isPrimaryOnly},
     _configsvrRemoveTags: {skip: isPrimaryOnly},
diff --git a/jstests/sharding/configsvr_remove_chunks.js b/jstests/sharding/configsvr_remove_chunks.js
new file mode 100644
index 00000000000..9270d672c20
--- /dev/null
+++ b/jstests/sharding/configsvr_remove_chunks.js
@@ -0,0 +1,92 @@
+/*
+ * Test the _configsvrRemoveChunks internal command.
+ * @tags: [
+ *   requires_fcv_50,
+ * ]
+ */
+
+(function() {
+'use strict';
+
+load("jstests/libs/retryable_writes_util.js");
+load("jstests/sharding/libs/find_chunks_util.js");
+
+function runConfigsvrRemoveChunksWithRetries(conn, uuid, lsid, txnNumber) {
+    var res;
+    assert.soon(() => {
+        res = st.configRS.getPrimary().adminCommand({
+            _configsvrRemoveChunks: 1,
+            collectionUUID: uuid,
+            lsid: lsid,
+            txnNumber: txnNumber,
+            writeConcern: {w: "majority"}
+        });
+
+        if (RetryableWritesUtil.isRetryableCode(res.code) ||
+            RetryableWritesUtil.errmsgContainsRetryableCodeName(res.errmsg) ||
+            (res.writeConcernError &&
+             RetryableWritesUtil.isRetryableCode(res.writeConcernError.code))) {
+            return false;  // Retry
+        }
+
+        return true;
+    });
+
+    return res;
+}
+
+function insertLeftoverChunks(configDB, uuid) {
+    const chunkDocsForNs = findChunksUtil.findChunksByNs(configDB, ns).toArray();
+    var chunksToInsert = [];
+    chunkDocsForNs.forEach(originalChunk => {
+        var newChunk = originalChunk;
+        newChunk._id = ObjectId();
+        newChunk.uuid = otherCollectionUUID;
+        chunksToInsert.push(newChunk);
+    });
+    assert.commandWorked(configDB.getCollection("chunks").insertMany(chunksToInsert));
+}
+
+let st = new ShardingTest({mongos: 1, shards: 1});
+
+const configDB = st.s.getDB('config');
+
+const dbName = "test";
+const collName = "foo";
+const ns = dbName + "." + collName;
+
+let lsid = assert.commandWorked(st.s.getDB("admin").runCommand({startSession: 1})).id;
+
+assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+assert.commandWorked(st.s.adminCommand({shardcollection: ns, key: {_id: 1}}));
+
+// Insert some chunks not associated to any collection to simulate leftovers from a failed
+// shardCollection
+const otherCollectionUUID = UUID();
+insertLeftoverChunks(configDB, otherCollectionUUID);
+assert.eq(1, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Remove the leftover chunks matching 'otherCollectionUUID'
+assert.commandWorked(runConfigsvrRemoveChunksWithRetries(
+    st.configRS.getPrimary(), otherCollectionUUID, lsid, NumberLong(1)));
+
+assert.eq(0, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Insert new leftover chunks
+insertLeftoverChunks(configDB, otherCollectionUUID);
+assert.eq(1, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Check that _configsvrRemoveChunks with a txnNumber lesser than the previous one won't remove the
+// chunk documents
+assert.commandFailedWithCode(
+    runConfigsvrRemoveChunksWithRetries(
+        st.configRS.getPrimary(), otherCollectionUUID, lsid, NumberLong(0)),
+    ErrorCodes.TransactionTooOld);
+
+assert.eq(1, configDB.getCollection("chunks").find({uuid: otherCollectionUUID}).itcount());
+
+// Cleanup the leftover chunks before finishing
+configDB.getCollection("chunks").remove({uuid: otherCollectionUUID});
+
+st.stop();
+})();
diff --git a/jstests/sharding/read_write_concern_defaults_application.js b/jstests/sharding/read_write_concern_defaults_application.js
index 709ee968f32..c2cf075513a 100644
--- a/jstests/sharding/read_write_concern_defaults_application.js
+++ b/jstests/sharding/read_write_concern_defaults_application.js
@@ -97,6 +97,7 @@ let testCases = {
     _configsvrMoveChunk: {skip: "internal command"},
     _configsvrMovePrimary: {skip: "internal command"},
     _configsvrRefineCollectionShardKey: {skip: "internal command"},
+    _configsvrRemoveChunks: {skip: "internal command"},
     _configsvrRemoveShard: {skip: "internal command"},
     _configsvrRemoveShardFromZone: {skip: "internal command"},
     _configsvrRemoveTags: {skip: "internal command"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 4fe5768bdc0..5bc62d84c9a 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -65,6 +65,7 @@ let testCases = {
     _configsvrDropDatabase: {skip: "primary only"},
     _configsvrMoveChunk: {skip: "primary only"},
     _configsvrMovePrimary: {skip: "primary only"},
+    _configsvrRemoveChunks: {skip: "primary only"},
     _configsvrRemoveShardFromZone: {skip: "primary only"},
     _configsvrRemoveTags: {skip: "primary only"},
     _configsvrReshardCollection: {skip: "primary only"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index 321d4cede33..e38bfe8d94e 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -55,6 +55,7 @@ let testCases = {
     _configsvrDropDatabase: {skip: "primary only"},
     _configsvrMoveChunk: {skip: "primary only"},
     _configsvrMovePrimary: {skip: "primary only"},
+    _configsvrRemoveChunks: {skip: "primary only"},
     _configsvrRemoveShardFromZone: {skip: "primary only"},
     _configsvrRemoveTags: {skip: "primary only"},
     _configsvrReshardCollection: {skip: "primary only"},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 4e435492cb5..145afaa76c3 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -316,6 +316,7 @@ env.Library(
         'config/configsvr_move_chunk_command.cpp',
         'config/configsvr_move_primary_command.cpp',
         'config/configsvr_refine_collection_shard_key_command.cpp',
+        'config/configsvr_remove_chunks_command.cpp',
         'config/configsvr_remove_shard_command.cpp',
         'config/configsvr_remove_shard_from_zone_command.cpp',
         'config/configsvr_remove_tags_command.cpp',
@@ -342,6 +343,7 @@ env.Library(
         'migration_destination_manager_legacy_commands.cpp',
         'move_chunk_command.cpp',
         'move_primary_coordinator.cpp',
+        'remove_chunks.idl',
         'rename_collection_coordinator.cpp',
         'sharded_rename_collection.idl',
         'rename_collection_participant_service.cpp',
diff --git a/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
new file mode 100644
index 00000000000..e0b40d08a64
--- /dev/null
+++ b/src/mongo/db/s/config/configsvr_remove_chunks_command.cpp
@@ -0,0 +1,149 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/remove_chunks_gen.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/grid.h"
+
+namespace mongo {
+namespace {
+
+class ConfigsvrRemoveChunksCommand final : public TypedCommand<ConfigsvrRemoveChunksCommand> {
+public:
+    using Request = ConfigsvrRemoveChunks;
+
+    class Invocation final : public InvocationBase {
+    public:
+        using InvocationBase::InvocationBase;
+
+        void typedRun(OperationContext* opCtx) {
+            const UUID& collectionUUID = request().getCollectionUUID();
+
+            opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+            uassert(ErrorCodes::IllegalOperation,
+                    "_configsvrRemoveChunks can only be run on config servers",
+                    serverGlobalParams.clusterRole == ClusterRole::ConfigServer);
+            uassert(ErrorCodes::InvalidOptions,
+                    "_configsvrRemoveChunks must be called with majority writeConcern",
+                    opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
+
+            // Set the operation context read concern level to local for reads into the config
+            // database.
+            repl::ReadConcernArgs::get(opCtx) =
+                repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
+
+            auto txnParticipant = TransactionParticipant::get(opCtx);
+            uassert(
+                5665000, "_configsvrRemoveChunks must be run as a retryable write", txnParticipant);
+
+            {
+                // Use an ACR because we will perform a {multi: true} delete, which is otherwise not
+                // supported on a session.
+                auto newClient = opCtx->getServiceContext()->makeClient("RemoveChunksMetadata");
+                {
+                    stdx::lock_guard<Client> lk(*newClient.get());
+                    newClient->setSystemOperationKillableByStepdown(lk);
+                }
+
+                AlternativeClientRegion acr(newClient);
+                auto executor =
+                    Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
+                auto newOpCtxPtr = CancelableOperationContext(
+                    cc().makeOperationContext(), opCtx->getCancellationToken(), executor);
+
+                // Write with localWriteConcern because we cannot wait for replication with a
+                // session checked out. The command will wait for majority WC on the epilogue after
+                // the session has been checked in.
+                uassertStatusOK(
+                    Grid::get(newOpCtxPtr.get())
+                        ->catalogClient()
+                        ->removeConfigDocuments(newOpCtxPtr.get(),
+                                                ChunkType::ConfigNS,
+                                                BSON(ChunkType::collectionUUID << collectionUUID),
+                                                ShardingCatalogClient::kLocalWriteConcern));
+            }
+
+            // Since we no write happened on this txnNumber, we need to make a dummy write so that
+            // secondaries can be aware of this txn.
+            DBDirectClient client(opCtx);
+            client.update(NamespaceString::kServerConfigurationNamespace.ns(),
+                          BSON("_id"
+                               << "RemoveChunksMetadataStats"),
+                          BSON("$inc" << BSON("count" << 1)),
+                          true /* upsert */,
+                          false /* multi */);
+        }
+
+    private:
+        NamespaceString ns() const override {
+            return NamespaceString(request().getDbName(), "");
+        }
+
+        bool supportsWriteConcern() const override {
+            return true;
+        }
+
+        void doCheckAuthorization(OperationContext* opCtx) const override {
+            uassert(ErrorCodes::Unauthorized,
+                    "Unauthorized",
+                    AuthorizationSession::get(opCtx->getClient())
+                        ->isAuthorizedForActionsOnResource(ResourcePattern::forClusterResource(),
+                                                           ActionType::internal));
+        }
+    };
+
+    std::string help() const override {
+        return "Internal command, which is exported by the sharding config server. Do not call "
+               "directly. Removes the chunks for the specified collectionUUID.";
+    }
+
+    bool adminOnly() const override {
+        return true;
+    }
+
+    AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
+        return AllowedOnSecondary::kNever;
+    }
+} configsvrRemoveChunksCmd;
+
+}  // namespace
+}  // namespace mongo
diff --git a/src/mongo/db/s/create_collection_coordinator.cpp b/src/mongo/db/s/create_collection_coordinator.cpp
index 3b2b83862bb..02f43ee953e 100644
--- a/src/mongo/db/s/create_collection_coordinator.cpp
+++ b/src/mongo/db/s/create_collection_coordinator.cpp
@@ -40,6 +40,7 @@
 #include "mongo/db/s/collection_sharding_runtime.h"
 #include "mongo/db/s/create_collection_coordinator.h"
 #include "mongo/db/s/recoverable_critical_section_service.h"
+#include "mongo/db/s/remove_chunks_gen.h"
 #include "mongo/db/s/shard_key_util.h"
 #include "mongo/db/s/sharding_ddl_util.h"
 #include "mongo/db/s/sharding_logging.h"
@@ -213,33 +214,30 @@ BSONObj getCollation(OperationContext* opCtx,
     return actualCollatorBSON;
 }
 
-void removeChunks(OperationContext* opCtx, const UUID& uuid) {
-    BatchWriteExecStats stats;
-    BatchedCommandResponse response;
-
-    BatchedCommandRequest deleteRequest([&]() {
-        write_ops::DeleteCommandRequest deleteOp(ChunkType::ConfigNS);
-        deleteOp.setWriteCommandRequestBase([] {
-            write_ops::WriteCommandRequestBase writeCommandBase;
-            writeCommandBase.setOrdered(false);
-            return writeCommandBase;
-        }());
-        deleteOp.setDeletes(std::vector{write_ops::DeleteOpEntry(
-            BSON(ChunkType::collectionUUID << uuid), true /* multi: true */)});
-        return deleteOp;
-    }());
-
-    deleteRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
+void cleanupPartialChunksFromPreviousAttempt(OperationContext* opCtx,
+                                             const UUID& uuid,
+                                             const OperationSessionInfo& osi) {
+    auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
 
-    cluster::write(opCtx, deleteRequest, &stats, &response, boost::none);
+    // Remove the chunks matching uuid
+    ConfigsvrRemoveChunks configsvrRemoveChunksCmd(uuid);
+    configsvrRemoveChunksCmd.setDbName(NamespaceString::kAdminDb);
 
-    uassertStatusOK(response.toStatus());
+    const auto swRemoveChunksResult = configShard->runCommandWithFixedRetryAttempts(
+        opCtx,
+        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+        NamespaceString::kAdminDb.toString(),
+        CommandHelpers::appendMajorityWriteConcern(configsvrRemoveChunksCmd.toBSON(osi.toBSON())),
+        Shard::RetryPolicy::kIdempotent);
+
+    uassertStatusOKWithContext(
+        Shard::CommandResponse::getEffectiveStatus(std::move(swRemoveChunksResult)),
+        str::stream() << "Error removing chunks matching uuid " << uuid);
 }
 
-void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
-    BatchWriteExecStats stats;
-    BatchedCommandResponse response;
-
+void upsertChunks(OperationContext* opCtx,
+                  std::vector<ChunkType>& chunks,
+                  const OperationSessionInfo& osi) {
     BatchedCommandRequest updateRequest([&]() {
         write_ops::UpdateCommandRequest updateOp(ChunkType::ConfigNS);
         std::vector<write_ops::UpdateOpEntry> entries;
@@ -260,14 +258,31 @@ void upsertChunks(OperationContext* opCtx, std::vector<ChunkType>& chunks) {
 
     updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
-    cluster::write(opCtx, updateRequest, &stats, &response, boost::none);
+    const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
+
+    BatchedCommandResponse batchResponse;
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+    const auto response =
+        configShard->runCommand(opCtx,
+                                ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                ChunkType::ConfigNS.db().toString(),
+                                cmdObj,
+                                Shard::kDefaultConfigCommandTimeout,
+                                Shard::RetryPolicy::kIdempotent);
 
-    uassertStatusOK(response.toStatus());
+    const auto writeStatus =
+        Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+    uassertStatusOK(batchResponse.toStatus());
+    uassertStatusOK(writeStatus);
 }
 
-void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, CollectionType& coll) {
-    BatchWriteExecStats stats;
-    BatchedCommandResponse response;
+void updateCatalogEntry(OperationContext* opCtx,
+                        const NamespaceString& nss,
+                        CollectionType& coll,
+                        const OperationSessionInfo& osi) {
+    const auto configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard();
+
     BatchedCommandRequest updateRequest([&]() {
         write_ops::UpdateCommandRequest updateOp(CollectionType::ConfigNS);
         updateOp.setUpdates({[&] {
@@ -282,13 +297,26 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
     }());
 
     updateRequest.setWriteConcern(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
-    try {
-        cluster::write(opCtx, updateRequest, &stats, &response, boost::none);
+    const BSONObj cmdObj = updateRequest.toBSON().addFields(osi.toBSON());
 
-        uassertStatusOK(response.toStatus());
+    try {
+        BatchedCommandResponse batchResponse;
+        const auto response =
+            configShard->runCommand(opCtx,
+                                    ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+                                    CollectionType::ConfigNS.db().toString(),
+                                    cmdObj,
+                                    Shard::kDefaultConfigCommandTimeout,
+                                    Shard::RetryPolicy::kIdempotent);
+
+        const auto writeStatus =
+            Shard::CommandResponse::processBatchWriteResponse(response, &batchResponse);
+
+        uassertStatusOK(batchResponse.toStatus());
+        uassertStatusOK(writeStatus);
     } catch (const DBException&) {
         // If an error happens when contacting the config server, we don't know if the update
-        // succeded or not, which might cause the local shard version to differ from the config
+        // succeeded or not, which might cause the local shard version to differ from the config
         // server, so we clear the metadata to allow another operation to refresh it.
         UninterruptibleLockGuard noInterrupt(opCtx->lockState());
         AutoGetCollection autoColl(opCtx, nss, MODE_IX);
@@ -299,21 +327,18 @@ void updateCatalogEntry(OperationContext* opCtx, const NamespaceString& nss, Col
 
 void broadcastDropCollection(OperationContext* opCtx,
                              const NamespaceString& nss,
-                             const std::shared_ptr<executor::TaskExecutor>& executor) {
+                             const std::shared_ptr<executor::TaskExecutor>& executor,
+                             const OperationSessionInfo& osi) {
     const auto primaryShardId = ShardingState::get(opCtx)->shardId();
     const ShardsvrDropCollectionParticipant dropCollectionParticipant(nss);
 
     auto participants = Grid::get(opCtx)->shardRegistry()->getAllShardIds(opCtx);
-    // Remove prymary shard from participants
+    // Remove primary shard from participants
     participants.erase(std::remove(participants.begin(), participants.end(), primaryShardId),
                        participants.end());
 
-    sharding_ddl_util::sendAuthenticatedCommandToShards(
-        opCtx,
-        nss.db(),
-        CommandHelpers::appendMajorityWriteConcern(dropCollectionParticipant.toBSON({})),
-        participants,
-        executor);
+    sharding_ddl_util::sendDropCollectionParticipantCommandToShards(
+        opCtx, nss, participants, executor, osi);
 }
 
 }  // namespace
@@ -382,6 +407,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
             auto* opCtx = opCtxHolder.get();
             getForwardableOpMetadata().setOn(opCtx);
 
+            if (!_firstExecution) {
+                // Perform a noop write on the participants in order to advance the txnNumber
+                // for this coordinator's lsid so that requests with older txnNumbers can no
+                // longer execute.
+                _performNoopRetryableWriteOnParticipants(opCtx, **executor);
+            }
+
             if (_recoveredFromDisk) {
                 // If a stedown happened it could've ocurred while waiting for majority when
                 // writing config.collections. If the refresh happens before this write is
@@ -431,8 +463,13 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                             1,
                             "Removing partial changes from previous run",
                             "namespace"_attr = nss());
-                        removeChunks(opCtx, *uuid);
-                        broadcastDropCollection(opCtx, nss(), **executor);
+
+                        _doc = _updateSession(opCtx, _doc);
+                        cleanupPartialChunksFromPreviousAttempt(
+                            opCtx, *uuid, getCurrentSession(_doc));
+
+                        _doc = _updateSession(opCtx, _doc);
+                        broadcastDropCollection(opCtx, nss(), **executor, getCurrentSession(_doc));
                     }
                 }
 
@@ -458,7 +495,16 @@ ExecutorFuture<void> CreateCollectionCoordinator::_runImpl(
                     nss(),
                     _critSecReason,
                     ShardingCatalogClient::kMajorityWriteConcern);
-                _createCollectionOnNonPrimaryShards(opCtx);
+
+                _doc = _updateSession(opCtx, _doc);
+                try {
+                    _createCollectionOnNonPrimaryShards(opCtx, getCurrentSession(_doc));
+                } catch (const ExceptionFor<ErrorCodes::NotARetryableWriteCommand>&) {
+                    // Older 5.0 binaries don't support running the
+                    // _shardsvrCreateCollectionParticipant command as a retryable write yet. In
+                    // that case, retry without attaching session info.
+                    _createCollectionOnNonPrimaryShards(opCtx, boost::none);
+                }
 
                 _commit(opCtx);
             }
@@ -641,7 +687,8 @@ void CreateCollectionCoordinator::_createPolicyAndChunks(OperationContext* opCtx
     _numChunks = _initialChunks.chunks.size();
 }
 
-void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationContext* opCtx) {
+void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(
+    OperationContext* opCtx, const boost::optional<OperationSessionInfo>& osi) {
     LOGV2_DEBUG(5277905,
                 2,
                 "Create collection _createCollectionOnNonPrimaryShards",
@@ -670,8 +717,8 @@ void CreateCollectionCoordinator::_createCollectionOnNonPrimaryShards(OperationC
 
         requests.emplace_back(
             chunkShardId,
-            createCollectionParticipantRequest.toBSON(
-                BSON("writeConcern" << ShardingCatalogClient::kMajorityWriteConcern.toBSON())));
+            CommandHelpers::appendMajorityWriteConcern(
+                createCollectionParticipantRequest.toBSON(osi ? osi->toBSON() : BSONObj())));
 
         initializedShards.emplace(chunkShardId);
     }
@@ -707,7 +754,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
     LOGV2_DEBUG(5277906, 2, "Create collection _commit", "namespace"_attr = nss());
 
     // Upsert Chunks.
-    upsertChunks(opCtx, _initialChunks.chunks);
+    _doc = _updateSession(opCtx, _doc);
+    upsertChunks(opCtx, _initialChunks.chunks, getCurrentSession(_doc));
 
     CollectionType coll(nss(),
                         _initialChunks.collVersion().epoch(),
@@ -731,7 +779,8 @@ void CreateCollectionCoordinator::_commit(OperationContext* opCtx) {
         coll.setUnique(*_doc.getUnique());
     }
 
-    updateCatalogEntry(opCtx, nss(), coll);
+    _doc = _updateSession(opCtx, _doc);
+    updateCatalogEntry(opCtx, nss(), coll, getCurrentSession(_doc));
 }
 
 void CreateCollectionCoordinator::_finalize(OperationContext* opCtx) {
@@ -808,6 +857,20 @@ void CreateCollectionCoordinator::_logEndCreateCollection(OperationContext* opCt
         opCtx, "shardCollection.end", nss().ns(), collectionDetail.obj());
 }
 
+void CreateCollectionCoordinator::_performNoopRetryableWriteOnParticipants(
+    OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor) {
+    auto shardsAndConfigsvr = [&] {
+        const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+        auto participants = shardRegistry->getAllShardIds(opCtx);
+        participants.emplace_back(shardRegistry->getConfigShard()->getId());
+        return participants;
+    }();
+
+    _doc = _updateSession(opCtx, _doc);
+    sharding_ddl_util::performNoopRetryableWriteOnShards(
+        opCtx, shardsAndConfigsvr, getCurrentSession(_doc), executor);
+}
+
 // Phase change API.
 
 void CreateCollectionCoordinator::_enterPhase(Phase newPhase) {
diff --git a/src/mongo/db/s/create_collection_coordinator.h b/src/mongo/db/s/create_collection_coordinator.h
index bb1f303e3c3..8d57374d640 100644
--- a/src/mongo/db/s/create_collection_coordinator.h
+++ b/src/mongo/db/s/create_collection_coordinator.h
@@ -111,7 +111,8 @@ private:
      * If the optimized path can be taken, ensure the collection is already created in all the
      * participant shards.
      */
-    void _createCollectionOnNonPrimaryShards(OperationContext* opCtx);
+    void _createCollectionOnNonPrimaryShards(OperationContext* opCtx,
+                                             const boost::optional<OperationSessionInfo>& osi);
 
     /**
      * Does the following writes:
@@ -135,6 +136,10 @@
      */
     void _logEndCreateCollection(OperationContext* opCtx);
 
+    void _performNoopRetryableWriteOnParticipants(
+        OperationContext* opCtx, const std::shared_ptr<executor::TaskExecutor>& executor);
+
+
     CreateCollectionCoordinatorDocument _doc;
 
     const BSONObj _critSecReason;
diff --git a/src/mongo/db/s/remove_chunks.idl b/src/mongo/db/s/remove_chunks.idl
new file mode 100644
index 00000000000..a997b8c282e
--- /dev/null
+++ b/src/mongo/db/s/remove_chunks.idl
@@ -0,0 +1,46 @@
+# Copyright (C) 2021-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+    cpp_namespace: "mongo"
+
+imports:
+    - "mongo/idl/basic_types.idl"
+
+commands:
+    _configsvrRemoveChunks:
+        command_name: _configsvrRemoveChunks
+        cpp_name: ConfigsvrRemoveChunks
+        description: "internal _configsvrRemoveChunks command for config server"
+        namespace: ignored
+        api_version: ""
+        strict: false
+        fields:
+            collectionUUID:
+                type: uuid
+                description: "The collection uuid of the chunks to be removed"
diff --git a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
index e4708a1ce94..e49c2d78aac 100644
--- a/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
+++ b/src/mongo/db/s/shardsvr_create_collection_participant_command.cpp
@@ -70,6 +70,7 @@ public:
                     << request().toBSON(BSONObj()),
                 opCtx->getWriteConcern().wMode == WriteConcernOptions::kMajority);
 
+        opCtx->setAlwaysInterruptAtStepDownOrUp();
 
         MigrationDestinationManager::cloneCollectionIndexesAndOptions(
             opCtx,
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index 574af8ba82d..f30d64eb205 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -51,7 +51,9 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1},
                                                {"insert", 1},
                                                {"update", 1},
                                                {"_recvChunkStart", 1},
+                                               {"_configsvrRemoveChunks", 1},
                                                {"_configsvrRemoveTags", 1},
+                                               {"_shardsvrCreateCollectionParticipant", 1},
                                                {"_shardsvrDropCollectionParticipant", 1},
                                                {"_shardsvrRenameCollectionParticipant", 1},
                                                {"_shardsvrRenameCollectionParticipantUnblock", 1},