diff options
Diffstat (limited to 'src')
11 files changed, 369 insertions, 845 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 4aad6f8db57..1a438554288 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -162,7 +162,7 @@ env.Library( '$BUILD_DIR/mongo/s/common_s', '$BUILD_DIR/mongo/s/grid', 'sharding_api_d', - 'sharding_catalog_manager', + ], ) @@ -561,7 +561,6 @@ env.CppUnitTest( 'config/sharding_catalog_manager_add_shard_test.cpp', 'config/sharding_catalog_manager_add_shard_to_zone_test.cpp', 'config/sharding_catalog_manager_assign_key_range_to_zone_test.cpp', - 'config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp', 'config/sharding_catalog_manager_clear_jumbo_flag_test.cpp', 'config/sharding_catalog_manager_commit_chunk_migration_test.cpp', 'config/sharding_catalog_manager_config_initialization_test.cpp', diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp index ff8e7c04f8d..8d0b36cac78 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp @@ -35,9 +35,7 @@ #include "mongo/db/auth/authorization_session_impl.h" #include "mongo/db/operation_context.h" -#include "mongo/db/query/query_request.h" #include "mongo/db/s/balancer/type_migration.h" -#include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/config_server_version.h" #include "mongo/s/catalog/sharding_catalog_client.h" @@ -88,24 +86,9 @@ OpMsg runCommandInLocalTxn(OperationContext* opCtx, .response); } -void startTransactionWithNoopFind(OperationContext* opCtx, - const NamespaceString& nss, - TxnNumber txnNumber) { - BSONObjBuilder findCmdBuilder; - QueryRequest qr(nss); - qr.setBatchSize(0); - qr.setWantMore(false); - qr.asFindCommand(&findCmdBuilder); - - auto res = runCommandInLocalTxn( - opCtx, nss.db(), true /*startTransaction*/, txnNumber, findCmdBuilder.done()) - .body; - uassertStatusOK(getStatusFromCommandResult(res)); -} - -BSONObj commitOrAbortTransaction(OperationContext* opCtx, - TxnNumber txnNumber, - std::string cmdName) { +BSONObj runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx, + TxnNumber txnNumber, + std::string cmdName) { // Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction // that have been run on this opCtx would have set the timeout in the locker on the opCtx, but // commit should not have a lock timeout. @@ -141,24 +124,6 @@ BSONObj commitOrAbortTransaction(OperationContext* opCtx, return replyOpMsg.body; } -// Runs commit for the transaction with 'txnNumber'. -void commitTransaction(OperationContext* opCtx, TxnNumber txnNumber) { - auto response = commitOrAbortTransaction(opCtx, txnNumber, "commitTransaction"); - uassertStatusOK(getStatusFromCommandResult(response)); - uassertStatusOK(getWriteConcernStatusFromCommandResult(response)); -} - -// Runs abort for the transaction with 'txnNumber'. -void abortTransaction(OperationContext* opCtx, TxnNumber txnNumber) { - auto response = commitOrAbortTransaction(opCtx, txnNumber, "abortTransaction"); - - auto status = getStatusFromCommandResult(response); - if (status.code() != ErrorCodes::NoSuchTransaction) { - uassertStatusOK(status); - uassertStatusOK(getWriteConcernStatusFromCommandResult(response)); - } -} - } // namespace void ShardingCatalogManager::create(ServiceContext* serviceContext, @@ -501,21 +466,17 @@ StatusWith<bool> ShardingCatalogManager::_isShardRequiredByZoneStillInUse( BSONObj ShardingCatalogManager::writeToConfigDocumentInTxn(OperationContext* opCtx, const NamespaceString& nss, const BatchedCommandRequest& request, + bool startTransaction, TxnNumber txnNumber) { invariant(nss.db() == NamespaceString::kConfigDb); - auto response = runCommandInLocalTxn( - opCtx, nss.db(), false /* startTransaction */, txnNumber, request.toBSON()) - .body; - - uassertStatusOK(getStatusFromCommandResult(response)); - uassertStatusOK(getWriteConcernStatusFromCommandResult(response)); - - return response; + return runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON()) + .body; } void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx, const NamespaceString& nss, std::vector<BSONObj> docs, + bool startTransaction, TxnNumber txnNumber) { invariant(nss.db() == NamespaceString::kConfigDb); @@ -530,7 +491,8 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx, return insertOp; }()); - writeToConfigDocumentInTxn(opCtx, nss, request, txnNumber); + uassertStatusOK(getStatusFromWriteCommandReply( + writeToConfigDocumentInTxn(opCtx, nss, request, startTransaction, txnNumber))); }; while (!docs.empty()) { @@ -558,29 +520,22 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx, doBatchInsert(); } -void ShardingCatalogManager::withTransaction( - OperationContext* opCtx, - const NamespaceString& namespaceForInitialFind, - unique_function<void(OperationContext*, TxnNumber)> func) { - AlternativeSessionRegion asr(opCtx); - AuthorizationSession::get(asr.opCtx()->getClient()) - ->grantInternalAuthorization(asr.opCtx()->getClient()); - TxnNumber txnNumber = 0; - - auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] { - try { - abortTransaction(opCtx, txnNumber); - } catch (DBException& e) { - LOGV2_WARNING(5192100, - "Failed to abort transaction in AlternativeSessionRegion", - "error"_attr = redact(e)); - } - }); +void ShardingCatalogManager::commitTxnForConfigDocument(OperationContext* opCtx, + TxnNumber txnNumber) { + auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "commitTransaction"); + uassertStatusOK(getStatusFromCommandResult(response)); + uassertStatusOK(getWriteConcernStatusFromCommandResult(response)); +} + +void ShardingCatalogManager::abortTxnForConfigDocument(OperationContext* opCtx, + TxnNumber txnNumber) { + auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "abortTransaction"); - startTransactionWithNoopFind(asr.opCtx(), namespaceForInitialFind, txnNumber); - func(asr.opCtx(), txnNumber); - commitTransaction(asr.opCtx(), txnNumber); - guard.dismiss(); + auto status = getStatusFromCommandResult(response); + if (status.code() != ErrorCodes::NoSuchTransaction) { + uassertStatusOK(status); + uassertStatusOK(getWriteConcernStatusFromCommandResult(response)); + } } } // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h index fafcbd62897..a6f97372025 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.h +++ b/src/mongo/db/s/config/sharding_catalog_manager.h @@ -30,9 +30,7 @@ #pragma once #include "mongo/base/status_with.h" -#include "mongo/db/auth/authorization_session.h" #include "mongo/db/concurrency/d_concurrency.h" -#include "mongo/db/logical_session_cache.h" #include "mongo/db/repl/optime_with.h" #include "mongo/db/s/config/namespace_serializer.h" #include "mongo/executor/task_executor.h" @@ -185,28 +183,14 @@ public: */ Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx); - // - // General utilities related to the ShardingCatalogManager - // - - /** - * Starts and commits a transaction on the config server, with a no-op find on the specified - * namespace in order to internally start the transaction. All writes done inside the - * passed-in function must assume that they are run inside a transaction that will be commited - * after the function itself has completely finished. - */ - static void withTransaction(OperationContext* opCtx, - const NamespaceString& namespaceForInitialFind, - unique_function<void(OperationContext*, TxnNumber)> func); - /** * Runs the write 'request' on namespace 'nss' in a transaction with 'txnNumber'. Write must be - * on a collection in the config database. If expectedNumModified is specified, the number of - * documents modified must match expectedNumModified - throws otherwise. + * on a collection in the config database. */ BSONObj writeToConfigDocumentInTxn(OperationContext* opCtx, const NamespaceString& nss, const BatchedCommandRequest& request, + bool startTransaction, TxnNumber txnNumber); /** @@ -217,8 +201,19 @@ public: void insertConfigDocumentsInTxn(OperationContext* opCtx, const NamespaceString& nss, std::vector<BSONObj> docs, + bool startTransaction, TxnNumber txnNumber); + /** + * Runs commit for the transaction with 'txnNumber'. + */ + void commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber); + + /** + * Runs abort for the transaction with 'txnNumber'. + */ + void abortTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber); + // // Chunk Operations // @@ -288,20 +283,6 @@ public: const BSONObj& maxKey, const ChunkVersion& version); - /** - * In a single transaction, effectively bumps the shard version for each shard in the collection - * to be the current collection version's major version + 1 inside an already-running - * transaction. - * - * Note: it's the responsibility of the caller to ensure that the list of shards is stable, - * as any shards added after the shard ids have been passed in will be missed. - */ - void bumpCollShardVersionsAndChangeMetadataInTxn( - OperationContext* opCtx, - const NamespaceString& nss, - const std::vector<ShardId>& shardIds, - unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc); - // // Database Operations // @@ -400,6 +381,7 @@ public: const NamespaceString& nss, const CollectionType& coll, const bool upsert, + const bool startTransaction, TxnNumber txnNumber); /** diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp deleted file mode 100644 index 7df7df0e6c7..00000000000 --- a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Copyright (C) 2020-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - -#include "mongo/platform/basic.h" - -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/logical_session_cache_noop.h" -#include "mongo/db/repl/wait_for_majority_service.h" -#include "mongo/db/s/config/config_server_test_fixture.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" -#include "mongo/db/s/transaction_coordinator_service.h" -#include "mongo/db/session_catalog_mongod.h" -#include "mongo/db/transaction_participant.h" -#include "mongo/logv2/log.h" - -namespace mongo { -namespace { - -const NamespaceString kNss("TestDB", "TestColl"); -const ShardType kShard0("shard0000", "shard0000:1234"); -const ShardType kShard1("shard0001", "shard0001:1234"); - -class ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest - : public ConfigServerTestFixture { - void setUp() { - ConfigServerTestFixture::setUp(); - setupShards({kShard0, kShard1}); - - // Create config.transactions collection. - auto opCtx = operationContext(); - DBDirectClient client(opCtx); - client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns()); - client.createCollection(CollectionType::ConfigNS.ns()); - - LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>()); - TransactionCoordinatorService::get(operationContext()) - ->onShardingInitialization(operationContext(), true); - } - - void tearDown() { - TransactionCoordinatorService::get(operationContext())->onStepDown(); - ConfigServerTestFixture::tearDown(); - } - -protected: - ChunkType generateChunkType(const NamespaceString& nss, - const ChunkVersion& chunkVersion, - const ShardId& shardId, - const BSONObj& minKey, - const BSONObj& maxKey) { - ChunkType chunkType; - chunkType.setName(OID::gen()); - chunkType.setNS(nss); - chunkType.setVersion(chunkVersion); - chunkType.setShard(shardId); - chunkType.setMin(minKey); - chunkType.setMax(maxKey); - chunkType.setHistory({ChunkHistory(Timestamp(100, 0), shardId)}); - return chunkType; - } - - /** - * Determines if the chunk's version has been bumped to the targetChunkVersion. - */ - bool chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - const ChunkType& chunkTypeBefore, - const StatusWith<ChunkType> swChunkTypeAfter, - const ChunkVersion& targetChunkVersion) { - ASSERT_OK(swChunkTypeAfter.getStatus()); - auto chunkTypeAfter = swChunkTypeAfter.getValue(); - - // Regardless of whether the major version was bumped, the chunk's other fields should be - // unchanged. - ASSERT_EQ(chunkTypeBefore.getName(), chunkTypeAfter.getName()); - ASSERT_EQ(chunkTypeBefore.getNS(), chunkTypeAfter.getNS()); - ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMin(), chunkTypeAfter.getMin()); - ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMax(), chunkTypeAfter.getMax()); - ASSERT(chunkTypeBefore.getHistory() == chunkTypeAfter.getHistory()); - - return chunkTypeAfter.getVersion().majorVersion() == targetChunkVersion.majorVersion(); - } - - /** - * If there are multiple chunks per shard, the chunk whose version gets bumped is not - * deterministic. - * - * Asserts that only chunk per shard has its major version increased. - */ - void assertOnlyOneChunkVersionBumped(OperationContext* opCtx, - std::vector<ChunkType> originalChunkTypes, - const ChunkVersion& targetChunkVersion) { - auto aChunkVersionWasBumped = false; - for (auto originalChunkType : originalChunkTypes) { - auto swChunkTypeAfter = getChunkDoc(opCtx, originalChunkType.getMin()); - auto wasBumped = chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - originalChunkType, swChunkTypeAfter, targetChunkVersion); - if (aChunkVersionWasBumped) { - ASSERT_FALSE(wasBumped); - } else { - aChunkVersionWasBumped = wasBumped; - } - } - - ASSERT_TRUE(aChunkVersionWasBumped); - } -}; - -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, - BumpChunkVersionOneChunkPerShard) { - const auto epoch = OID::gen(); - const auto shard0Chunk0 = generateChunkType( - kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10)); - const auto shard1Chunk0 = generateChunkType( - kNss, ChunkVersion(11, 2, epoch), kShard1.getName(), BSON("a" << 11), BSON("a" << 20)); - - const auto collectionVersion = shard1Chunk0.getVersion(); - ChunkVersion targetChunkVersion( - collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch()); - - setupChunks({shard0Chunk0, shard1Chunk0}); - - auto opCtx = operationContext(); - - std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()}; - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {}); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion)); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion)); -} - -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, - BumpChunkVersionTwoChunksOnOneShard) { - const auto epoch = OID::gen(); - const auto shard0Chunk0 = generateChunkType( - kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10)); - const auto shard0Chunk1 = generateChunkType( - kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20)); - const auto shard1Chunk0 = generateChunkType( - kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100)); - - const auto collectionVersion = shard0Chunk1.getVersion(); - ChunkVersion targetChunkVersion( - collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch()); - - setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0}); - - auto opCtx = operationContext(); - std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()}; - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {}); - - assertOnlyOneChunkVersionBumped( - operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion)); -} - -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, - BumpChunkVersionTwoChunksOnTwoShards) { - const auto epoch = OID::gen(); - const auto shard0Chunk0 = generateChunkType( - kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10)); - const auto shard0Chunk1 = generateChunkType( - kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20)); - const auto shard1Chunk0 = generateChunkType( - kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100)); - const auto shard1Chunk1 = generateChunkType( - kNss, ChunkVersion(12, 1, epoch), kShard1.getName(), BSON("a" << 101), BSON("a" << 200)); - - const auto collectionVersion = shard1Chunk1.getVersion(); - ChunkVersion targetChunkVersion( - collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch()); - - setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1}); - - auto opCtx = operationContext(); - std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()}; - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {}); - - assertOnlyOneChunkVersionBumped( - operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion); - - assertOnlyOneChunkVersionBumped( - operationContext(), {shard1Chunk0, shard1Chunk1}, targetChunkVersion); -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index d1f09247cdb..be40c6a5ed6 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -40,7 +40,6 @@ #include "mongo/client/read_preference.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/dbdirectclient.h" -#include "mongo/db/logical_session_cache.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/repl_client_info.h" @@ -56,7 +55,6 @@ #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" -#include "mongo/s/write_ops/batched_command_request.h" #include "mongo/util/fail_point.h" #include "mongo/util/str.h" @@ -69,27 +67,6 @@ MONGO_FAIL_POINT_DEFINE(skipExpiringOldChunkHistory); const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)); -BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi) { - BatchedCommandRequest request([&] { - write_ops::Update updateOp(nss); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(query); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); - entry.setUpsert(upsert); - entry.setMulti(multi); - return entry; - }()}); - return updateOp; - }()); - - return request; -} - /** * Append min, max and version information from chunk to the buffer for logChange purposes. */ @@ -323,9 +300,13 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse( return ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod()); } -// Helper function to get the collection version for nss. Always uses kLocalReadConcern. -StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const NamespaceString& nss) { - return getMaxChunkVersionFromQueryResponse( +// Helper function to get collection version and donor shard version following a merge/move/split +BSONObj getShardAndCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss, + const ShardId& fromShard) { + BSONObjBuilder result; + + auto swCollectionVersion = getMaxChunkVersionFromQueryResponse( nss, Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( opCtx, @@ -335,15 +316,7 @@ StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const Nam BSON("ns" << nss.ns()), // Query all chunks for this namespace. BSON(ChunkType::lastmod << -1), // Sort by version. 1)); // Limit 1. -} - -// Helper function to get collection version and donor shard version following a merge/move/split -BSONObj getShardAndCollectionVersion(OperationContext* opCtx, - const NamespaceString& nss, - const ShardId& fromShard) { - BSONObjBuilder result; - auto swCollectionVersion = getCollectionVersion(opCtx, nss); auto collectionVersion = uassertStatusOKWithContext( std::move(swCollectionVersion), "Couldn't retrieve collection version from config server"); @@ -386,47 +359,6 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx, return result.obj(); } -void bumpMajorVersionOneChunkPerShard(OperationContext* opCtx, - const NamespaceString& nss, - TxnNumber txnNumber, - const std::vector<ShardId>& shardIds) { - auto curCollectionVersion = uassertStatusOK(getCollectionVersion(opCtx, nss)); - ChunkVersion targetChunkVersion( - curCollectionVersion.majorVersion() + 1, 0, curCollectionVersion.epoch()); - - for (const auto& shardId : shardIds) { - BSONObjBuilder updateBuilder; - BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set")); - targetChunkVersion.appendLegacyWithField(&updateVersionClause, ChunkType::lastmod()); - updateVersionClause.doneFast(); - auto chunkUpdate = updateBuilder.obj(); - - auto request = buildUpdateOp( - ChunkType::ConfigNS, - BSON(ChunkType::ns(nss.ns()) << ChunkType::shard(shardId.toString())), // query - chunkUpdate, // update - false, // upsert - false // multi - ); - - auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, ChunkType::ConfigNS, request, txnNumber); - - auto numDocsExpectedModified = 1; - auto numDocsModified = res.getIntField("n"); - - uassert(5030400, - str::stream() << "Expected to match " << numDocsExpectedModified - << " docs, but only matched " << numDocsModified - << " for write request " << request.toString(), - numDocsExpectedModified == numDocsModified); - - // There exists a constraint that a chunk version must be unique for a given namespace, - // so the minor version is incremented for each chunk placed. - targetChunkVersion.incMinor(); - } -} - } // namespace StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit( @@ -443,7 +375,16 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit( Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); // Get the max chunk version for this namespace. - auto swCollVersion = getCollectionVersion(opCtx, nss); + auto swCollVersion = getMaxChunkVersionFromQueryResponse( + nss, + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns()), // Query all chunks for this namespace. + BSON(ChunkType::lastmod << -1), // Sort by version. + 1)); // Limit 1. if (!swCollVersion.isOK()) { return swCollVersion.getStatus().withContext( @@ -654,7 +595,17 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge( } // Get the max chunk version for this namespace. - auto swCollVersion = getCollectionVersion(opCtx, nss); + auto swCollVersion = getMaxChunkVersionFromQueryResponse( + nss, + Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + BSON("ns" << nss.ns()), // Query all chunks for this namespace. + BSON(ChunkType::lastmod << -1), // Sort by version. + 1)); // Limit 1. + if (!swCollVersion.isOK()) { return swCollVersion.getStatus().withContext(str::stream() << "mergeChunk cannot merge chunks."); @@ -1231,21 +1182,4 @@ void ShardingCatalogManager::ensureChunkVersionIsGreaterThan(OperationContext* o } } -void ShardingCatalogManager::bumpCollShardVersionsAndChangeMetadataInTxn( - OperationContext* opCtx, - const NamespaceString& nss, - const std::vector<ShardId>& shardIds, - unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { - - // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and - // migrations - Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); - withTransaction(opCtx, - NamespaceString::kConfigReshardingOperationsNamespace, - [&](OperationContext* opCtx, TxnNumber txnNumber) { - bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, shardIds); - changeMetadataFunc(opCtx, txnNumber); - }); -} - } // namespace mongo diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp index a216412912c..50470cf2a80 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp @@ -141,7 +141,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, const BSONObj& query, const BSONObj& update, bool upsert, - bool multi) { + bool useMultiUpdate) { BatchedCommandRequest request([&] { write_ops::Update updateOp(nss); updateOp.setUpdates({[&] { @@ -149,7 +149,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, entry.setQ(query); entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); entry.setUpsert(upsert); - entry.setMulti(multi); + entry.setMulti(useMultiUpdate); return entry; }()}); return updateOp; @@ -637,10 +637,15 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, collType.setEpoch(newEpoch); collType.setKeyPattern(newShardKeyPattern.getKeyPattern()); - auto updateCollectionAndChunksFn = [&](OperationContext* opCtx, TxnNumber txnNumber) { + { // Update the config.collections entry for the given namespace. + AlternativeSessionRegion asr(opCtx); + AuthorizationSession::get(asr.opCtx()->getClient()) + ->grantInternalAuthorization(asr.opCtx()->getClient()); + TxnNumber txnNumber = 0; + updateShardingCatalogEntryForCollectionInTxn( - opCtx, nss, collType, false /* upsert */, txnNumber); + asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber); LOGV2(21933, "refineCollectionShardKey updated collection entry for {namespace}: took " @@ -662,15 +667,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, // to the newly-generated objectid, (ii) their bounds for each new field in the refined // key to MinKey (except for the global max chunk where the max bounds are set to // MaxKey), and unsetting (iii) their jumbo field. - writeToConfigDocumentInTxn(opCtx, - ChunkType::ConfigNS, - buildPipelineUpdateOp(ChunkType::ConfigNS, - BSON("ns" << nss.ns()), - chunkUpdates, - false, // upsert - true // useMultiUpdate - ), - txnNumber); + uassertStatusOK(getStatusFromWriteCommandReply( + writeToConfigDocumentInTxn(asr.opCtx(), + ChunkType::ConfigNS, + buildPipelineUpdateOp(ChunkType::ConfigNS, + BSON("ns" << nss.ns()), + chunkUpdates, + false, // upsert + true // useMultiUpdate + ), + false, // startTransaction + txnNumber))); LOGV2(21935, "refineCollectionShardKey: updated chunk entries for {namespace}: took " @@ -684,15 +691,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, // Update all config.tags entries for the given namespace by setting their bounds for // each new field in the refined key to MinKey (except for the global max tag where the // max bounds are set to MaxKey). - writeToConfigDocumentInTxn(opCtx, - TagsType::ConfigNS, - buildPipelineUpdateOp(TagsType::ConfigNS, - BSON("ns" << nss.ns()), - tagUpdates, - false, // upsert - true // useMultiUpdate - ), - txnNumber); + uassertStatusOK(getStatusFromWriteCommandReply( + writeToConfigDocumentInTxn(asr.opCtx(), + TagsType::ConfigNS, + buildPipelineUpdateOp(TagsType::ConfigNS, + BSON("ns" << nss.ns()), + tagUpdates, + false, // upsert + true // useMultiUpdate + ), + false, // startTransaction + txnNumber))); LOGV2(21936, @@ -707,9 +716,10 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx, LOGV2(21937, "Hit hangRefineCollectionShardKeyBeforeCommit failpoint"); hangRefineCollectionShardKeyBeforeCommit.pauseWhileSet(opCtx); } - }; - withTransaction(opCtx, nss, std::move(updateCollectionAndChunksFn)); + // Note this will wait for majority write concern. + commitTxnForConfigDocument(asr.opCtx(), txnNumber); + } ShardingLogging::get(opCtx)->logChange(opCtx, "refineCollectionShardKey.end", @@ -737,8 +747,9 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn( const NamespaceString& nss, const CollectionType& coll, const bool upsert, + const bool startTransaction, TxnNumber txnNumber) { - try { + auto status = getStatusFromCommandResult( writeToConfigDocumentInTxn(opCtx, CollectionType::ConfigNS, buildUpdateOp(CollectionType::ConfigNS, @@ -747,11 +758,9 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn( upsert, false /* multi */ ), - txnNumber); - } catch (DBException& e) { - e.addContext("Collection metadata write failed"); - throw; - } + startTransaction, + txnNumber)); + uassertStatusOKWithContext(status, "Collection metadata write failed"); } } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 1bac5597a72..08f23a349e2 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -50,62 +50,46 @@ namespace mongo { namespace { -BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) { - BatchedCommandRequest request([&] { - write_ops::Insert insertOp(nss); - insertOp.setDocuments(docs); - return insertOp; - }()); +template <typename Callable> +void withAlternateSession(OperationContext* opCtx, Callable&& callable) { + AlternativeSessionRegion asr(opCtx); + AuthorizationSession::get(asr.opCtx()->getClient()) + ->grantInternalAuthorization(asr.opCtx()->getClient()); + TxnNumber txnNumber = 0; + + auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] { + try { + ShardingCatalogManager::get(opCtx)->abortTxnForConfigDocument(opCtx, txnNumber); + } catch (const DBException& ex) { + LOGV2(5165900, + "Failed to abort transaction to resharding metadata", + "error"_attr = redact(ex)); + } + }); - return request; -} + callable(asr.opCtx(), txnNumber); -BatchedCommandRequest buildDeleteOp(const NamespaceString& nss, - const BSONObj& query, - bool multiDelete) { - BatchedCommandRequest request([&] { - write_ops::Delete deleteOp(nss); - deleteOp.setDeletes({[&] { - write_ops::DeleteOpEntry entry; - entry.setQ(query); - entry.setMulti(multiDelete); - return entry; - }()}); - return deleteOp; - }()); - - return request; + guard.dismiss(); } -BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi) { - BatchedCommandRequest request([&] { - write_ops::Update updateOp(nss); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(query); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); - entry.setUpsert(upsert); - entry.setMulti(multi); - return entry; - }()}); - return updateOp; - }()); +void writeConfigDocs(OperationContext* opCtx, + const NamespaceString& nss, + BatchedCommandRequest request, + bool startTransaction, + TxnNumber txnNumber, + boost::optional<int> expectedNumModified) { + auto response = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, nss, request, startTransaction, txnNumber); + uassertStatusOK(getStatusFromCommandResult(response)); - return request; -} + if (!expectedNumModified) + return; -void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request, - const BSONObj& response, - int expected) { - auto numDocsModified = response.getIntField("n"); - uassert(5030401, - str::stream() << "Expected to match " << expected << " docs, but only matched " - << numDocsModified << " for write request " << request.toString(), - expected == numDocsModified); + uassert(5030400, + str::stream() << "Expected to match " << expectedNumModified + << " docs, but only matched " << response.getIntField("n") + << " for write request " << request.toString(), + response.getIntField("n") == expectedNumModified); } void writeToCoordinatorStateNss(OperationContext* opCtx, @@ -138,15 +122,12 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) ? boost::none : boost::make_optional(1); - auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, - NamespaceString::kConfigReshardingOperationsNamespace, - std::move(request), - txnNumber); - - if (expectedNumModified) { - assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified); - } + writeConfigDocs(opCtx, + NamespaceString::kConfigReshardingOperationsNamespace, + std::move(request), + true, // startTransaction + txnNumber, + expectedNumModified); } BSONObj createReshardingFieldsUpdateForOriginalNss( @@ -203,18 +184,18 @@ void updateConfigCollectionsForOriginalNss(OperationContext* opCtx, auto writeOp = createReshardingFieldsUpdateForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch); - auto request = + writeConfigDocs( + opCtx, + CollectionType::ConfigNS, buildUpdateOp(CollectionType::ConfigNS, BSON(CollectionType::kNssFieldName << coordinatorDoc.getNss().ns()), // query writeOp, false, // upsert false // multi - ); - - auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, CollectionType::ConfigNS, request, txnNumber); - - assertNumDocsModifiedMatchesExpected(request, res, 1 /* expected */); + ), + false, // startTransaction + txnNumber, + 1); // expectedNumModified } void writeToConfigCollectionsForTempNss(OperationContext* opCtx, @@ -274,13 +255,12 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx, auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) ? boost::none : boost::make_optional(1); - - auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, CollectionType::ConfigNS, std::move(request), txnNumber); - - if (expectedNumModified) { - assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified); - } + writeConfigDocs(opCtx, + CollectionType::ConfigNS, + std::move(request), + false, // startTransaction + txnNumber, + expectedNumModified); } void insertChunkAndTagDocsForTempNss(OperationContext* opCtx, @@ -294,8 +274,11 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx, initialChunksBSON.begin(), [](ChunkType chunk) { return chunk.toConfigBSON(); }); - ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn( - opCtx, ChunkType::ConfigNS, std::move(initialChunksBSON), txnNumber); + ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx, + ChunkType::ConfigNS, + std::move(initialChunksBSON), + false, // startTransaction + txnNumber); // Insert tag documents for temp nss std::vector<BSONObj> zonesBSON(newZones.size()); @@ -303,8 +286,11 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx, return chunk.toBSON(); }); - ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn( - opCtx, TagsType::ConfigNS, std::move(zonesBSON), txnNumber); + ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx, + TagsType::ConfigNS, + std::move(zonesBSON), + false, // startTransaction + txnNumber); } void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx, @@ -312,25 +298,29 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx, TxnNumber txnNumber) { // Remove all chunk documents for the original nss. We do not know how many chunk docs currently // exist, so cannot pass a value for expectedNumModified - ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, - ChunkType::ConfigNS, - buildDeleteOp(ChunkType::ConfigNS, - BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query - true // multi - ), - txnNumber); + writeConfigDocs(opCtx, + ChunkType::ConfigNS, + buildDeleteOp(ChunkType::ConfigNS, + BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query + true // multi + ), + false, // startTransaction + txnNumber, + boost::none // expectedNumModified + ); // Remove all tag documents for the original nss. We do not know how many tag docs currently // exist, so cannot pass a value for expectedNumModified - ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, - TagsType::ConfigNS, - buildDeleteOp(TagsType::ConfigNS, - BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query - true // multi - ), - txnNumber); + writeConfigDocs(opCtx, + TagsType::ConfigNS, + buildDeleteOp(TagsType::ConfigNS, + BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query + true // multi + ), + false, // startTransaction + txnNumber, + boost::none // expectedNumModified + ); } void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, @@ -342,38 +332,34 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, // Update all chunk documents that currently have 'ns' as the temporary collection namespace // such that 'ns' is now the original collection namespace and 'lastmodEpoch' is // newCollectionEpoch. - auto chunksRequest = + writeConfigDocs( + opCtx, + ChunkType::ConfigNS, buildUpdateOp(ChunkType::ConfigNS, BSON(ChunkType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns() << "lastmodEpoch" << newCollectionEpoch)), // update false, // upsert true // multi - ); - - auto chunksRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, ChunkType::ConfigNS, chunksRequest, txnNumber); - - if (expectedNumChunksModified) { - assertNumDocsModifiedMatchesExpected(chunksRequest, chunksRes, *expectedNumChunksModified); - } + ), + false, // startTransaction + txnNumber, + expectedNumChunksModified); - auto tagsRequest = + // Update the 'ns' field to be the original collection namespace for all tags documents that + // currently have 'ns' as the temporary collection namespace + writeConfigDocs( + opCtx, + TagsType::ConfigNS, buildUpdateOp(TagsType::ConfigNS, BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns())), // update false, // upsert true // multi - ); - - // Update the 'ns' field to be the original collection namespace for all tags documents that - // currently have 'ns' as the temporary collection namespace - auto tagsRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, TagsType::ConfigNS, tagsRequest, txnNumber); - - if (expectedNumZonesModified) { - assertNumDocsModifiedMatchesExpected(tagsRequest, tagsRes, *expectedNumZonesModified); - } + ), + false, // startTransaction + txnNumber, + expectedNumZonesModified); } /** @@ -388,95 +374,6 @@ std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntri [](auto& shardEntry) { return shardEntry.getId(); }); return shardIds; } - -// -// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions -// occur. -// -// Donors/ recipients learn when to transition states by noticing a change in shard versions for one -// of the two collections involved in the resharding operations. -// -// Donors are notified when shard versions spanning the original resharding collection are -// incremented. Recipients are notified when shard versions spanning the temporary resharding -// collection are incremented. -// - -/** - * Maps which participants are to be notified when the coordinator transitions into a given state. - */ -enum class ParticipantsToNofityEnum { kDonors, kRecipients, kNone }; -stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForStateTransition{ - {CoordinatorStateEnum::kUnused, ParticipantsToNofityEnum::kNone}, - {CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone}, - {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors}, - {CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients}, - {CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors}, - {CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone}, - {CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients}, - {CoordinatorStateEnum::kDropping, ParticipantsToNofityEnum::kDonors}, - {CoordinatorStateEnum::kDone, ParticipantsToNofityEnum::kNone}, - {CoordinatorStateEnum::kError, ParticipantsToNofityEnum::kNone}, -}; - -/** - * Runs resharding metadata changes in a transaction. - * - * This function should only be called if donor and recipient shards DO NOT need to be informed of - * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed, - * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(). - */ -void executeStateTransitionAndMetadataChangesInTxn( - OperationContext* opCtx, - const ReshardingCoordinatorDocument& updatedCoordinatorDoc, - unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { - const auto& state = updatedCoordinatorDoc.getState(); - invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[state] == ParticipantsToNofityEnum::kNone); - - // Neither donors nor recipients need to be informed of the transition to - // updatedCoordinatorDoc's state. - ShardingCatalogManager::withTransaction(opCtx, - NamespaceString::kConfigReshardingOperationsNamespace, - [&](OperationContext* opCtx, TxnNumber txnNumber) { - changeMetadataFunc(opCtx, txnNumber); - }); -} - -/** - * In a single transaction, bumps the shard version for each shard spanning the corresponding - * resharding collection and executes changeMetadataFunc. - * - * This function should only be called if donor or recipient shards need to be informed of the - * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be - * informed, instead call executeStateTransitionAndMetadataChangesInTxn(). - */ -void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - OperationContext* opCtx, - const ReshardingCoordinatorDocument& updatedCoordinatorDoc, - unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { - const auto& state = updatedCoordinatorDoc.getState(); - invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); - invariant(notifyForStateTransition[state] != ParticipantsToNofityEnum::kNone); - - auto participantsToNotify = notifyForStateTransition[state]; - if (participantsToNotify == ParticipantsToNofityEnum::kDonors) { - // Bump the donor shard versions for the original namespace along with updating the - // metadata. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getNss(), - extractShardIds(updatedCoordinatorDoc.getDonorShards()), - std::move(changeMetadataFunc)); - } else if (participantsToNotify == ParticipantsToNofityEnum::kRecipients) { - // Bump the recipient shard versions for the original namespace along with updating the - // metadata. - ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( - opCtx, - updatedCoordinatorDoc.getTempReshardingNss(), - extractShardIds(updatedCoordinatorDoc.getRecipientShards()), - std::move(changeMetadataFunc)); - } -} } // namespace namespace resharding { @@ -516,25 +413,27 @@ void persistInitialStateAndCatalogUpdates(OperationContext* opCtx, opCtx, coordinatorDoc.getNss(), repl::ReadConcernLevel::kMajorityReadConcern)); const auto collation = originalCollType.value.getDefaultCollation(); - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Insert state doc to config.reshardingOperations. - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { + // Insert state doc to config.reshardingOperations + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + + // Update the config.collections entry for the original collection to include + // 'reshardingFields' + updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); - // Update the config.collections entry for the original collection to include - // 'reshardingFields' - updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); + // Insert the config.collections entry for the temporary resharding collection. The chunks + // all have the same epoch, so picking the last chunk here is arbitrary. + auto chunkVersion = initialChunks.back().getVersion(); + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, chunkVersion, collation, txnNumber); - // Insert the config.collections entry for the temporary resharding collection. The - // chunks all have the same epoch, so picking the last chunk here is arbitrary. - auto chunkVersion = initialChunks.back().getVersion(); - writeToConfigCollectionsForTempNss( - opCtx, coordinatorDoc, chunkVersion, collation, txnNumber); + // Insert new initial chunk and tag documents + insertChunkAndTagDocsForTempNss( + opCtx, std::move(initialChunks), std::move(newZones), txnNumber); - // Insert new initial chunk and tag documents. - insertChunkAndTagDocsForTempNss( - opCtx, std::move(initialChunks), std::move(newZones), txnNumber); - }); + // Commit the transaction + ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); + }); } void persistCommittedState(OperationContext* opCtx, @@ -542,44 +441,39 @@ void persistCommittedState(OperationContext* opCtx, OID newCollectionEpoch, boost::optional<int> expectedNumChunksModified, boost::optional<int> expectedNumZonesModified) { - executeStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Update the config.reshardingOperations entry - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - - // Remove the config.collections entry for the temporary collection - writeToConfigCollectionsForTempNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the config.reshardingOperations entry + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Update the config.collections entry for the original namespace to reflect the new - // shard key, new epoch, and new UUID - updateConfigCollectionsForOriginalNss( - opCtx, coordinatorDoc, newCollectionEpoch, txnNumber); - - // Remove all chunk and tag documents associated with the original collection, then - // update the chunk and tag docs currently associated with the temp nss to be associated - // with the original nss - removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber); - updateChunkAndTagsDocsForTempNss(opCtx, - coordinatorDoc, - newCollectionEpoch, - expectedNumChunksModified, - expectedNumZonesModified, - txnNumber); - }); + // Remove the config.collections entry for the temporary collection + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + + // Update the config.collections entry for the original namespace to reflect the new shard + // key, new epoch, and new UUID + updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch, txnNumber); + + // Remove all chunk and tag documents associated with the original collection, then update + // the chunk and tag docs currently associated with the temp nss to be associated with the + // original nss + removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber); + updateChunkAndTagsDocsForTempNss(opCtx, + coordinatorDoc, + newCollectionEpoch, + expectedNumChunksModified, + expectedNumZonesModified, + txnNumber); + + // Commit the transaction + ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); + }); } -void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( - OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { +void persistStateTransition(OperationContext* opCtx, + const ReshardingCoordinatorDocument& coordinatorDoc) { // Run updates to config.reshardingOperations and config.collections in a transaction auto nextState = coordinatorDoc.getState(); - invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end()); - // TODO SERVER-51800 Remove special casing for kError. - invariant(nextState == CoordinatorStateEnum::kError || - notifyForStateTransition[nextState] != ParticipantsToNofityEnum::kNone); - - // Resharding metadata changes to be executed. - auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) { + withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { // Update the config.reshardingOperations entry writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); @@ -588,36 +482,31 @@ void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( // Update the config.collections entry for the temporary resharding collection. If we've // already committed this operation, we've removed the entry for the temporary - // collection and updated the entry with original namespace to have the new shard key, - // UUID, and epoch + // collection and updated the entry with original namespace to have the new shard key, UUID, + // and epoch if (nextState < CoordinatorStateEnum::kCommitted || nextState == CoordinatorStateEnum::kError) { writeToConfigCollectionsForTempNss( opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); } - }; - - // TODO SERVER-51800 Remove special casing for kError. - if (nextState == CoordinatorStateEnum::kError) { - executeStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, std::move(changeMetadataFunc)); - return; - } - bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, std::move(changeMetadataFunc)); + // Commit the transaction + ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); + }); } void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { - executeStateTransitionAndMetadataChangesInTxn( - opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { - // Remove entry for this resharding operation from config.reshardingOperations - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { + // Remove entry for this resharding operation from config.reshardingOperations + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Remove the resharding fields from the config.collections entry - updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); - }); + // Remove the resharding fields from the config.collections entry + updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); + + // Commit the transaction + ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); + }); } } // namespace resharding @@ -652,19 +541,17 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( .then([this, executor] { return _awaitAllRecipientsFinishedApplying(executor); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) - .then([this](const ReshardingCoordinatorDocument& updatedCoordinatorDoc) { - return _commit(updatedCoordinatorDoc); + .then([this](const ReshardingCoordinatorDocument& updatedStateDoc) { + return _commit(updatedStateDoc); }) .then([this] { if (_coordinatorDoc.getState() > CoordinatorStateEnum::kRenaming) { return; } - _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kRenaming, - _coordinatorDoc); + this->_runUpdates(CoordinatorStateEnum::kRenaming, _coordinatorDoc); return; }) - .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); }) @@ -677,8 +564,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( return status; } - _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kError, - _coordinatorDoc); + _runUpdates(CoordinatorStateEnum::kError, _coordinatorDoc); LOGV2(4956902, "Resharding failed", @@ -776,12 +662,11 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + .then([this](ReshardingCoordinatorDocument updatedStateDoc) { auto highestMinFetchTimestamp = - getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards()); - _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kCloning, - coordinatorDocChangedOnDisk, - highestMinFetchTimestamp); + getHighestMinFetchTimestamp(updatedStateDoc.getDonorShards()); + this->_runUpdates( + CoordinatorStateEnum::kCloning, updatedStateDoc, highestMinFetchTimestamp); }); } @@ -794,9 +679,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { - this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying, - coordinatorDocChangedOnDisk); + .then([this](ReshardingCoordinatorDocument updatedStateDoc) { + this->_runUpdates(CoordinatorStateEnum::kApplying, updatedStateDoc); }); } @@ -809,9 +693,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { - this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kMirroring, - coordinatorDocChangedOnDisk); + .then([this](ReshardingCoordinatorDocument updatedStateDoc) { + this->_runUpdates(CoordinatorStateEnum::kMirroring, updatedStateDoc); }); } @@ -827,12 +710,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrict } Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit( - const ReshardingCoordinatorDocument& coordinatorDoc) { + const ReshardingCoordinatorDocument& updatedDoc) { if (_coordinatorDoc.getState() > CoordinatorStateEnum::kMirroring) { return Status::OK(); } - ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; + ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kCommitted); // Get the number of initial chunks and new zones that we inserted during initialization in @@ -876,9 +759,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsRenamedC return _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { - _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDropping, - coordinatorDocChangedOnDisk); + .then([this](ReshardingCoordinatorDocument updatedStateDoc) { + this->_runUpdates(CoordinatorStateEnum::kDropping, updatedStateDoc); }); } @@ -891,24 +773,22 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsDroppedOrigi return _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { - _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDone, - coordinatorDocChangedOnDisk); + .then([this](ReshardingCoordinatorDocument updatedStateDoc) { + this->_runUpdates(CoordinatorStateEnum::kDone, updatedStateDoc); }); } -void ReshardingCoordinatorService::ReshardingCoordinator:: - _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum nextState, - ReshardingCoordinatorDocument coordinatorDoc, - boost::optional<Timestamp> fetchTimestamp) { +void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates( + CoordinatorStateEnum nextState, + ReshardingCoordinatorDocument updatedStateDoc, + boost::optional<Timestamp> fetchTimestamp) { // Build new state doc for coordinator state update - ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; + ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedStateDoc; updatedCoordinatorDoc.setState(nextState); emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp)); auto opCtx = cc().makeOperationContext(); - resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(), - updatedCoordinatorDoc); + resharding::persistStateTransition(opCtx.get(), updatedCoordinatorDoc); // Update in-memory coordinator doc _coordinatorDoc = updatedCoordinatorDoc; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index adb161a710c..cfcfb5badc8 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -59,8 +59,8 @@ void persistCommittedState(OperationContext* opCtx, boost::optional<int> expectedNumChunksModified, boost::optional<int> expectedNumZonesModified); -void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( - OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc); +void persistStateTransition(OperationContext* opCtx, + const ReshardingCoordinatorDocument& coordinatorDoc); void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc); @@ -201,10 +201,9 @@ private: * Updates the entry for this resharding operation in config.reshardingOperations and the * catalog entries for the original and temporary namespaces in config.collections. */ - void _updateCoordinatorDocStateAndCatalogEntries( - CoordinatorStateEnum nextState, - ReshardingCoordinatorDocument coordinatorDoc, - boost::optional<Timestamp> fetchTimestamp = boost::none); + void _runUpdates(CoordinatorStateEnum nextState, + ReshardingCoordinatorDocument updatedStateDoc, + boost::optional<Timestamp> fetchTimestamp = boost::none); /** * Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the temporary namespace to all diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 0b423a7dbf2..4ee4d193ff1 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -27,8 +27,6 @@ * it in the license file. */ -#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest - #include "mongo/platform/basic.h" #include <boost/optional.hpp> @@ -43,7 +41,6 @@ #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/session_catalog_mongod.h" -#include "mongo/logv2/log.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/unittest/unittest.h" @@ -121,34 +118,6 @@ protected: return collType; } - // Returns the chunk for the donor shard. - ChunkType makeAndInsertChunksForDonorShard(const NamespaceString& nss, - OID epoch, - const ShardKeyPattern& shardKey, - std::vector<OID> ids) { - auto chunks = makeChunks(nss, epoch, shardKey, ids); - - // Only the chunk corresponding to shard0000 is stored as a donor in the coordinator state - // document constructed. - auto donorChunk = chunks[0]; - insertChunkAndZoneEntries({donorChunk}, {}); - return donorChunk; - } - - // Returns the chunk for the recipient shard. - ChunkType makeAndInsertChunksForRecipientShard(const NamespaceString& nss, - OID epoch, - const ShardKeyPattern& shardKey, - std::vector<OID> ids) { - auto chunks = makeChunks(nss, epoch, shardKey, ids); - - // Only the chunk corresponding to shard0001 is stored as a recipient in the coordinator - // state document constructed. - auto recipientChunk = chunks[1]; - insertChunkAndZoneEntries({recipientChunk}, {}); - return recipientChunk; - } - std::vector<ChunkType> makeChunks(const NamespaceString& nss, OID epoch, const ShardKeyPattern& shardKey, @@ -477,8 +446,7 @@ protected: void persistStateTransitionUpdateExpectSuccess( OperationContext* opCtx, ReshardingCoordinatorDocument expectedCoordinatorDoc) { - resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( - opCtx, expectedCoordinatorDoc); + resharding::persistStateTransition(opCtx, expectedCoordinatorDoc); // Check that config.reshardingOperations and config.collections entries are updated // correctly @@ -535,22 +503,6 @@ protected: opCtx, collType, true); } - void assertChunkVersionDidNotIncreaseAfterStateTransition( - const ChunkType& chunk, const ChunkVersion& collectionVersion) { - auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin()); - ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK()); - ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion()); - } - - void assertChunkVersionIncreasedAfterStateTransition(const ChunkType& chunk, - const ChunkVersion& collectionVersion) { - auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin()); - ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK()); - ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion() + 1); - } - NamespaceString _originalNss = NamespaceString("db.foo"); UUID _originalUUID = UUID::gen(); OID _originalEpoch = OID::gen(); @@ -576,13 +528,6 @@ protected: TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) { auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kInitializing); - - // Ensure the chunks for the original namespace exist since they will be bumped as a product of - // the state transition to kPreparingToDonate. - auto donorChunk = makeAndInsertChunksForDonorShard( - _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); - auto collectionVersion = donorChunk.getVersion(); - auto initialChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); auto newZones = makeZones(_tempNss, _newShardKey); @@ -593,47 +538,28 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) { persistInitialStateAndCatalogUpdatesExpectSuccess( operationContext(), expectedCoordinatorDoc, initialChunks, newZones); - - // Confirm the shard version was increased for the donor shard. - auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin()); - ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK()); - ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion() + 1); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistBasicStateTransitionSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch); - // Ensure the chunks for the original namespace exist since they will be bumped as a product of - // the state transition to kPreparingToDonate. - auto donorChunk = makeAndInsertChunksForDonorShard( - _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); - auto collectionVersion = donorChunk.getVersion(); - // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kMirroring); persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(donorChunk, collectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistFetchTimestampStateTransitionSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch); - auto recipientChunk = makeAndInsertChunksForRecipientShard( - _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); - auto collectionVersion = recipientChunk.getVersion(); - // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning); emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1)); - persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(recipientChunk, collectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) { @@ -641,11 +567,8 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries( CoordinatorStateEnum::kMirroring, _originalEpoch, fetchTimestamp); auto initialChunksIds = std::vector{OID::gen(), OID::gen()}; - - auto tempNssChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds); - auto recipientChunk = tempNssChunks[1]; - insertChunkAndZoneEntries(tempNssChunks, makeZones(_tempNss, _newShardKey)); - + insertChunkAndZoneEntries(makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds), + makeZones(_tempNss, _newShardKey)); insertChunkAndZoneEntries( makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}), makeZones(_originalNss, _oldShardKey)); @@ -661,9 +584,6 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) { persistCommittedStateExpectSuccess( operationContext(), expectedCoordinatorDoc, fetchTimestamp, updatedChunks, updatedZones); - - assertChunkVersionDidNotIncreaseAfterStateTransition(recipientChunk, - recipientChunk.getVersion()); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistTransitionToErrorSucceeds) { @@ -694,10 +614,9 @@ TEST_F(ReshardingCoordinatorPersistenceTest, // Do not insert initial entry into config.reshardingOperations. Attempt to update coordinator // state documents. auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kCloning, Timestamp(1, 1)); - ASSERT_THROWS_CODE(resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( - operationContext(), coordinatorDoc), + ASSERT_THROWS_CODE(resharding::persistStateTransition(operationContext(), coordinatorDoc), AssertionException, - 50577); + 5030400); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFails) { @@ -726,7 +645,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFail resharding::persistCommittedState( operationContext(), expectedCoordinatorDoc, _finalEpoch, updatedChunks.size(), 0), AssertionException, - 5030401); + 5030400); } TEST_F(ReshardingCoordinatorPersistenceTest, diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index d7b20524d75..1c8eeab3a90 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -49,7 +49,6 @@ #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -152,6 +151,54 @@ NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourc sourceUuid.toString())); } +BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) { + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setDocuments(docs); + return insertOp; + }()); + + return request; +} + +BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update, + bool upsert, + bool multi) { + BatchedCommandRequest request([&] { + write_ops::Update updateOp(nss); + updateOp.setUpdates({[&] { + write_ops::UpdateOpEntry entry; + entry.setQ(query); + entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); + entry.setUpsert(upsert); + entry.setMulti(multi); + return entry; + }()}); + return updateOp; + }()); + + return request; +} + +BatchedCommandRequest buildDeleteOp(const NamespaceString& nss, + const BSONObj& query, + bool multiDelete) { + BatchedCommandRequest request([&] { + write_ops::Delete deleteOp(nss); + deleteOp.setDeletes({[&] { + write_ops::DeleteOpEntry entry; + entry.setQ(query); + entry.setMulti(multiDelete); + return entry; + }()}); + return deleteOp; + }()); + + return request; +} + void tellShardsToRefresh(OperationContext* opCtx, const std::vector<ShardId>& shardIds, const NamespaceString& nss, diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index bec052dc1e0..33c491a4d23 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -102,6 +102,27 @@ UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkMan NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid); /** + * Constructs a BatchedCommandRequest with batch type 'Insert'. + */ +BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs); + +/** + * Constructs a BatchedCommandRequest with batch type 'Update'. + */ +BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, + const BSONObj& query, + const BSONObj& update, + bool upsert, + bool multi); + +/** + * Constructs a BatchedCommandRequest with batch type 'Delete'. + */ +BatchedCommandRequest buildDeleteOp(const NamespaceString& nss, + const BSONObj& query, + bool multiDelete); + +/** * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the * shards fails to refresh. */ |