commit     411dab1fa8060e48e11bf67b396be3ff24b94d3b (patch)
Author:    Blake Oler <blake.oler@mongodb.com>               2020-10-21 20:17:10 +0000
Committer: Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-03 00:43:59 +0000
tree       57fdf9ca5f48e77a0dabe792e0d9795d03435db6 /src
parent     dbd149f6c61abb501bac289fd4e6136aa9bbfc36 (diff)
SERVER-51291 Increment shard version when changing original or temporary resharding collection
Diffstat (limited to 'src')
11 files changed, 845 insertions(+), 369 deletions(-)
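The bulk of this change replaces hand-rolled config-server transaction plumbing with a shared helper. A minimal before/after sketch of the calling convention (the `request` object, `nss`, and the surrounding scaffolding are assumed for illustration; the function names come from the diffs below):

```cpp
// Old convention (removed by this commit): the caller owned the session,
// threaded an explicit startTransaction flag through each write, and
// committed manually.
AlternativeSessionRegion asr(opCtx);
TxnNumber txnNumber = 0;
ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
    asr.opCtx(), nss, request, true /* startTransaction */, txnNumber);
ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(asr.opCtx(), txnNumber);

// New convention: withTransaction opens the transaction with a no-op find,
// runs the callback, commits on success, and aborts via a guard on failure.
ShardingCatalogManager::withTransaction(
    opCtx,
    NamespaceString::kConfigReshardingOperationsNamespace,
    [&](OperationContext* opCtx, TxnNumber txnNumber) {
        ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
            opCtx, nss, request, txnNumber);
    });
```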
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1a438554288..4aad6f8db57 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -162,7 +162,7 @@ env.Library(
         '$BUILD_DIR/mongo/s/common_s',
         '$BUILD_DIR/mongo/s/grid',
         'sharding_api_d',
-
+        'sharding_catalog_manager',
     ],
 )
@@ -561,6 +561,7 @@ env.CppUnitTest(
         'config/sharding_catalog_manager_add_shard_test.cpp',
         'config/sharding_catalog_manager_add_shard_to_zone_test.cpp',
         'config/sharding_catalog_manager_assign_key_range_to_zone_test.cpp',
+        'config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp',
         'config/sharding_catalog_manager_clear_jumbo_flag_test.cpp',
         'config/sharding_catalog_manager_commit_chunk_migration_test.cpp',
         'config/sharding_catalog_manager_config_initialization_test.cpp',
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp
index 8d0b36cac78..ff8e7c04f8d 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp
@@ -35,7 +35,9 @@
 
 #include "mongo/db/auth/authorization_session_impl.h"
 #include "mongo/db/operation_context.h"
+#include "mongo/db/query/query_request.h"
 #include "mongo/db/s/balancer/type_migration.h"
+#include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/catalog/config_server_version.h"
 #include "mongo/s/catalog/sharding_catalog_client.h"
@@ -86,9 +88,24 @@ OpMsg runCommandInLocalTxn(OperationContext* opCtx,
                .response);
 }
 
-BSONObj runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx,
-                                             TxnNumber txnNumber,
-                                             std::string cmdName) {
+void startTransactionWithNoopFind(OperationContext* opCtx,
+                                  const NamespaceString& nss,
+                                  TxnNumber txnNumber) {
+    BSONObjBuilder findCmdBuilder;
+    QueryRequest qr(nss);
+    qr.setBatchSize(0);
+    qr.setWantMore(false);
+    qr.asFindCommand(&findCmdBuilder);
+
+    auto res = runCommandInLocalTxn(
+                   opCtx, nss.db(), true /*startTransaction*/, txnNumber, findCmdBuilder.done())
+                   .body;
+    uassertStatusOK(getStatusFromCommandResult(res));
+}
+
+BSONObj commitOrAbortTransaction(OperationContext* opCtx,
+                                 TxnNumber txnNumber,
+                                 std::string cmdName) {
     // Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction
     // that have been run on this opCtx would have set the timeout in the locker on the opCtx, but
     // commit should not have a lock timeout.
@@ -124,6 +141,24 @@ BSONObj runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx,
     return replyOpMsg.body;
 }
 
+// Runs commit for the transaction with 'txnNumber'.
+void commitTransaction(OperationContext* opCtx, TxnNumber txnNumber) {
+    auto response = commitOrAbortTransaction(opCtx, txnNumber, "commitTransaction");
+    uassertStatusOK(getStatusFromCommandResult(response));
+    uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+}
+
+// Runs abort for the transaction with 'txnNumber'.
+void abortTransaction(OperationContext* opCtx, TxnNumber txnNumber) {
+    auto response = commitOrAbortTransaction(opCtx, txnNumber, "abortTransaction");
+
+    auto status = getStatusFromCommandResult(response);
+    if (status.code() != ErrorCodes::NoSuchTransaction) {
+        uassertStatusOK(status);
+        uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+    }
+}
+
 }  // namespace
 
 void ShardingCatalogManager::create(ServiceContext* serviceContext,
@@ -466,17 +501,21 @@ StatusWith<bool> ShardingCatalogManager::_isShardRequiredByZoneStillInUse(
 BSONObj ShardingCatalogManager::writeToConfigDocumentInTxn(OperationContext* opCtx,
                                                            const NamespaceString& nss,
                                                            const BatchedCommandRequest& request,
-                                                           bool startTransaction,
                                                            TxnNumber txnNumber) {
     invariant(nss.db() == NamespaceString::kConfigDb);
-    return runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON())
-        .body;
+    auto response = runCommandInLocalTxn(
+                        opCtx, nss.db(), false /* startTransaction */, txnNumber, request.toBSON())
+                        .body;
+
+    uassertStatusOK(getStatusFromCommandResult(response));
+    uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+
+    return response;
 }
 
 void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
                                                         const NamespaceString& nss,
                                                         std::vector<BSONObj> docs,
-                                                        bool startTransaction,
                                                         TxnNumber txnNumber) {
     invariant(nss.db() == NamespaceString::kConfigDb);
 
@@ -491,8 +530,7 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
             return insertOp;
         }());
 
-        uassertStatusOK(getStatusFromWriteCommandReply(
-            writeToConfigDocumentInTxn(opCtx, nss, request, startTransaction, txnNumber)));
+        writeToConfigDocumentInTxn(opCtx, nss, request, txnNumber);
     };
 
     while (!docs.empty()) {
@@ -520,22 +558,29 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
     doBatchInsert();
 }
 
-void ShardingCatalogManager::commitTxnForConfigDocument(OperationContext* opCtx,
-                                                        TxnNumber txnNumber) {
-    auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "commitTransaction");
-    uassertStatusOK(getStatusFromCommandResult(response));
-    uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
-}
-
-void ShardingCatalogManager::abortTxnForConfigDocument(OperationContext* opCtx,
-                                                       TxnNumber txnNumber) {
-    auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "abortTransaction");
+void ShardingCatalogManager::withTransaction(
+    OperationContext* opCtx,
+    const NamespaceString& namespaceForInitialFind,
+    unique_function<void(OperationContext*, TxnNumber)> func) {
+    AlternativeSessionRegion asr(opCtx);
+    AuthorizationSession::get(asr.opCtx()->getClient())
+        ->grantInternalAuthorization(asr.opCtx()->getClient());
+    TxnNumber txnNumber = 0;
+
+    auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] {
+        try {
+            abortTransaction(opCtx, txnNumber);
+        } catch (DBException& e) {
+            LOGV2_WARNING(5192100,
+                          "Failed to abort transaction in AlternativeSessionRegion",
+                          "error"_attr = redact(e));
+        }
+    });
 
-    auto status = getStatusFromCommandResult(response);
-    if (status.code() != ErrorCodes::NoSuchTransaction) {
-        uassertStatusOK(status);
-        uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
-    }
+    startTransactionWithNoopFind(asr.opCtx(), namespaceForInitialFind, txnNumber);
+    func(asr.opCtx(), txnNumber);
+    commitTransaction(asr.opCtx(), txnNumber);
+    guard.dismiss();
 }
 
 }  // namespace mongo
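Note how the transaction is opened in the .cpp change above: there is no explicit "begin" command; instead, a find that fetches nothing carries the startTransaction flag. Roughly the command that startTransactionWithNoopFind builds (the session and transaction fields are appended by runCommandInLocalTxn; the exact field set shown is an assumption):

```cpp
// Approximate shape of the transaction-opening command. batchSize: 0 plus
// singleBatch: true (from setWantMore(false)) make the find a no-op.
BSONObj noopFind = BSON("find" << nss.coll() << "batchSize" << 0 << "singleBatch" << true);
// runCommandInLocalTxn is then assumed to attach, among others:
//   lsid: <session id>, txnNumber: <n>, startTransaction: true, autocommit: false
```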
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index a6f97372025..fafcbd62897 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -30,7 +30,9 @@
 #pragma once
 
 #include "mongo/base/status_with.h"
+#include "mongo/db/auth/authorization_session.h"
 #include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/logical_session_cache.h"
 #include "mongo/db/repl/optime_with.h"
 #include "mongo/db/s/config/namespace_serializer.h"
 #include "mongo/executor/task_executor.h"
@@ -183,14 +185,28 @@ public:
      */
     Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx);
 
+    //
+    // General utilities related to the ShardingCatalogManager
+    //
+
+    /**
+     * Starts and commits a transaction on the config server, with a no-op find on the specified
+     * namespace in order to internally start the transaction. All writes done inside the
+     * passed-in function must assume that they are run inside a transaction that will be
+     * committed after the function itself has completely finished.
+     */
+    static void withTransaction(OperationContext* opCtx,
+                                const NamespaceString& namespaceForInitialFind,
+                                unique_function<void(OperationContext*, TxnNumber)> func);
+
     /**
      * Runs the write 'request' on namespace 'nss' in a transaction with 'txnNumber'. Write must be
-     * on a collection in the config database.
+     * on a collection in the config database. Throws if the write fails.
      */
     BSONObj writeToConfigDocumentInTxn(OperationContext* opCtx,
                                        const NamespaceString& nss,
                                        const BatchedCommandRequest& request,
-                                       bool startTransaction,
                                        TxnNumber txnNumber);
 
     /**
@@ -201,19 +217,8 @@ public:
     void insertConfigDocumentsInTxn(OperationContext* opCtx,
                                     const NamespaceString& nss,
                                     std::vector<BSONObj> docs,
-                                    bool startTransaction,
                                     TxnNumber txnNumber);
 
-    /**
-     * Runs commit for the transaction with 'txnNumber'.
-     */
-    void commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
-
-    /**
-     * Runs abort for the transaction with 'txnNumber'.
-     */
-    void abortTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
-
     //
     // Chunk Operations
     //
@@ -283,6 +288,20 @@ public:
                                           const BSONObj& maxKey,
                                           const ChunkVersion& version);
 
+    /**
+     * Inside a single transaction, bumps the shard version for each shard in the collection to
+     * the current collection version's major version + 1, then runs the caller's metadata change.
+     *
+     * Note: it's the responsibility of the caller to ensure that the list of shards is stable,
+     * as any shards added after the shard ids have been passed in will be missed.
+     */
+    void bumpCollShardVersionsAndChangeMetadataInTxn(
+        OperationContext* opCtx,
+        const NamespaceString& nss,
+        const std::vector<ShardId>& shardIds,
+        unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc);
+
     //
     // Database Operations
     //
@@ -381,7 +400,6 @@ public:
                                                         const NamespaceString& nss,
                                                         const CollectionType& coll,
                                                         const bool upsert,
-                                                        const bool startTransaction,
                                                         TxnNumber txnNumber);
 
     /**
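With the declarations above in place, a hypothetical call site might look as follows (the shard list and the metadata write inside the lambda are invented for illustration; only the API shape is taken from this header):

```cpp
// Bump each listed shard's version for 'nss' and run a metadata change in the
// same config-server transaction. 'stateUpdateRequest' is a hypothetical
// pre-built BatchedCommandRequest.
std::vector<ShardId> shardIds{ShardId("shard0000"), ShardId("shard0001")};
ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
    opCtx, nss, shardIds, [&](OperationContext* opCtx, TxnNumber txnNumber) {
        // This write joins the same transaction as the shard version bumps and
        // commits (or aborts) together with them.
        ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
            opCtx,
            NamespaceString::kConfigReshardingOperationsNamespace,
            stateUpdateRequest,
            txnNumber);
    });
```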
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
new file mode 100644
index 00000000000..7df7df0e6c7
--- /dev/null
+++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
@@ -0,0 +1,221 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/config/config_server_test_fixture.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+namespace {
+
+const NamespaceString kNss("TestDB", "TestColl");
+const ShardType kShard0("shard0000", "shard0000:1234");
+const ShardType kShard1("shard0001", "shard0001:1234");
+
+class ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest
+    : public ConfigServerTestFixture {
+    void setUp() {
+        ConfigServerTestFixture::setUp();
+        setupShards({kShard0, kShard1});
+
+        // Create config.transactions collection.
+        auto opCtx = operationContext();
+        DBDirectClient client(opCtx);
+        client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+        client.createCollection(CollectionType::ConfigNS.ns());
+
+        LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+        TransactionCoordinatorService::get(operationContext())
+            ->onShardingInitialization(operationContext(), true);
+    }
+
+    void tearDown() {
+        TransactionCoordinatorService::get(operationContext())->onStepDown();
+        ConfigServerTestFixture::tearDown();
+    }
+
+protected:
+    ChunkType generateChunkType(const NamespaceString& nss,
+                                const ChunkVersion& chunkVersion,
+                                const ShardId& shardId,
+                                const BSONObj& minKey,
+                                const BSONObj& maxKey) {
+        ChunkType chunkType;
+        chunkType.setName(OID::gen());
+        chunkType.setNS(nss);
+        chunkType.setVersion(chunkVersion);
+        chunkType.setShard(shardId);
+        chunkType.setMin(minKey);
+        chunkType.setMax(maxKey);
+        chunkType.setHistory({ChunkHistory(Timestamp(100, 0), shardId)});
+        return chunkType;
+    }
+
+    /**
+     * Determines if the chunk's version has been bumped to the targetChunkVersion.
+     */
+    bool chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+        const ChunkType& chunkTypeBefore,
+        const StatusWith<ChunkType> swChunkTypeAfter,
+        const ChunkVersion& targetChunkVersion) {
+        ASSERT_OK(swChunkTypeAfter.getStatus());
+        auto chunkTypeAfter = swChunkTypeAfter.getValue();
+
+        // Regardless of whether the major version was bumped, the chunk's other fields should be
+        // unchanged.
+        ASSERT_EQ(chunkTypeBefore.getName(), chunkTypeAfter.getName());
+        ASSERT_EQ(chunkTypeBefore.getNS(), chunkTypeAfter.getNS());
+        ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMin(), chunkTypeAfter.getMin());
+        ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMax(), chunkTypeAfter.getMax());
+        ASSERT(chunkTypeBefore.getHistory() == chunkTypeAfter.getHistory());
+
+        return chunkTypeAfter.getVersion().majorVersion() == targetChunkVersion.majorVersion();
+    }
+
+    /**
+     * If there are multiple chunks per shard, the chunk whose version gets bumped is not
+     * deterministic.
+     *
+     * Asserts that only one chunk per shard has its major version increased.
+     */
+    void assertOnlyOneChunkVersionBumped(OperationContext* opCtx,
+                                         std::vector<ChunkType> originalChunkTypes,
+                                         const ChunkVersion& targetChunkVersion) {
+        auto aChunkVersionWasBumped = false;
+        for (auto originalChunkType : originalChunkTypes) {
+            auto swChunkTypeAfter = getChunkDoc(opCtx, originalChunkType.getMin());
+            auto wasBumped = chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+                originalChunkType, swChunkTypeAfter, targetChunkVersion);
+            if (aChunkVersionWasBumped) {
+                ASSERT_FALSE(wasBumped);
+            } else {
+                aChunkVersionWasBumped = wasBumped;
+            }
+        }
+
+        ASSERT_TRUE(aChunkVersionWasBumped);
+    }
+};
+
+TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
+       BumpChunkVersionOneChunkPerShard) {
+    const auto epoch = OID::gen();
+    const auto shard0Chunk0 = generateChunkType(
+        kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
+    const auto shard1Chunk0 = generateChunkType(
+        kNss, ChunkVersion(11, 2, epoch), kShard1.getName(), BSON("a" << 11), BSON("a" << 20));
+
+    const auto collectionVersion = shard1Chunk0.getVersion();
+    ChunkVersion targetChunkVersion(
+        collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
+
+    setupChunks({shard0Chunk0, shard1Chunk0});
+
+    auto opCtx = operationContext();
+
+    std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
+    ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+        opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+
+    ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+        shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion));
+
+    ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+        shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion));
+}
+
+TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
+       BumpChunkVersionTwoChunksOnOneShard) {
+    const auto epoch = OID::gen();
+    const auto shard0Chunk0 = generateChunkType(
+        kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
+    const auto shard0Chunk1 = generateChunkType(
+        kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20));
+    const auto shard1Chunk0 = generateChunkType(
+        kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100));
+
+    const auto collectionVersion = shard0Chunk1.getVersion();
+    ChunkVersion targetChunkVersion(
+        collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
+
+    setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0});
+
+    auto opCtx = operationContext();
+    std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
+    ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+        opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+
+    assertOnlyOneChunkVersionBumped(
+        operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
+
+    ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+        shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion));
+}
+
+TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
+       BumpChunkVersionTwoChunksOnTwoShards) {
+    const auto epoch = OID::gen();
+    const auto shard0Chunk0 = generateChunkType(
+        kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
+    const auto shard0Chunk1 = generateChunkType(
+        kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20));
+    const auto shard1Chunk0 = generateChunkType(
+        kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100));
+    const auto shard1Chunk1 = generateChunkType(
+        kNss, ChunkVersion(12, 1, epoch), kShard1.getName(), BSON("a" << 101), BSON("a" << 200));
+
+    const auto collectionVersion = shard1Chunk1.getVersion();
+    ChunkVersion targetChunkVersion(
+        collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
+
+    setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1});
+
+    auto opCtx = operationContext();
+    std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
+    ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+        opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+
+    assertOnlyOneChunkVersionBumped(
+        operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
+
+    assertOnlyOneChunkVersionBumped(
+        operationContext(), {shard1Chunk0, shard1Chunk1}, targetChunkVersion);
+}
+
+}  // namespace
+}  // namespace mongo
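The version arithmetic these tests pin down, restated in isolation (ChunkVersion's accessors are used exactly as in the diff; the snippet itself is illustrative):

```cpp
// The collection version is the highest chunk version in the collection,
// e.g. (11, 2) when the chunks carry versions (10, 1) and (11, 2).
ChunkVersion collectionVersion(11, 2, epoch);

// The bump targets major + 1 with the minor component reset to 0.
ChunkVersion targetChunkVersion(
    collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());  // (12, 0)

// Chunk versions must be unique per namespace, so after rewriting one chunk on
// the first shard to (12, 0), the minor component is incremented and the next
// shard's chunk receives (12, 1), and so on.
targetChunkVersion.incMinor();
```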
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index be40c6a5ed6..d1f09247cdb 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -40,6 +40,7 @@
 #include "mongo/client/read_preference.h"
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache.h"
 #include "mongo/db/namespace_string.h"
 #include "mongo/db/operation_context.h"
 #include "mongo/db/repl/repl_client_info.h"
@@ -55,6 +56,7 @@
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/write_ops/batched_command_request.h"
 #include "mongo/util/fail_point.h"
 #include "mongo/util/str.h"
@@ -67,6 +69,27 @@ MONGO_FAIL_POINT_DEFINE(skipExpiringOldChunkHistory);
 
 const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
 
+BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
+                                    const BSONObj& query,
+                                    const BSONObj& update,
+                                    bool upsert,
+                                    bool multi) {
+    BatchedCommandRequest request([&] {
+        write_ops::Update updateOp(nss);
+        updateOp.setUpdates({[&] {
+            write_ops::UpdateOpEntry entry;
+            entry.setQ(query);
+            entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
+            entry.setUpsert(upsert);
+            entry.setMulti(multi);
+            return entry;
+        }()});
+        return updateOp;
+    }());
+
+    return request;
+}
+
 /**
  * Append min, max and version information from chunk to the buffer for logChange purposes.
  */
@@ -300,13 +323,9 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
     return ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod());
 }
 
-// Helper function to get collection version and donor shard version following a merge/move/split
-BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
-                                     const NamespaceString& nss,
-                                     const ShardId& fromShard) {
-    BSONObjBuilder result;
-
-    auto swCollectionVersion = getMaxChunkVersionFromQueryResponse(
+// Helper function to get the collection version for nss. Always uses kLocalReadConcern.
+StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const NamespaceString& nss) {
+    return getMaxChunkVersionFromQueryResponse(
         nss,
         Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
             opCtx,
@@ -316,7 +335,15 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
             BSON("ns" << nss.ns()),          // Query all chunks for this namespace.
             BSON(ChunkType::lastmod << -1),  // Sort by version.
             1));                             // Limit 1.
+}
+
+// Helper function to get collection version and donor shard version following a merge/move/split
+BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
+                                     const NamespaceString& nss,
+                                     const ShardId& fromShard) {
+    BSONObjBuilder result;
 
+    auto swCollectionVersion = getCollectionVersion(opCtx, nss);
     auto collectionVersion = uassertStatusOKWithContext(
         std::move(swCollectionVersion), "Couldn't retrieve collection version from config server");
@@ -359,6 +386,47 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
     return result.obj();
 }
 
+void bumpMajorVersionOneChunkPerShard(OperationContext* opCtx,
+                                      const NamespaceString& nss,
+                                      TxnNumber txnNumber,
+                                      const std::vector<ShardId>& shardIds) {
+    auto curCollectionVersion = uassertStatusOK(getCollectionVersion(opCtx, nss));
+    ChunkVersion targetChunkVersion(
+        curCollectionVersion.majorVersion() + 1, 0, curCollectionVersion.epoch());
+
+    for (const auto& shardId : shardIds) {
+        BSONObjBuilder updateBuilder;
+        BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set"));
+        targetChunkVersion.appendLegacyWithField(&updateVersionClause, ChunkType::lastmod());
+        updateVersionClause.doneFast();
+        auto chunkUpdate = updateBuilder.obj();
+
+        auto request = buildUpdateOp(
+            ChunkType::ConfigNS,
+            BSON(ChunkType::ns(nss.ns()) << ChunkType::shard(shardId.toString())),  // query
+            chunkUpdate,  // update
+            false,        // upsert
+            false         // multi
+        );
+
+        auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+            opCtx, ChunkType::ConfigNS, request, txnNumber);
+
+        auto numDocsExpectedModified = 1;
+        auto numDocsModified = res.getIntField("n");
+
+        uassert(5030400,
+                str::stream() << "Expected to match " << numDocsExpectedModified
+                              << " docs, but only matched " << numDocsModified
+                              << " for write request " << request.toString(),
+                numDocsExpectedModified == numDocsModified);
+
+        // There exists a constraint that a chunk version must be unique for a given namespace,
+        // so the minor version is incremented for each chunk placed.
+        targetChunkVersion.incMinor();
+    }
+}
+
 }  // namespace
 
 StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
@@ -375,16 +443,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
     Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
 
     // Get the max chunk version for this namespace.
-    auto swCollVersion = getMaxChunkVersionFromQueryResponse(
-        nss,
-        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-            opCtx,
-            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-            repl::ReadConcernLevel::kLocalReadConcern,
-            ChunkType::ConfigNS,
-            BSON("ns" << nss.ns()),          // Query all chunks for this namespace.
-            BSON(ChunkType::lastmod << -1),  // Sort by version.
-            1));                             // Limit 1.
+    auto swCollVersion = getCollectionVersion(opCtx, nss);
 
     if (!swCollVersion.isOK()) {
         return swCollVersion.getStatus().withContext(
@@ -595,17 +654,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge(
     }
 
     // Get the max chunk version for this namespace.
-    auto swCollVersion = getMaxChunkVersionFromQueryResponse(
-        nss,
-        Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
-            opCtx,
-            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-            repl::ReadConcernLevel::kLocalReadConcern,
-            ChunkType::ConfigNS,
-            BSON("ns" << nss.ns()),          // Query all chunks for this namespace.
-            BSON(ChunkType::lastmod << -1),  // Sort by version.
-            1));                             // Limit 1.
-
+    auto swCollVersion = getCollectionVersion(opCtx, nss);
     if (!swCollVersion.isOK()) {
         return swCollVersion.getStatus().withContext(str::stream()
                                                      << "mergeChunk cannot merge chunks.");
@@ -1182,4 +1231,21 @@ void ShardingCatalogManager::ensureChunkVersionIsGreaterThan(OperationContext* o
     }
 }
 
+void ShardingCatalogManager::bumpCollShardVersionsAndChangeMetadataInTxn(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const std::vector<ShardId>& shardIds,
+    unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
+
+    // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+    // migrations
+    Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+
+    withTransaction(opCtx,
+                    NamespaceString::kConfigReshardingOperationsNamespace,
+                    [&](OperationContext* opCtx, TxnNumber txnNumber) {
+                        bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, shardIds);
+                        changeMetadataFunc(opCtx, txnNumber);
+                    });
+}
+
 }  // namespace mongo
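To restate what bumpMajorVersionOneChunkPerShard sends per shard: because a shard's version is the highest chunk version it owns, rewriting a single chunk is enough to force that shard to refresh its routing metadata. A sketch of the per-shard write, mirroring the diff's own builder calls (the concrete field values in the comments are illustrative):

```cpp
// Query: any one chunk of this namespace owned by the shard (multi:false),
// e.g. { ns: "db.coll", shard: "shard0000" }.
auto query = BSON(ChunkType::ns(nss.ns()) << ChunkType::shard(shardId.toString()));

// Update: rewrite that chunk's 'lastmod' to the bumped version, i.e.
// { $set: { lastmod: Timestamp(<major + 1>, <minor>) } }.
BSONObjBuilder updateBuilder;
{
    BSONObjBuilder setClause(updateBuilder.subobjStart("$set"));
    targetChunkVersion.appendLegacyWithField(&setClause, ChunkType::lastmod());
}
// The reply's "n" field is then asserted to equal 1 (uassert 5030400), which
// catches shards that unexpectedly own no chunks for the namespace.
```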
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 50470cf2a80..a216412912c 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -141,7 +141,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
                                     const BSONObj& query,
                                     const BSONObj& update,
                                     bool upsert,
-                                    bool useMultiUpdate) {
+                                    bool multi) {
     BatchedCommandRequest request([&] {
         write_ops::Update updateOp(nss);
         updateOp.setUpdates({[&] {
@@ -149,7 +149,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
             entry.setQ(query);
             entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
             entry.setUpsert(upsert);
-            entry.setMulti(useMultiUpdate);
+            entry.setMulti(multi);
             return entry;
         }()});
         return updateOp;
@@ -637,15 +637,10 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
     collType.setEpoch(newEpoch);
     collType.setKeyPattern(newShardKeyPattern.getKeyPattern());
 
-    {
+    auto updateCollectionAndChunksFn = [&](OperationContext* opCtx, TxnNumber txnNumber) {
         // Update the config.collections entry for the given namespace.
-        AlternativeSessionRegion asr(opCtx);
-        AuthorizationSession::get(asr.opCtx()->getClient())
-            ->grantInternalAuthorization(asr.opCtx()->getClient());
-        TxnNumber txnNumber = 0;
-
         updateShardingCatalogEntryForCollectionInTxn(
-            asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber);
+            opCtx, nss, collType, false /* upsert */, txnNumber);
 
         LOGV2(21933,
               "refineCollectionShardKey updated collection entry for {namespace}: took "
@@ -667,17 +662,15 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
         // to the newly-generated objectid, (ii) their bounds for each new field in the refined
         // key to MinKey (except for the global max chunk where the max bounds are set to
         // MaxKey), and unsetting (iii) their jumbo field.
-        uassertStatusOK(getStatusFromWriteCommandReply(
-            writeToConfigDocumentInTxn(asr.opCtx(),
-                                       ChunkType::ConfigNS,
-                                       buildPipelineUpdateOp(ChunkType::ConfigNS,
-                                                             BSON("ns" << nss.ns()),
-                                                             chunkUpdates,
-                                                             false,  // upsert
-                                                             true    // useMultiUpdate
-                                                             ),
-                                       false,  // startTransaction
-                                       txnNumber)));
+        writeToConfigDocumentInTxn(opCtx,
+                                   ChunkType::ConfigNS,
+                                   buildPipelineUpdateOp(ChunkType::ConfigNS,
+                                                         BSON("ns" << nss.ns()),
+                                                         chunkUpdates,
+                                                         false,  // upsert
+                                                         true    // useMultiUpdate
+                                                         ),
+                                   txnNumber);
 
         LOGV2(21935,
               "refineCollectionShardKey: updated chunk entries for {namespace}: took "
@@ -691,17 +684,15 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
         // Update all config.tags entries for the given namespace by setting their bounds for
         // each new field in the refined key to MinKey (except for the global max tag where the
         // max bounds are set to MaxKey).
-        uassertStatusOK(getStatusFromWriteCommandReply(
-            writeToConfigDocumentInTxn(asr.opCtx(),
-                                       TagsType::ConfigNS,
-                                       buildPipelineUpdateOp(TagsType::ConfigNS,
-                                                             BSON("ns" << nss.ns()),
-                                                             tagUpdates,
-                                                             false,  // upsert
-                                                             true    // useMultiUpdate
-                                                             ),
-                                       false,  // startTransaction
-                                       txnNumber)));
+        writeToConfigDocumentInTxn(opCtx,
+                                   TagsType::ConfigNS,
+                                   buildPipelineUpdateOp(TagsType::ConfigNS,
+                                                         BSON("ns" << nss.ns()),
+                                                         tagUpdates,
+                                                         false,  // upsert
+                                                         true    // useMultiUpdate
+                                                         ),
+                                   txnNumber);
 
         LOGV2(21936,
@@ -716,10 +707,9 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
             LOGV2(21937, "Hit hangRefineCollectionShardKeyBeforeCommit failpoint");
             hangRefineCollectionShardKeyBeforeCommit.pauseWhileSet(opCtx);
         }
+    };
 
-        // Note this will wait for majority write concern.
-        commitTxnForConfigDocument(asr.opCtx(), txnNumber);
-    }
+    withTransaction(opCtx, nss, std::move(updateCollectionAndChunksFn));
 
     ShardingLogging::get(opCtx)->logChange(opCtx,
                                            "refineCollectionShardKey.end",
@@ -747,9 +737,8 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
     const NamespaceString& nss,
     const CollectionType& coll,
     const bool upsert,
-    const bool startTransaction,
     TxnNumber txnNumber) {
-    auto status = getStatusFromCommandResult(
+    try {
         writeToConfigDocumentInTxn(opCtx,
                                    CollectionType::ConfigNS,
                                    buildUpdateOp(CollectionType::ConfigNS,
@@ -758,9 +747,11 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
                                                  upsert,
                                                  false /* multi */
                                                  ),
-                                   startTransaction,
-                                   txnNumber));
-    uassertStatusOKWithContext(status, "Collection metadata write failed");
+                                   txnNumber);
+    } catch (DBException& e) {
+        e.addContext("Collection metadata write failed");
+        throw;
+    }
 }
 
 }  // namespace mongo
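One idiom worth noting from the collection-operations file: now that writeToConfigDocumentInTxn throws on failure, callers that previously inspected a Status annotate the exception and rethrow instead. A generic sketch of the pattern (the helper name is hypothetical):

```cpp
try {
    writeSomeConfigDocument(opCtx, txnNumber);  // hypothetical config write that may throw
} catch (DBException& e) {
    // Keep the original error code; prepend the caller's context to the reason.
    e.addContext("Collection metadata write failed");
    throw;
}
```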
match " << expectedNumModified - << " docs, but only matched " << response.getIntField("n") - << " for write request " << request.toString(), - response.getIntField("n") == expectedNumModified); +void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request, + const BSONObj& response, + int expected) { + auto numDocsModified = response.getIntField("n"); + uassert(5030401, + str::stream() << "Expected to match " << expected << " docs, but only matched " + << numDocsModified << " for write request " << request.toString(), + expected == numDocsModified); } void writeToCoordinatorStateNss(OperationContext* opCtx, @@ -122,12 +138,15 @@ void writeToCoordinatorStateNss(OperationContext* opCtx, auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) ? boost::none : boost::make_optional(1); - writeConfigDocs(opCtx, - NamespaceString::kConfigReshardingOperationsNamespace, - std::move(request), - true, // startTransaction - txnNumber, - expectedNumModified); + auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, + NamespaceString::kConfigReshardingOperationsNamespace, + std::move(request), + txnNumber); + + if (expectedNumModified) { + assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified); + } } BSONObj createReshardingFieldsUpdateForOriginalNss( @@ -184,18 +203,18 @@ void updateConfigCollectionsForOriginalNss(OperationContext* opCtx, auto writeOp = createReshardingFieldsUpdateForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch); - writeConfigDocs( - opCtx, - CollectionType::ConfigNS, + auto request = buildUpdateOp(CollectionType::ConfigNS, BSON(CollectionType::kNssFieldName << coordinatorDoc.getNss().ns()), // query writeOp, false, // upsert false // multi - ), - false, // startTransaction - txnNumber, - 1); // expectedNumModified + ); + + auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, CollectionType::ConfigNS, request, txnNumber); + + assertNumDocsModifiedMatchesExpected(request, res, 1 /* expected */); } void writeToConfigCollectionsForTempNss(OperationContext* opCtx, @@ -255,12 +274,13 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx, auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert) ? 
boost::none : boost::make_optional(1); - writeConfigDocs(opCtx, - CollectionType::ConfigNS, - std::move(request), - false, // startTransaction - txnNumber, - expectedNumModified); + + auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, CollectionType::ConfigNS, std::move(request), txnNumber); + + if (expectedNumModified) { + assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified); + } } void insertChunkAndTagDocsForTempNss(OperationContext* opCtx, @@ -274,11 +294,8 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx, initialChunksBSON.begin(), [](ChunkType chunk) { return chunk.toConfigBSON(); }); - ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx, - ChunkType::ConfigNS, - std::move(initialChunksBSON), - false, // startTransaction - txnNumber); + ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn( + opCtx, ChunkType::ConfigNS, std::move(initialChunksBSON), txnNumber); // Insert tag documents for temp nss std::vector<BSONObj> zonesBSON(newZones.size()); @@ -286,11 +303,8 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx, return chunk.toBSON(); }); - ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx, - TagsType::ConfigNS, - std::move(zonesBSON), - false, // startTransaction - txnNumber); + ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn( + opCtx, TagsType::ConfigNS, std::move(zonesBSON), txnNumber); } void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx, @@ -298,29 +312,25 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx, TxnNumber txnNumber) { // Remove all chunk documents for the original nss. We do not know how many chunk docs currently // exist, so cannot pass a value for expectedNumModified - writeConfigDocs(opCtx, - ChunkType::ConfigNS, - buildDeleteOp(ChunkType::ConfigNS, - BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query - true // multi - ), - false, // startTransaction - txnNumber, - boost::none // expectedNumModified - ); + ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, + ChunkType::ConfigNS, + buildDeleteOp(ChunkType::ConfigNS, + BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query + true // multi + ), + txnNumber); // Remove all tag documents for the original nss. We do not know how many tag docs currently // exist, so cannot pass a value for expectedNumModified - writeConfigDocs(opCtx, - TagsType::ConfigNS, - buildDeleteOp(TagsType::ConfigNS, - BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query - true // multi - ), - false, // startTransaction - txnNumber, - boost::none // expectedNumModified - ); + ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, + TagsType::ConfigNS, + buildDeleteOp(TagsType::ConfigNS, + BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query + true // multi + ), + txnNumber); } void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, @@ -332,34 +342,38 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx, // Update all chunk documents that currently have 'ns' as the temporary collection namespace // such that 'ns' is now the original collection namespace and 'lastmodEpoch' is // newCollectionEpoch. 
- writeConfigDocs( - opCtx, - ChunkType::ConfigNS, + auto chunksRequest = buildUpdateOp(ChunkType::ConfigNS, BSON(ChunkType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns() << "lastmodEpoch" << newCollectionEpoch)), // update false, // upsert true // multi - ), - false, // startTransaction - txnNumber, - expectedNumChunksModified); + ); - // Update the 'ns' field to be the original collection namespace for all tags documents that - // currently have 'ns' as the temporary collection namespace - writeConfigDocs( - opCtx, - TagsType::ConfigNS, + auto chunksRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, ChunkType::ConfigNS, chunksRequest, txnNumber); + + if (expectedNumChunksModified) { + assertNumDocsModifiedMatchesExpected(chunksRequest, chunksRes, *expectedNumChunksModified); + } + + auto tagsRequest = buildUpdateOp(TagsType::ConfigNS, BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns())), // update false, // upsert true // multi - ), - false, // startTransaction - txnNumber, - expectedNumZonesModified); + ); + + // Update the 'ns' field to be the original collection namespace for all tags documents that + // currently have 'ns' as the temporary collection namespace + auto tagsRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, TagsType::ConfigNS, tagsRequest, txnNumber); + + if (expectedNumZonesModified) { + assertNumDocsModifiedMatchesExpected(tagsRequest, tagsRes, *expectedNumZonesModified); + } } /** @@ -374,6 +388,95 @@ std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntri [](auto& shardEntry) { return shardEntry.getId(); }); return shardIds; } + +// +// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions +// occur. +// +// Donors/ recipients learn when to transition states by noticing a change in shard versions for one +// of the two collections involved in the resharding operations. +// +// Donors are notified when shard versions spanning the original resharding collection are +// incremented. Recipients are notified when shard versions spanning the temporary resharding +// collection are incremented. +// + +/** + * Maps which participants are to be notified when the coordinator transitions into a given state. + */ +enum class ParticipantsToNofityEnum { kDonors, kRecipients, kNone }; +stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForStateTransition{ + {CoordinatorStateEnum::kUnused, ParticipantsToNofityEnum::kNone}, + {CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone}, + {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors}, + {CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients}, + {CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors}, + {CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone}, + {CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients}, + {CoordinatorStateEnum::kDropping, ParticipantsToNofityEnum::kDonors}, + {CoordinatorStateEnum::kDone, ParticipantsToNofityEnum::kNone}, + {CoordinatorStateEnum::kError, ParticipantsToNofityEnum::kNone}, +}; + +/** + * Runs resharding metadata changes in a transaction. + * + * This function should only be called if donor and recipient shards DO NOT need to be informed of + * the updatedCoordinatorDoc's state transition. 
If donor or recipient shards need to be informed, + * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(). + */ +void executeStateTransitionAndMetadataChangesInTxn( + OperationContext* opCtx, + const ReshardingCoordinatorDocument& updatedCoordinatorDoc, + unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { + const auto& state = updatedCoordinatorDoc.getState(); + invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); + invariant(notifyForStateTransition[state] == ParticipantsToNofityEnum::kNone); + + // Neither donors nor recipients need to be informed of the transition to + // updatedCoordinatorDoc's state. + ShardingCatalogManager::withTransaction(opCtx, + NamespaceString::kConfigReshardingOperationsNamespace, + [&](OperationContext* opCtx, TxnNumber txnNumber) { + changeMetadataFunc(opCtx, txnNumber); + }); +} + +/** + * In a single transaction, bumps the shard version for each shard spanning the corresponding + * resharding collection and executes changeMetadataFunc. + * + * This function should only be called if donor or recipient shards need to be informed of the + * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be + * informed, instead call executeStateTransitionAndMetadataChangesInTxn(). + */ +void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( + OperationContext* opCtx, + const ReshardingCoordinatorDocument& updatedCoordinatorDoc, + unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) { + const auto& state = updatedCoordinatorDoc.getState(); + invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end()); + invariant(notifyForStateTransition[state] != ParticipantsToNofityEnum::kNone); + + auto participantsToNotify = notifyForStateTransition[state]; + if (participantsToNotify == ParticipantsToNofityEnum::kDonors) { + // Bump the donor shard versions for the original namespace along with updating the + // metadata. + ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( + opCtx, + updatedCoordinatorDoc.getNss(), + extractShardIds(updatedCoordinatorDoc.getDonorShards()), + std::move(changeMetadataFunc)); + } else if (participantsToNotify == ParticipantsToNofityEnum::kRecipients) { + // Bump the recipient shard versions for the original namespace along with updating the + // metadata. + ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn( + opCtx, + updatedCoordinatorDoc.getTempReshardingNss(), + extractShardIds(updatedCoordinatorDoc.getRecipientShards()), + std::move(changeMetadataFunc)); + } +} } // namespace namespace resharding { @@ -413,27 +516,25 @@ void persistInitialStateAndCatalogUpdates(OperationContext* opCtx, opCtx, coordinatorDoc.getNss(), repl::ReadConcernLevel::kMajorityReadConcern)); const auto collation = originalCollType.value.getDefaultCollation(); - withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { - // Insert state doc to config.reshardingOperations - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - - // Update the config.collections entry for the original collection to include - // 'reshardingFields' - updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); + bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( + opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Insert state doc to config.reshardingOperations. 
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Insert the config.collections entry for the temporary resharding collection. The chunks - // all have the same epoch, so picking the last chunk here is arbitrary. - auto chunkVersion = initialChunks.back().getVersion(); - writeToConfigCollectionsForTempNss( - opCtx, coordinatorDoc, chunkVersion, collation, txnNumber); + // Update the config.collections entry for the original collection to include + // 'reshardingFields' + updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); - // Insert new initial chunk and tag documents - insertChunkAndTagDocsForTempNss( - opCtx, std::move(initialChunks), std::move(newZones), txnNumber); + // Insert the config.collections entry for the temporary resharding collection. The + // chunks all have the same epoch, so picking the last chunk here is arbitrary. + auto chunkVersion = initialChunks.back().getVersion(); + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, chunkVersion, collation, txnNumber); - // Commit the transaction - ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); - }); + // Insert new initial chunk and tag documents. + insertChunkAndTagDocsForTempNss( + opCtx, std::move(initialChunks), std::move(newZones), txnNumber); + }); } void persistCommittedState(OperationContext* opCtx, @@ -441,39 +542,44 @@ void persistCommittedState(OperationContext* opCtx, OID newCollectionEpoch, boost::optional<int> expectedNumChunksModified, boost::optional<int> expectedNumZonesModified) { - withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { - // Update the config.reshardingOperations entry - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + executeStateTransitionAndMetadataChangesInTxn( + opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Update the config.reshardingOperations entry + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Remove the config.collections entry for the temporary collection - writeToConfigCollectionsForTempNss( - opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); - - // Update the config.collections entry for the original namespace to reflect the new shard - // key, new epoch, and new UUID - updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch, txnNumber); - - // Remove all chunk and tag documents associated with the original collection, then update - // the chunk and tag docs currently associated with the temp nss to be associated with the - // original nss - removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber); - updateChunkAndTagsDocsForTempNss(opCtx, - coordinatorDoc, - newCollectionEpoch, - expectedNumChunksModified, - expectedNumZonesModified, - txnNumber); - - // Commit the transaction - ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); - }); + // Remove the config.collections entry for the temporary collection + writeToConfigCollectionsForTempNss( + opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); + + // Update the config.collections entry for the original namespace to reflect the new + // shard key, new epoch, and new UUID + updateConfigCollectionsForOriginalNss( + opCtx, coordinatorDoc, newCollectionEpoch, txnNumber); + + // Remove all chunk and tag documents associated with the original collection, then + // update the chunk and tag docs currently associated with the temp nss to be associated + // with 
the original nss + removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber); + updateChunkAndTagsDocsForTempNss(opCtx, + coordinatorDoc, + newCollectionEpoch, + expectedNumChunksModified, + expectedNumZonesModified, + txnNumber); + }); } -void persistStateTransition(OperationContext* opCtx, - const ReshardingCoordinatorDocument& coordinatorDoc) { +void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( + OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { // Run updates to config.reshardingOperations and config.collections in a transaction auto nextState = coordinatorDoc.getState(); - withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { + invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end()); + // TODO SERVER-51800 Remove special casing for kError. + invariant(nextState == CoordinatorStateEnum::kError || + notifyForStateTransition[nextState] != ParticipantsToNofityEnum::kNone); + + // Resharding metadata changes to be executed. + auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) { // Update the config.reshardingOperations entry writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); @@ -482,31 +588,36 @@ void persistStateTransition(OperationContext* opCtx, // Update the config.collections entry for the temporary resharding collection. If we've // already committed this operation, we've removed the entry for the temporary - // collection and updated the entry with original namespace to have the new shard key, UUID, - // and epoch + // collection and updated the entry with original namespace to have the new shard key, + // UUID, and epoch if (nextState < CoordinatorStateEnum::kCommitted || nextState == CoordinatorStateEnum::kError) { writeToConfigCollectionsForTempNss( opCtx, coordinatorDoc, boost::none, boost::none, txnNumber); } + }; - // Commit the transaction - ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); - }); + // TODO SERVER-51800 Remove special casing for kError. 
+ if (nextState == CoordinatorStateEnum::kError) { + executeStateTransitionAndMetadataChangesInTxn( + opCtx, coordinatorDoc, std::move(changeMetadataFunc)); + return; + } + + bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn( + opCtx, coordinatorDoc, std::move(changeMetadataFunc)); } void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) { - withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) { - // Remove entry for this resharding operation from config.reshardingOperations - writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); + executeStateTransitionAndMetadataChangesInTxn( + opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) { + // Remove entry for this resharding operation from config.reshardingOperations + writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber); - // Remove the resharding fields from the config.collections entry - updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); - - // Commit the transaction - ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber); - }); + // Remove the resharding fields from the config.collections entry + updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber); + }); } } // namespace resharding @@ -541,17 +652,19 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( .then([this, executor] { return _awaitAllRecipientsFinishedApplying(executor); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) - .then([this](const ReshardingCoordinatorDocument& updatedStateDoc) { - return _commit(updatedStateDoc); + .then([this](const ReshardingCoordinatorDocument& updatedCoordinatorDoc) { + return _commit(updatedCoordinatorDoc); }) .then([this] { if (_coordinatorDoc.getState() > CoordinatorStateEnum::kRenaming) { return; } - this->_runUpdates(CoordinatorStateEnum::kRenaming, _coordinatorDoc); + _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kRenaming, + _coordinatorDoc); return; }) + .then([this, executor] { _tellAllRecipientsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); }) .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); }) @@ -564,7 +677,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( return status; } - _runUpdates(CoordinatorStateEnum::kError, _coordinatorDoc); + _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kError, + _coordinatorDoc); LOGV2(4956902, "Resharding failed", @@ -662,11 +776,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument updatedStateDoc) { + .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { auto highestMinFetchTimestamp = - getHighestMinFetchTimestamp(updatedStateDoc.getDonorShards()); - this->_runUpdates( - CoordinatorStateEnum::kCloning, updatedStateDoc, highestMinFetchTimestamp); + getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards()); + _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kCloning, + coordinatorDocChangedOnDisk, + highestMinFetchTimestamp); }); } 
@@ -679,8 +794,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument updatedStateDoc) { - this->_runUpdates(CoordinatorStateEnum::kApplying, updatedStateDoc); + .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying, + coordinatorDocChangedOnDisk); }); } @@ -693,8 +809,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument updatedStateDoc) { - this->_runUpdates(CoordinatorStateEnum::kMirroring, updatedStateDoc); + .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kMirroring, + coordinatorDocChangedOnDisk); }); } @@ -710,12 +827,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrict } Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit( - const ReshardingCoordinatorDocument& updatedDoc) { + const ReshardingCoordinatorDocument& coordinatorDoc) { if (_coordinatorDoc.getState() > CoordinatorStateEnum::kMirroring) { return Status::OK(); } - ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedDoc; + ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kCommitted); // Get the number of initial chunks and new zones that we inserted during initialization in @@ -759,8 +876,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsRenamedC return _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument updatedStateDoc) { - this->_runUpdates(CoordinatorStateEnum::kDropping, updatedStateDoc); + .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDropping, + coordinatorDocChangedOnDisk); }); } @@ -773,22 +891,24 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsDroppedOrigi return _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection() .thenRunOn(**executor) - .then([this](ReshardingCoordinatorDocument updatedStateDoc) { - this->_runUpdates(CoordinatorStateEnum::kDone, updatedStateDoc); + .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { + _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDone, + coordinatorDocChangedOnDisk); }); } -void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates( - CoordinatorStateEnum nextState, - ReshardingCoordinatorDocument updatedStateDoc, - boost::optional<Timestamp> fetchTimestamp) { +void ReshardingCoordinatorService::ReshardingCoordinator:: + _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum nextState, + ReshardingCoordinatorDocument coordinatorDoc, + boost::optional<Timestamp> fetchTimestamp) { // Build new state doc for coordinator state update - ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedStateDoc; + ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(nextState); emplaceFetchTimestampIfExists(updatedCoordinatorDoc, 
std::move(fetchTimestamp)); auto opCtx = cc().makeOperationContext(); - resharding::persistStateTransition(opCtx.get(), updatedCoordinatorDoc); + resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(), + updatedCoordinatorDoc); // Update in-memory coordinator doc _coordinatorDoc = updatedCoordinatorDoc; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index cfcfb5badc8..adb161a710c 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -59,8 +59,8 @@ void persistCommittedState(OperationContext* opCtx, boost::optional<int> expectedNumChunksModified, boost::optional<int> expectedNumZonesModified); -void persistStateTransition(OperationContext* opCtx, - const ReshardingCoordinatorDocument& coordinatorDoc); +void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( + OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc); void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc); @@ -201,9 +201,10 @@ private: * Updates the entry for this resharding operation in config.reshardingOperations and the * catalog entries for the original and temporary namespaces in config.collections. */ - void _runUpdates(CoordinatorStateEnum nextState, - ReshardingCoordinatorDocument updatedStateDoc, - boost::optional<Timestamp> fetchTimestamp = boost::none); + void _updateCoordinatorDocStateAndCatalogEntries( + CoordinatorStateEnum nextState, + ReshardingCoordinatorDocument coordinatorDoc, + boost::optional<Timestamp> fetchTimestamp = boost::none); /** * Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the temporary namespace to all diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index 4ee4d193ff1..0b423a7dbf2 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -27,6 +27,8 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + #include "mongo/platform/basic.h" #include <boost/optional.hpp> @@ -41,6 +43,7 @@ #include "mongo/db/s/resharding_util.h" #include "mongo/db/s/transaction_coordinator_service.h" #include "mongo/db/session_catalog_mongod.h" +#include "mongo/logv2/log.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/unittest/unittest.h" @@ -118,6 +121,34 @@ protected: return collType; } + // Returns the chunk for the donor shard. + ChunkType makeAndInsertChunksForDonorShard(const NamespaceString& nss, + OID epoch, + const ShardKeyPattern& shardKey, + std::vector<OID> ids) { + auto chunks = makeChunks(nss, epoch, shardKey, ids); + + // Only the chunk corresponding to shard0000 is stored as a donor in the coordinator state + // document constructed. + auto donorChunk = chunks[0]; + insertChunkAndZoneEntries({donorChunk}, {}); + return donorChunk; + } + + // Returns the chunk for the recipient shard. + ChunkType makeAndInsertChunksForRecipientShard(const NamespaceString& nss, + OID epoch, + const ShardKeyPattern& shardKey, + std::vector<OID> ids) { + auto chunks = makeChunks(nss, epoch, shardKey, ids); + + // Only the chunk corresponding to shard0001 is stored as a recipient in the coordinator + // state document constructed. 
+ auto recipientChunk = chunks[1]; + insertChunkAndZoneEntries({recipientChunk}, {}); + return recipientChunk; + } + std::vector<ChunkType> makeChunks(const NamespaceString& nss, OID epoch, const ShardKeyPattern& shardKey, @@ -446,7 +477,8 @@ protected: void persistStateTransitionUpdateExpectSuccess( OperationContext* opCtx, ReshardingCoordinatorDocument expectedCoordinatorDoc) { - resharding::persistStateTransition(opCtx, expectedCoordinatorDoc); + resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( + opCtx, expectedCoordinatorDoc); // Check that config.reshardingOperations and config.collections entries are updated // correctly @@ -503,6 +535,22 @@ protected: opCtx, collType, true); } + void assertChunkVersionDidNotIncreaseAfterStateTransition( + const ChunkType& chunk, const ChunkVersion& collectionVersion) { + auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin()); + ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK()); + ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(), + collectionVersion.majorVersion()); + } + + void assertChunkVersionIncreasedAfterStateTransition(const ChunkType& chunk, + const ChunkVersion& collectionVersion) { + auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin()); + ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK()); + ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(), + collectionVersion.majorVersion() + 1); + } + NamespaceString _originalNss = NamespaceString("db.foo"); UUID _originalUUID = UUID::gen(); OID _originalEpoch = OID::gen(); @@ -528,6 +576,13 @@ protected: TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) { auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kInitializing); + + // Ensure the chunks for the original namespace exist since they will be bumped as a product of + // the state transition to kPreparingToDonate. + auto donorChunk = makeAndInsertChunksForDonorShard( + _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); + auto collectionVersion = donorChunk.getVersion(); + auto initialChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); auto newZones = makeZones(_tempNss, _newShardKey); @@ -538,28 +593,47 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) { persistInitialStateAndCatalogUpdatesExpectSuccess( operationContext(), expectedCoordinatorDoc, initialChunks, newZones); + + // Confirm the shard version was increased for the donor shard. + auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin()); + ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK()); + ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(), + collectionVersion.majorVersion() + 1); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistBasicStateTransitionSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch); + // Ensure the chunks for the original namespace exist since they will be bumped as a product of + // the state transition to kMirroring. 
+ auto donorChunk = makeAndInsertChunksForDonorShard( + _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); + auto collectionVersion = donorChunk.getVersion(); + // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kMirroring); persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); + assertChunkVersionIncreasedAfterStateTransition(donorChunk, collectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistFetchTimestampStateTransitionSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch); + auto recipientChunk = makeAndInsertChunksForRecipientShard( + _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); + auto collectionVersion = recipientChunk.getVersion(); + // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning); emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1)); + persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); + assertChunkVersionIncreasedAfterStateTransition(recipientChunk, collectionVersion); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) { @@ -567,8 +641,11 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) { auto coordinatorDoc = insertStateAndCatalogEntries( CoordinatorStateEnum::kMirroring, _originalEpoch, fetchTimestamp); auto initialChunksIds = std::vector{OID::gen(), OID::gen()}; - insertChunkAndZoneEntries(makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds), - makeZones(_tempNss, _newShardKey)); + + auto tempNssChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds); + auto recipientChunk = tempNssChunks[1]; + insertChunkAndZoneEntries(tempNssChunks, makeZones(_tempNss, _newShardKey)); + insertChunkAndZoneEntries( makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}), makeZones(_originalNss, _oldShardKey)); @@ -584,6 +661,9 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) { persistCommittedStateExpectSuccess( operationContext(), expectedCoordinatorDoc, fetchTimestamp, updatedChunks, updatedZones); + + assertChunkVersionDidNotIncreaseAfterStateTransition(recipientChunk, + recipientChunk.getVersion()); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistTransitionToErrorSucceeds) { @@ -614,9 +694,10 @@ TEST_F(ReshardingCoordinatorPersistenceTest, // Do not insert initial entry into config.reshardingOperations. Attempt to update coordinator // state documents. 
auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kCloning, Timestamp(1, 1)); - ASSERT_THROWS_CODE(resharding::persistStateTransition(operationContext(), coordinatorDoc), + ASSERT_THROWS_CODE(resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( + operationContext(), coordinatorDoc), AssertionException, - 5030400); + 50577); } TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFails) { @@ -645,7 +726,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFail resharding::persistCommittedState( operationContext(), expectedCoordinatorDoc, _finalEpoch, updatedChunks.size(), 0), AssertionException, - 5030400); + 5030401); } TEST_F(ReshardingCoordinatorPersistenceTest, diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 1c8eeab3a90..d7b20524d75 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -49,6 +49,7 @@ #include "mongo/db/pipeline/document_source_sort.h" #include "mongo/db/pipeline/document_source_unwind.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/config/sharding_catalog_manager.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -151,54 +152,6 @@ NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourc sourceUuid.toString())); } -BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) { - BatchedCommandRequest request([&] { - write_ops::Insert insertOp(nss); - insertOp.setDocuments(docs); - return insertOp; - }()); - - return request; -} - -BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi) { - BatchedCommandRequest request([&] { - write_ops::Update updateOp(nss); - updateOp.setUpdates({[&] { - write_ops::UpdateOpEntry entry; - entry.setQ(query); - entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update)); - entry.setUpsert(upsert); - entry.setMulti(multi); - return entry; - }()}); - return updateOp; - }()); - - return request; -} - -BatchedCommandRequest buildDeleteOp(const NamespaceString& nss, - const BSONObj& query, - bool multiDelete) { - BatchedCommandRequest request([&] { - write_ops::Delete deleteOp(nss); - deleteOp.setDeletes({[&] { - write_ops::DeleteOpEntry entry; - entry.setQ(query); - entry.setMulti(multiDelete); - return entry; - }()}); - return deleteOp; - }()); - - return request; -} - void tellShardsToRefresh(OperationContext* opCtx, const std::vector<ShardId>& shardIds, const NamespaceString& nss, diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 33c491a4d23..bec052dc1e0 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -102,27 +102,6 @@ UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkMan NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid); /** - * Constructs a BatchedCommandRequest with batch type 'Insert'. - */ -BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs); - -/** - * Constructs a BatchedCommandRequest with batch type 'Update'. - */ -BatchedCommandRequest buildUpdateOp(const NamespaceString& nss, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi); - -/** - * Constructs a BatchedCommandRequest with batch type 'Delete'. 
- */ -BatchedCommandRequest buildDeleteOp(const NamespaceString& nss, - const BSONObj& query, - bool multiDelete); - -/** * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the * shards fails to refresh. */ 
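[Editor's note] The new test helpers reduce the shard version check to a before/after comparison of a chunk's major version. A self-contained sketch of that pattern follows; this ChunkVersion is a simplified stand-in for the real mongo::ChunkVersion, which also carries a minor version and an epoch.

#include <cassert>
#include <cstdint>

// Simplified stand-in: the real ChunkVersion also tracks a minor
// version and an epoch tying the version to a collection instance.
struct ChunkVersion {
    uint32_t majorVersion;
};

// Bumping the shard version means incrementing the major version of
// the shard's chunk, which forces a routing refresh on next use.
ChunkVersion bumpShardVersion(ChunkVersion v) {
    return ChunkVersion{v.majorVersion + 1};
}

int main() {
    const ChunkVersion before{5};

    // Mirrors assertChunkVersionIncreasedAfterStateTransition: after a
    // non-error state transition, the major version is exactly one higher.
    const ChunkVersion after = bumpShardVersion(before);
    assert(after.majorVersion == before.majorVersion + 1);

    // Mirrors assertChunkVersionDidNotIncreaseAfterStateTransition: the
    // commit path exercised in PersistCommitSucceeds leaves it unchanged.
    const ChunkVersion untouched = before;
    assert(untouched.majorVersion == before.majorVersion);
    return 0;
}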