author     Blake Oler <blake.oler@mongodb.com>  2020-10-21 20:17:10 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-03 00:43:59 +0000
commit     411dab1fa8060e48e11bf67b396be3ff24b94d3b (patch)
tree       57fdf9ca5f48e77a0dabe792e0d9795d03435db6 /src
parent     dbd149f6c61abb501bac289fd4e6136aa9bbfc36 (diff)
download   mongo-411dab1fa8060e48e11bf67b396be3ff24b94d3b.tar.gz
SERVER-51291 Increment shard version when changing original or temporary resharding collection
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/SConscript                                                                        3
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.cpp                                              93
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h                                                46
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp  221
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp                            122
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp                        69
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp                                   486
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.h                                      11
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_test.cpp                                       93
-rw-r--r--  src/mongo/db/s/resharding_util.cpp                                                              49
-rw-r--r--  src/mongo/db/s/resharding_util.h                                                                21
11 files changed, 845 insertions, 369 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 1a438554288..4aad6f8db57 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -162,7 +162,7 @@ env.Library(
'$BUILD_DIR/mongo/s/common_s',
'$BUILD_DIR/mongo/s/grid',
'sharding_api_d',
-
+ 'sharding_catalog_manager',
],
)
@@ -561,6 +561,7 @@ env.CppUnitTest(
'config/sharding_catalog_manager_add_shard_test.cpp',
'config/sharding_catalog_manager_add_shard_to_zone_test.cpp',
'config/sharding_catalog_manager_assign_key_range_to_zone_test.cpp',
+ 'config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp',
'config/sharding_catalog_manager_clear_jumbo_flag_test.cpp',
'config/sharding_catalog_manager_commit_chunk_migration_test.cpp',
'config/sharding_catalog_manager_config_initialization_test.cpp',
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp
index 8d0b36cac78..ff8e7c04f8d 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp
@@ -35,7 +35,9 @@
#include "mongo/db/auth/authorization_session_impl.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/db/s/balancer/type_migration.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
@@ -86,9 +88,24 @@ OpMsg runCommandInLocalTxn(OperationContext* opCtx,
.response);
}
-BSONObj runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx,
- TxnNumber txnNumber,
- std::string cmdName) {
+void startTransactionWithNoopFind(OperationContext* opCtx,
+ const NamespaceString& nss,
+ TxnNumber txnNumber) {
+ BSONObjBuilder findCmdBuilder;
+ QueryRequest qr(nss);
+ qr.setBatchSize(0);
+ qr.setWantMore(false);
+ qr.asFindCommand(&findCmdBuilder);
+
+ auto res = runCommandInLocalTxn(
+ opCtx, nss.db(), true /*startTransaction*/, txnNumber, findCmdBuilder.done())
+ .body;
+ uassertStatusOK(getStatusFromCommandResult(res));
+}
+
+BSONObj commitOrAbortTransaction(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::string cmdName) {
// Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction
// that have been run on this opCtx would have set the timeout in the locker on the opCtx, but
// commit should not have a lock timeout.
@@ -124,6 +141,24 @@ BSONObj runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx,
return replyOpMsg.body;
}
+// Runs commit for the transaction with 'txnNumber'.
+void commitTransaction(OperationContext* opCtx, TxnNumber txnNumber) {
+ auto response = commitOrAbortTransaction(opCtx, txnNumber, "commitTransaction");
+ uassertStatusOK(getStatusFromCommandResult(response));
+ uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+}
+
+// Runs abort for the transaction with 'txnNumber'.
+void abortTransaction(OperationContext* opCtx, TxnNumber txnNumber) {
+ auto response = commitOrAbortTransaction(opCtx, txnNumber, "abortTransaction");
+
+ auto status = getStatusFromCommandResult(response);
+ if (status.code() != ErrorCodes::NoSuchTransaction) {
+ uassertStatusOK(status);
+ uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+ }
+}
+
} // namespace
void ShardingCatalogManager::create(ServiceContext* serviceContext,
@@ -466,17 +501,21 @@ StatusWith<bool> ShardingCatalogManager::_isShardRequiredByZoneStillInUse(
BSONObj ShardingCatalogManager::writeToConfigDocumentInTxn(OperationContext* opCtx,
const NamespaceString& nss,
const BatchedCommandRequest& request,
- bool startTransaction,
TxnNumber txnNumber) {
invariant(nss.db() == NamespaceString::kConfigDb);
- return runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON())
- .body;
+ auto response = runCommandInLocalTxn(
+ opCtx, nss.db(), false /* startTransaction */, txnNumber, request.toBSON())
+ .body;
+
+ uassertStatusOK(getStatusFromCommandResult(response));
+ uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+
+ return response;
}
void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
const NamespaceString& nss,
std::vector<BSONObj> docs,
- bool startTransaction,
TxnNumber txnNumber) {
invariant(nss.db() == NamespaceString::kConfigDb);
@@ -491,8 +530,7 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
return insertOp;
}());
- uassertStatusOK(getStatusFromWriteCommandReply(
- writeToConfigDocumentInTxn(opCtx, nss, request, startTransaction, txnNumber)));
+ writeToConfigDocumentInTxn(opCtx, nss, request, txnNumber);
};
while (!docs.empty()) {
@@ -520,22 +558,29 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
doBatchInsert();
}
-void ShardingCatalogManager::commitTxnForConfigDocument(OperationContext* opCtx,
- TxnNumber txnNumber) {
- auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "commitTransaction");
- uassertStatusOK(getStatusFromCommandResult(response));
- uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
-}
-
-void ShardingCatalogManager::abortTxnForConfigDocument(OperationContext* opCtx,
- TxnNumber txnNumber) {
- auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "abortTransaction");
+void ShardingCatalogManager::withTransaction(
+ OperationContext* opCtx,
+ const NamespaceString& namespaceForInitialFind,
+ unique_function<void(OperationContext*, TxnNumber)> func) {
+ AlternativeSessionRegion asr(opCtx);
+ AuthorizationSession::get(asr.opCtx()->getClient())
+ ->grantInternalAuthorization(asr.opCtx()->getClient());
+ TxnNumber txnNumber = 0;
+
+ auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] {
+ try {
+ abortTransaction(opCtx, txnNumber);
+ } catch (DBException& e) {
+ LOGV2_WARNING(5192100,
+ "Failed to abort transaction in AlternativeSessionRegion",
+ "error"_attr = redact(e));
+ }
+ });
- auto status = getStatusFromCommandResult(response);
- if (status.code() != ErrorCodes::NoSuchTransaction) {
- uassertStatusOK(status);
- uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
- }
+ startTransactionWithNoopFind(asr.opCtx(), namespaceForInitialFind, txnNumber);
+ func(asr.opCtx(), txnNumber);
+ commitTransaction(asr.opCtx(), txnNumber);
+ guard.dismiss();
}
} // namespace mongo
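
The new ShardingCatalogManager::withTransaction helper above centralizes what callers previously spelled out by hand: the AlternativeSessionRegion setup, the no-op find that opens the transaction, the abort-on-failure guard, and the final commit. A minimal caller-side sketch, assuming 'request' is a BatchedCommandRequest built by the caller (for example via the buildUpdateOp helpers added later in this commit):

    ShardingCatalogManager::withTransaction(
        opCtx,
        NamespaceString::kConfigReshardingOperationsNamespace,
        [&](OperationContext* opCtx, TxnNumber txnNumber) {
            // Every write issued here runs in the same config server transaction and only
            // becomes visible once the lambda returns and withTransaction commits.
            ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
                opCtx, ChunkType::ConfigNS, request, txnNumber);
        });

If the lambda throws, the scope guard aborts the transaction instead of committing it.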
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index a6f97372025..fafcbd62897 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -30,7 +30,9 @@
#pragma once
#include "mongo/base/status_with.h"
+#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/logical_session_cache.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/db/s/config/namespace_serializer.h"
#include "mongo/executor/task_executor.h"
@@ -183,14 +185,28 @@ public:
*/
Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx);
+ //
+ // General utilities related to the ShardingCatalogManager
+ //
+
+ /**
+ * Starts and commits a transaction on the config server, with a no-op find on the specified
+ * namespace in order to internally start the transaction. All writes done inside the
+ * passed-in function must assume that they are run inside a transaction that will be committed
+ * after the function itself has completely finished.
+ */
+ static void withTransaction(OperationContext* opCtx,
+ const NamespaceString& namespaceForInitialFind,
+ unique_function<void(OperationContext*, TxnNumber)> func);
+
/**
* Runs the write 'request' on namespace 'nss' in a transaction with 'txnNumber'. Write must be
- * on a collection in the config database.
+ * on a collection in the config database. Throws if the command response or the write concern
+ * result reports an error.
*/
BSONObj writeToConfigDocumentInTxn(OperationContext* opCtx,
const NamespaceString& nss,
const BatchedCommandRequest& request,
- bool startTransaction,
TxnNumber txnNumber);
/**
@@ -201,19 +217,8 @@ public:
void insertConfigDocumentsInTxn(OperationContext* opCtx,
const NamespaceString& nss,
std::vector<BSONObj> docs,
- bool startTransaction,
TxnNumber txnNumber);
- /**
- * Runs commit for the transaction with 'txnNumber'.
- */
- void commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
-
- /**
- * Runs abort for the transaction with 'txnNumber'.
- */
- void abortTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
-
//
// Chunk Operations
//
@@ -283,6 +288,20 @@ public:
const BSONObj& maxKey,
const ChunkVersion& version);
+ /**
+ * In a single transaction, bumps the shard version of each shard in 'shardIds' to the current
+ * collection version's major version + 1, and runs 'changeMetadataFunc' inside that same
+ * transaction.
+ *
+ * Note: it's the responsibility of the caller to ensure that the list of shards is stable,
+ * as any shards added after the shard ids have been passed in will be missed.
+ */
+ void bumpCollShardVersionsAndChangeMetadataInTxn(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<ShardId>& shardIds,
+ unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc);
+
//
// Database Operations
//
@@ -381,7 +400,6 @@ public:
const NamespaceString& nss,
const CollectionType& coll,
const bool upsert,
- const bool startTransaction,
TxnNumber txnNumber);
/**
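
The bumpCollShardVersionsAndChangeMetadataInTxn declaration above pairs the shard version bumps with an arbitrary metadata change so that both commit atomically. Its caller pattern, mirroring the new unit test added below (the shard ids and the callback body are the caller's own):

    std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
    ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
        opCtx, kNss, shardIds, [&](OperationContext* opCtx, TxnNumber txnNumber) {
            // Config metadata writes that must become visible together with the
            // shard version bumps go here.
        });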
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
new file mode 100644
index 00000000000..7df7df0e6c7
--- /dev/null
+++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
@@ -0,0 +1,221 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache_noop.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/config/config_server_test_fixture.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
+#include "mongo/db/s/transaction_coordinator_service.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+namespace {
+
+const NamespaceString kNss("TestDB", "TestColl");
+const ShardType kShard0("shard0000", "shard0000:1234");
+const ShardType kShard1("shard0001", "shard0001:1234");
+
+class ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest
+ : public ConfigServerTestFixture {
+ void setUp() {
+ ConfigServerTestFixture::setUp();
+ setupShards({kShard0, kShard1});
+
+ // Create config.transactions collection.
+ auto opCtx = operationContext();
+ DBDirectClient client(opCtx);
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ client.createCollection(CollectionType::ConfigNS.ns());
+
+ LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
+ TransactionCoordinatorService::get(operationContext())
+ ->onShardingInitialization(operationContext(), true);
+ }
+
+ void tearDown() {
+ TransactionCoordinatorService::get(operationContext())->onStepDown();
+ ConfigServerTestFixture::tearDown();
+ }
+
+protected:
+ ChunkType generateChunkType(const NamespaceString& nss,
+ const ChunkVersion& chunkVersion,
+ const ShardId& shardId,
+ const BSONObj& minKey,
+ const BSONObj& maxKey) {
+ ChunkType chunkType;
+ chunkType.setName(OID::gen());
+ chunkType.setNS(nss);
+ chunkType.setVersion(chunkVersion);
+ chunkType.setShard(shardId);
+ chunkType.setMin(minKey);
+ chunkType.setMax(maxKey);
+ chunkType.setHistory({ChunkHistory(Timestamp(100, 0), shardId)});
+ return chunkType;
+ }
+
+ /**
+ * Determines if the chunk's version has been bumped to the targetChunkVersion.
+ */
+ bool chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+ const ChunkType& chunkTypeBefore,
+ const StatusWith<ChunkType> swChunkTypeAfter,
+ const ChunkVersion& targetChunkVersion) {
+ ASSERT_OK(swChunkTypeAfter.getStatus());
+ auto chunkTypeAfter = swChunkTypeAfter.getValue();
+
+ // Regardless of whether the major version was bumped, the chunk's other fields should be
+ // unchanged.
+ ASSERT_EQ(chunkTypeBefore.getName(), chunkTypeAfter.getName());
+ ASSERT_EQ(chunkTypeBefore.getNS(), chunkTypeAfter.getNS());
+ ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMin(), chunkTypeAfter.getMin());
+ ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMax(), chunkTypeAfter.getMax());
+ ASSERT(chunkTypeBefore.getHistory() == chunkTypeAfter.getHistory());
+
+ return chunkTypeAfter.getVersion().majorVersion() == targetChunkVersion.majorVersion();
+ }
+
+ /**
+ * If there are multiple chunks per shard, the chunk whose version gets bumped is not
+ * deterministic.
+ *
+ * Asserts that only one chunk per shard has its major version increased.
+ */
+ void assertOnlyOneChunkVersionBumped(OperationContext* opCtx,
+ std::vector<ChunkType> originalChunkTypes,
+ const ChunkVersion& targetChunkVersion) {
+ auto aChunkVersionWasBumped = false;
+ for (auto originalChunkType : originalChunkTypes) {
+ auto swChunkTypeAfter = getChunkDoc(opCtx, originalChunkType.getMin());
+ auto wasBumped = chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+ originalChunkType, swChunkTypeAfter, targetChunkVersion);
+ if (aChunkVersionWasBumped) {
+ ASSERT_FALSE(wasBumped);
+ } else {
+ aChunkVersionWasBumped = wasBumped;
+ }
+ }
+
+ ASSERT_TRUE(aChunkVersionWasBumped);
+ }
+};
+
+TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
+ BumpChunkVersionOneChunkPerShard) {
+ const auto epoch = OID::gen();
+ const auto shard0Chunk0 = generateChunkType(
+ kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
+ const auto shard1Chunk0 = generateChunkType(
+ kNss, ChunkVersion(11, 2, epoch), kShard1.getName(), BSON("a" << 11), BSON("a" << 20));
+
+ const auto collectionVersion = shard1Chunk0.getVersion();
+ ChunkVersion targetChunkVersion(
+ collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
+
+ setupChunks({shard0Chunk0, shard1Chunk0});
+
+ auto opCtx = operationContext();
+
+ std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
+ ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+ opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+
+ ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+ shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion));
+
+ ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+ shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion));
+}
+
+TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
+ BumpChunkVersionTwoChunksOnOneShard) {
+ const auto epoch = OID::gen();
+ const auto shard0Chunk0 = generateChunkType(
+ kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
+ const auto shard0Chunk1 = generateChunkType(
+ kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20));
+ const auto shard1Chunk0 = generateChunkType(
+ kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100));
+
+ const auto collectionVersion = shard0Chunk1.getVersion();
+ ChunkVersion targetChunkVersion(
+ collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
+
+ setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0});
+
+ auto opCtx = operationContext();
+ std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
+ ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+ opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+
+ assertOnlyOneChunkVersionBumped(
+ operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
+
+ ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
+ shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion));
+}
+
+TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
+ BumpChunkVersionTwoChunksOnTwoShards) {
+ const auto epoch = OID::gen();
+ const auto shard0Chunk0 = generateChunkType(
+ kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
+ const auto shard0Chunk1 = generateChunkType(
+ kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20));
+ const auto shard1Chunk0 = generateChunkType(
+ kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100));
+ const auto shard1Chunk1 = generateChunkType(
+ kNss, ChunkVersion(12, 1, epoch), kShard1.getName(), BSON("a" << 101), BSON("a" << 200));
+
+ const auto collectionVersion = shard1Chunk1.getVersion();
+ ChunkVersion targetChunkVersion(
+ collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
+
+ setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1});
+
+ auto opCtx = operationContext();
+ std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
+ ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+ opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
+
+ assertOnlyOneChunkVersionBumped(
+ operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
+
+ assertOnlyOneChunkVersionBumped(
+ operationContext(), {shard1Chunk0, shard1Chunk1}, targetChunkVersion);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index be40c6a5ed6..d1f09247cdb 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -40,6 +40,7 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_cache.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -55,6 +56,7 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/str.h"
@@ -67,6 +69,27 @@ MONGO_FAIL_POINT_DEFINE(skipExpiringOldChunkHistory);
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
+BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update,
+ bool upsert,
+ bool multi) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(query);
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
+ entry.setUpsert(upsert);
+ entry.setMulti(multi);
+ return entry;
+ }()});
+ return updateOp;
+ }());
+
+ return request;
+}
+
/**
* Append min, max and version information from chunk to the buffer for logChange purposes.
*/
@@ -300,13 +323,9 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
return ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod());
}
-// Helper function to get collection version and donor shard version following a merge/move/split
-BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ShardId& fromShard) {
- BSONObjBuilder result;
-
- auto swCollectionVersion = getMaxChunkVersionFromQueryResponse(
+// Helper function to get the collection version for nss. Always uses kLocalReadConcern.
+StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const NamespaceString& nss) {
+ return getMaxChunkVersionFromQueryResponse(
nss,
Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
opCtx,
@@ -316,7 +335,15 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
BSON("ns" << nss.ns()), // Query all chunks for this namespace.
BSON(ChunkType::lastmod << -1), // Sort by version.
1)); // Limit 1.
+}
+
+// Helper function to get collection version and donor shard version following a merge/move/split
+BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardId& fromShard) {
+ BSONObjBuilder result;
+ auto swCollectionVersion = getCollectionVersion(opCtx, nss);
auto collectionVersion = uassertStatusOKWithContext(
std::move(swCollectionVersion), "Couldn't retrieve collection version from config server");
@@ -359,6 +386,47 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
return result.obj();
}
+void bumpMajorVersionOneChunkPerShard(OperationContext* opCtx,
+ const NamespaceString& nss,
+ TxnNumber txnNumber,
+ const std::vector<ShardId>& shardIds) {
+ auto curCollectionVersion = uassertStatusOK(getCollectionVersion(opCtx, nss));
+ ChunkVersion targetChunkVersion(
+ curCollectionVersion.majorVersion() + 1, 0, curCollectionVersion.epoch());
+
+ for (const auto& shardId : shardIds) {
+ BSONObjBuilder updateBuilder;
+ BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set"));
+ targetChunkVersion.appendLegacyWithField(&updateVersionClause, ChunkType::lastmod());
+ updateVersionClause.doneFast();
+ auto chunkUpdate = updateBuilder.obj();
+
+ auto request = buildUpdateOp(
+ ChunkType::ConfigNS,
+ BSON(ChunkType::ns(nss.ns()) << ChunkType::shard(shardId.toString())), // query
+ chunkUpdate, // update
+ false, // upsert
+ false // multi
+ );
+
+ auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx, ChunkType::ConfigNS, request, txnNumber);
+
+ auto numDocsExpectedModified = 1;
+ auto numDocsModified = res.getIntField("n");
+
+ uassert(5030400,
+ str::stream() << "Expected to match " << numDocsExpectedModified
+ << " docs, but only matched " << numDocsModified
+ << " for write request " << request.toString(),
+ numDocsExpectedModified == numDocsModified);
+
+ // There exists a constraint that a chunk version must be unique for a given namespace,
+ // so the minor version is incremented for each chunk placed.
+ targetChunkVersion.incMinor();
+ }
+}
+
} // namespace
StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
@@ -375,16 +443,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
// Get the max chunk version for this namespace.
- auto swCollVersion = getMaxChunkVersionFromQueryResponse(
- nss,
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- BSON("ns" << nss.ns()), // Query all chunks for this namespace.
- BSON(ChunkType::lastmod << -1), // Sort by version.
- 1)); // Limit 1.
+ auto swCollVersion = getCollectionVersion(opCtx, nss);
if (!swCollVersion.isOK()) {
return swCollVersion.getStatus().withContext(
@@ -595,17 +654,7 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge(
}
// Get the max chunk version for this namespace.
- auto swCollVersion = getMaxChunkVersionFromQueryResponse(
- nss,
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- repl::ReadConcernLevel::kLocalReadConcern,
- ChunkType::ConfigNS,
- BSON("ns" << nss.ns()), // Query all chunks for this namespace.
- BSON(ChunkType::lastmod << -1), // Sort by version.
- 1)); // Limit 1.
-
+ auto swCollVersion = getCollectionVersion(opCtx, nss);
if (!swCollVersion.isOK()) {
return swCollVersion.getStatus().withContext(str::stream()
<< "mergeChunk cannot merge chunks.");
@@ -1182,4 +1231,21 @@ void ShardingCatalogManager::ensureChunkVersionIsGreaterThan(OperationContext* o
}
}
+void ShardingCatalogManager::bumpCollShardVersionsAndChangeMetadataInTxn(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<ShardId>& shardIds,
+ unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
+
+ // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
+ // migrations
+ Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
+ withTransaction(opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, shardIds);
+ changeMetadataFunc(opCtx, txnNumber);
+ });
+}
+
} // namespace mongo
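
A worked example of bumpMajorVersionOneChunkPerShard, using the values from the BumpChunkVersionOneChunkPerShard test above: with chunks at version 10|1 on shard0000 and 11|2 on shard0001, the collection version is 11|2, so the target version becomes 12|0. The loop then rewrites one chunk per shard, roughly:

    shard0000: one chunk's lastmod is set to 12|0, then targetChunkVersion.incMinor()
    shard0001: one chunk's lastmod is set to 12|1

Each shard therefore sees its shard version's major component increase, while every chunk version in the namespace stays unique, which is why the minor component is incremented after each shard. The epoch is left unchanged.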
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index 50470cf2a80..a216412912c 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -141,7 +141,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
const BSONObj& query,
const BSONObj& update,
bool upsert,
- bool useMultiUpdate) {
+ bool multi) {
BatchedCommandRequest request([&] {
write_ops::Update updateOp(nss);
updateOp.setUpdates({[&] {
@@ -149,7 +149,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
entry.setQ(query);
entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
entry.setUpsert(upsert);
- entry.setMulti(useMultiUpdate);
+ entry.setMulti(multi);
return entry;
}()});
return updateOp;
@@ -637,15 +637,10 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
collType.setEpoch(newEpoch);
collType.setKeyPattern(newShardKeyPattern.getKeyPattern());
- {
+ auto updateCollectionAndChunksFn = [&](OperationContext* opCtx, TxnNumber txnNumber) {
// Update the config.collections entry for the given namespace.
- AlternativeSessionRegion asr(opCtx);
- AuthorizationSession::get(asr.opCtx()->getClient())
- ->grantInternalAuthorization(asr.opCtx()->getClient());
- TxnNumber txnNumber = 0;
-
updateShardingCatalogEntryForCollectionInTxn(
- asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber);
+ opCtx, nss, collType, false /* upsert */, txnNumber);
LOGV2(21933,
"refineCollectionShardKey updated collection entry for {namespace}: took "
@@ -667,17 +662,15 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
// to the newly-generated objectid, (ii) their bounds for each new field in the refined
// key to MinKey (except for the global max chunk where the max bounds are set to
// MaxKey), and unsetting (iii) their jumbo field.
- uassertStatusOK(getStatusFromWriteCommandReply(
- writeToConfigDocumentInTxn(asr.opCtx(),
- ChunkType::ConfigNS,
- buildPipelineUpdateOp(ChunkType::ConfigNS,
- BSON("ns" << nss.ns()),
- chunkUpdates,
- false, // upsert
- true // useMultiUpdate
- ),
- false, // startTransaction
- txnNumber)));
+ writeToConfigDocumentInTxn(opCtx,
+ ChunkType::ConfigNS,
+ buildPipelineUpdateOp(ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ chunkUpdates,
+ false, // upsert
+ true // useMultiUpdate
+ ),
+ txnNumber);
LOGV2(21935,
"refineCollectionShardKey: updated chunk entries for {namespace}: took "
@@ -691,17 +684,15 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
// Update all config.tags entries for the given namespace by setting their bounds for
// each new field in the refined key to MinKey (except for the global max tag where the
// max bounds are set to MaxKey).
- uassertStatusOK(getStatusFromWriteCommandReply(
- writeToConfigDocumentInTxn(asr.opCtx(),
- TagsType::ConfigNS,
- buildPipelineUpdateOp(TagsType::ConfigNS,
- BSON("ns" << nss.ns()),
- tagUpdates,
- false, // upsert
- true // useMultiUpdate
- ),
- false, // startTransaction
- txnNumber)));
+ writeToConfigDocumentInTxn(opCtx,
+ TagsType::ConfigNS,
+ buildPipelineUpdateOp(TagsType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ tagUpdates,
+ false, // upsert
+ true // useMultiUpdate
+ ),
+ txnNumber);
LOGV2(21936,
@@ -716,10 +707,9 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
LOGV2(21937, "Hit hangRefineCollectionShardKeyBeforeCommit failpoint");
hangRefineCollectionShardKeyBeforeCommit.pauseWhileSet(opCtx);
}
+ };
- // Note this will wait for majority write concern.
- commitTxnForConfigDocument(asr.opCtx(), txnNumber);
- }
+ withTransaction(opCtx, nss, std::move(updateCollectionAndChunksFn));
ShardingLogging::get(opCtx)->logChange(opCtx,
"refineCollectionShardKey.end",
@@ -747,9 +737,8 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
const NamespaceString& nss,
const CollectionType& coll,
const bool upsert,
- const bool startTransaction,
TxnNumber txnNumber) {
- auto status = getStatusFromCommandResult(
+ try {
writeToConfigDocumentInTxn(opCtx,
CollectionType::ConfigNS,
buildUpdateOp(CollectionType::ConfigNS,
@@ -758,9 +747,11 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
upsert,
false /* multi */
),
- startTransaction,
- txnNumber));
- uassertStatusOKWithContext(status, "Collection metadata write failed");
+ txnNumber);
+ } catch (DBException& e) {
+ e.addContext("Collection metadata write failed");
+ throw;
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 08f23a349e2..1bac5597a72 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -50,46 +50,62 @@
namespace mongo {
namespace {
-template <typename Callable>
-void withAlternateSession(OperationContext* opCtx, Callable&& callable) {
- AlternativeSessionRegion asr(opCtx);
- AuthorizationSession::get(asr.opCtx()->getClient())
- ->grantInternalAuthorization(asr.opCtx()->getClient());
- TxnNumber txnNumber = 0;
-
- auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] {
- try {
- ShardingCatalogManager::get(opCtx)->abortTxnForConfigDocument(opCtx, txnNumber);
- } catch (const DBException& ex) {
- LOGV2(5165900,
- "Failed to abort transaction to resharding metadata",
- "error"_attr = redact(ex));
- }
- });
+BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setDocuments(docs);
+ return insertOp;
+ }());
- callable(asr.opCtx(), txnNumber);
+ return request;
+}
- guard.dismiss();
+BatchedCommandRequest buildDeleteOp(const NamespaceString& nss,
+ const BSONObj& query,
+ bool multiDelete) {
+ BatchedCommandRequest request([&] {
+ write_ops::Delete deleteOp(nss);
+ deleteOp.setDeletes({[&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(query);
+ entry.setMulti(multiDelete);
+ return entry;
+ }()});
+ return deleteOp;
+ }());
+
+ return request;
}
-void writeConfigDocs(OperationContext* opCtx,
- const NamespaceString& nss,
- BatchedCommandRequest request,
- bool startTransaction,
- TxnNumber txnNumber,
- boost::optional<int> expectedNumModified) {
- auto response = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx, nss, request, startTransaction, txnNumber);
- uassertStatusOK(getStatusFromCommandResult(response));
+BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
+ const BSONObj& query,
+ const BSONObj& update,
+ bool upsert,
+ bool multi) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setUpdates({[&] {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(query);
+ entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
+ entry.setUpsert(upsert);
+ entry.setMulti(multi);
+ return entry;
+ }()});
+ return updateOp;
+ }());
- if (!expectedNumModified)
- return;
+ return request;
+}
- uassert(5030400,
- str::stream() << "Expected to match " << expectedNumModified
- << " docs, but only matched " << response.getIntField("n")
- << " for write request " << request.toString(),
- response.getIntField("n") == expectedNumModified);
+void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request,
+ const BSONObj& response,
+ int expected) {
+ auto numDocsModified = response.getIntField("n");
+ uassert(5030401,
+ str::stream() << "Expected to match " << expected << " docs, but only matched "
+ << numDocsModified << " for write request " << request.toString(),
+ expected == numDocsModified);
}
void writeToCoordinatorStateNss(OperationContext* opCtx,
@@ -122,12 +138,15 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert)
? boost::none
: boost::make_optional(1);
- writeConfigDocs(opCtx,
- NamespaceString::kConfigReshardingOperationsNamespace,
- std::move(request),
- true, // startTransaction
- txnNumber,
- expectedNumModified);
+ auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ std::move(request),
+ txnNumber);
+
+ if (expectedNumModified) {
+ assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified);
+ }
}
BSONObj createReshardingFieldsUpdateForOriginalNss(
@@ -184,18 +203,18 @@ void updateConfigCollectionsForOriginalNss(OperationContext* opCtx,
auto writeOp =
createReshardingFieldsUpdateForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch);
- writeConfigDocs(
- opCtx,
- CollectionType::ConfigNS,
+ auto request =
buildUpdateOp(CollectionType::ConfigNS,
BSON(CollectionType::kNssFieldName << coordinatorDoc.getNss().ns()), // query
writeOp,
false, // upsert
false // multi
- ),
- false, // startTransaction
- txnNumber,
- 1); // expectedNumModified
+ );
+
+ auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx, CollectionType::ConfigNS, request, txnNumber);
+
+ assertNumDocsModifiedMatchesExpected(request, res, 1 /* expected */);
}
void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
@@ -255,12 +274,13 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert)
? boost::none
: boost::make_optional(1);
- writeConfigDocs(opCtx,
- CollectionType::ConfigNS,
- std::move(request),
- false, // startTransaction
- txnNumber,
- expectedNumModified);
+
+ auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx, CollectionType::ConfigNS, std::move(request), txnNumber);
+
+ if (expectedNumModified) {
+ assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified);
+ }
}
void insertChunkAndTagDocsForTempNss(OperationContext* opCtx,
@@ -274,11 +294,8 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx,
initialChunksBSON.begin(),
[](ChunkType chunk) { return chunk.toConfigBSON(); });
- ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx,
- ChunkType::ConfigNS,
- std::move(initialChunksBSON),
- false, // startTransaction
- txnNumber);
+ ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(
+ opCtx, ChunkType::ConfigNS, std::move(initialChunksBSON), txnNumber);
// Insert tag documents for temp nss
std::vector<BSONObj> zonesBSON(newZones.size());
@@ -286,11 +303,8 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx,
return chunk.toBSON();
});
- ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx,
- TagsType::ConfigNS,
- std::move(zonesBSON),
- false, // startTransaction
- txnNumber);
+ ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(
+ opCtx, TagsType::ConfigNS, std::move(zonesBSON), txnNumber);
}
void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx,
@@ -298,29 +312,25 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx,
TxnNumber txnNumber) {
// Remove all chunk documents for the original nss. We do not know how many chunk docs currently
// exist, so cannot pass a value for expectedNumModified
- writeConfigDocs(opCtx,
- ChunkType::ConfigNS,
- buildDeleteOp(ChunkType::ConfigNS,
- BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
- true // multi
- ),
- false, // startTransaction
- txnNumber,
- boost::none // expectedNumModified
- );
+ ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx,
+ ChunkType::ConfigNS,
+ buildDeleteOp(ChunkType::ConfigNS,
+ BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
+ true // multi
+ ),
+ txnNumber);
// Remove all tag documents for the original nss. We do not know how many tag docs currently
// exist, so cannot pass a value for expectedNumModified
- writeConfigDocs(opCtx,
- TagsType::ConfigNS,
- buildDeleteOp(TagsType::ConfigNS,
- BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
- true // multi
- ),
- false, // startTransaction
- txnNumber,
- boost::none // expectedNumModified
- );
+ ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx,
+ TagsType::ConfigNS,
+ buildDeleteOp(TagsType::ConfigNS,
+ BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
+ true // multi
+ ),
+ txnNumber);
}
void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
@@ -332,34 +342,38 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
// Update all chunk documents that currently have 'ns' as the temporary collection namespace
// such that 'ns' is now the original collection namespace and 'lastmodEpoch' is
// newCollectionEpoch.
- writeConfigDocs(
- opCtx,
- ChunkType::ConfigNS,
+ auto chunksRequest =
buildUpdateOp(ChunkType::ConfigNS,
BSON(ChunkType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query
BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns() << "lastmodEpoch"
<< newCollectionEpoch)), // update
false, // upsert
true // multi
- ),
- false, // startTransaction
- txnNumber,
- expectedNumChunksModified);
+ );
- // Update the 'ns' field to be the original collection namespace for all tags documents that
- // currently have 'ns' as the temporary collection namespace
- writeConfigDocs(
- opCtx,
- TagsType::ConfigNS,
+ auto chunksRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx, ChunkType::ConfigNS, chunksRequest, txnNumber);
+
+ if (expectedNumChunksModified) {
+ assertNumDocsModifiedMatchesExpected(chunksRequest, chunksRes, *expectedNumChunksModified);
+ }
+
+ auto tagsRequest =
buildUpdateOp(TagsType::ConfigNS,
BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query
BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns())), // update
false, // upsert
true // multi
- ),
- false, // startTransaction
- txnNumber,
- expectedNumZonesModified);
+ );
+
+ // Update the 'ns' field to be the original collection namespace for all tags documents that
+ // currently have 'ns' as the temporary collection namespace
+ auto tagsRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx, TagsType::ConfigNS, tagsRequest, txnNumber);
+
+ if (expectedNumZonesModified) {
+ assertNumDocsModifiedMatchesExpected(tagsRequest, tagsRes, *expectedNumZonesModified);
+ }
}
/**
@@ -374,6 +388,95 @@ std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntri
[](auto& shardEntry) { return shardEntry.getId(); });
return shardIds;
}
+
+//
+// Helper methods for ensuring donors/recipients are able to notice when certain state transitions
+// occur.
+//
+// Donors/recipients learn when to transition states by noticing a change in shard versions for one
+// of the two collections involved in the resharding operation.
+//
+// Donors are notified when shard versions spanning the original resharding collection are
+// incremented. Recipients are notified when shard versions spanning the temporary resharding
+// collection are incremented.
+//
+
+/**
+ * Maps which participants are to be notified when the coordinator transitions into a given state.
+ */
+enum class ParticipantsToNofityEnum { kDonors, kRecipients, kNone };
+stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForStateTransition{
+ {CoordinatorStateEnum::kUnused, ParticipantsToNofityEnum::kNone},
+ {CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone},
+ {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors},
+ {CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients},
+ {CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors},
+ {CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone},
+ {CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients},
+ {CoordinatorStateEnum::kDropping, ParticipantsToNofityEnum::kDonors},
+ {CoordinatorStateEnum::kDone, ParticipantsToNofityEnum::kNone},
+ {CoordinatorStateEnum::kError, ParticipantsToNofityEnum::kNone},
+};
+
+/**
+ * Runs resharding metadata changes in a transaction.
+ *
+ * This function should only be called if donor and recipient shards DO NOT need to be informed of
+ * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed,
+ * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn().
+ */
+void executeStateTransitionAndMetadataChangesInTxn(
+ OperationContext* opCtx,
+ const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
+ unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
+ const auto& state = updatedCoordinatorDoc.getState();
+ invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
+ invariant(notifyForStateTransition[state] == ParticipantsToNofityEnum::kNone);
+
+ // Neither donors nor recipients need to be informed of the transition to
+ // updatedCoordinatorDoc's state.
+ ShardingCatalogManager::withTransaction(opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ changeMetadataFunc(opCtx, txnNumber);
+ });
+}
+
+/**
+ * In a single transaction, bumps the shard version for each shard spanning the corresponding
+ * resharding collection and executes changeMetadataFunc.
+ *
+ * This function should only be called if donor or recipient shards need to be informed of the
+ * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be
+ * informed, instead call executeStateTransitionAndMetadataChangesInTxn().
+ */
+void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
+ OperationContext* opCtx,
+ const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
+ unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
+ const auto& state = updatedCoordinatorDoc.getState();
+ invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
+ invariant(notifyForStateTransition[state] != ParticipantsToNofityEnum::kNone);
+
+ auto participantsToNotify = notifyForStateTransition[state];
+ if (participantsToNotify == ParticipantsToNofityEnum::kDonors) {
+ // Bump the donor shard versions for the original namespace along with updating the
+ // metadata.
+ ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+ opCtx,
+ updatedCoordinatorDoc.getNss(),
+ extractShardIds(updatedCoordinatorDoc.getDonorShards()),
+ std::move(changeMetadataFunc));
+ } else if (participantsToNotify == ParticipantsToNofityEnum::kRecipients) {
+ // Bump the recipient shard versions for the temporary resharding namespace along with
+ // updating the metadata.
+ ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
+ opCtx,
+ updatedCoordinatorDoc.getTempReshardingNss(),
+ extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
+ std::move(changeMetadataFunc));
+ }
+}
} // namespace
namespace resharding {
@@ -413,27 +516,25 @@ void persistInitialStateAndCatalogUpdates(OperationContext* opCtx,
opCtx, coordinatorDoc.getNss(), repl::ReadConcernLevel::kMajorityReadConcern));
const auto collation = originalCollType.value.getDefaultCollation();
- withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
- // Insert state doc to config.reshardingOperations
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
-
- // Update the config.collections entry for the original collection to include
- // 'reshardingFields'
- updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
+ bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
+ opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Insert state doc to config.reshardingOperations.
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Insert the config.collections entry for the temporary resharding collection. The chunks
- // all have the same epoch, so picking the last chunk here is arbitrary.
- auto chunkVersion = initialChunks.back().getVersion();
- writeToConfigCollectionsForTempNss(
- opCtx, coordinatorDoc, chunkVersion, collation, txnNumber);
+ // Update the config.collections entry for the original collection to include
+ // 'reshardingFields'
+ updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
- // Insert new initial chunk and tag documents
- insertChunkAndTagDocsForTempNss(
- opCtx, std::move(initialChunks), std::move(newZones), txnNumber);
+ // Insert the config.collections entry for the temporary resharding collection. The
+ // chunks all have the same epoch, so picking the last chunk here is arbitrary.
+ auto chunkVersion = initialChunks.back().getVersion();
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, chunkVersion, collation, txnNumber);
- // Commit the transaction
- ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
- });
+ // Insert new initial chunk and tag documents.
+ insertChunkAndTagDocsForTempNss(
+ opCtx, std::move(initialChunks), std::move(newZones), txnNumber);
+ });
}
void persistCommittedState(OperationContext* opCtx,
@@ -441,39 +542,44 @@ void persistCommittedState(OperationContext* opCtx,
OID newCollectionEpoch,
boost::optional<int> expectedNumChunksModified,
boost::optional<int> expectedNumZonesModified) {
- withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
- // Update the config.reshardingOperations entry
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ executeStateTransitionAndMetadataChangesInTxn(
+ opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the config.reshardingOperations entry
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Remove the config.collections entry for the temporary collection
- writeToConfigCollectionsForTempNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
-
- // Update the config.collections entry for the original namespace to reflect the new shard
- // key, new epoch, and new UUID
- updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch, txnNumber);
-
- // Remove all chunk and tag documents associated with the original collection, then update
- // the chunk and tag docs currently associated with the temp nss to be associated with the
- // original nss
- removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber);
- updateChunkAndTagsDocsForTempNss(opCtx,
- coordinatorDoc,
- newCollectionEpoch,
- expectedNumChunksModified,
- expectedNumZonesModified,
- txnNumber);
-
- // Commit the transaction
- ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
- });
+ // Remove the config.collections entry for the temporary collection
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+
+ // Update the config.collections entry for the original namespace to reflect the new
+ // shard key, new epoch, and new UUID
+ updateConfigCollectionsForOriginalNss(
+ opCtx, coordinatorDoc, newCollectionEpoch, txnNumber);
+
+ // Remove all chunk and tag documents associated with the original collection, then
+ // update the chunk and tag docs currently associated with the temp nss to be associated
+ // with the original nss
+ removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber);
+ updateChunkAndTagsDocsForTempNss(opCtx,
+ coordinatorDoc,
+ newCollectionEpoch,
+ expectedNumChunksModified,
+ expectedNumZonesModified,
+ txnNumber);
+ });
}
-void persistStateTransition(OperationContext* opCtx,
- const ReshardingCoordinatorDocument& coordinatorDoc) {
+void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
+ OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) {
// Run updates to config.reshardingOperations and config.collections in a transaction
auto nextState = coordinatorDoc.getState();
- withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
+ invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end());
+ // TODO SERVER-51800 Remove special casing for kError.
+ invariant(nextState == CoordinatorStateEnum::kError ||
+ notifyForStateTransition[nextState] != ParticipantsToNofityEnum::kNone);
+
+ // Resharding metadata changes to be executed.
+ auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) {
// Update the config.reshardingOperations entry
writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
@@ -482,31 +588,36 @@ void persistStateTransition(OperationContext* opCtx,
// Update the config.collections entry for the temporary resharding collection. If we've
// already committed this operation, we've removed the entry for the temporary
- // collection and updated the entry with original namespace to have the new shard key, UUID,
- // and epoch
+ // collection and updated the entry with original namespace to have the new shard key,
+ // UUID, and epoch
if (nextState < CoordinatorStateEnum::kCommitted ||
nextState == CoordinatorStateEnum::kError) {
writeToConfigCollectionsForTempNss(
opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
}
+ };
- // Commit the transaction
- ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
- });
+ // TODO SERVER-51800 Remove special casing for kError.
+ if (nextState == CoordinatorStateEnum::kError) {
+ executeStateTransitionAndMetadataChangesInTxn(
+ opCtx, coordinatorDoc, std::move(changeMetadataFunc));
+ return;
+ }
+
+ bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
+ opCtx, coordinatorDoc, std::move(changeMetadataFunc));
}
void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc) {
- withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
- // Remove entry for this resharding operation from config.reshardingOperations
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ executeStateTransitionAndMetadataChangesInTxn(
+ opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Remove entry for this resharding operation from config.reshardingOperations
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Remove the resharding fields from the config.collections entry
- updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
-
- // Commit the transaction
- ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
- });
+ // Remove the resharding fields from the config.collections entry
+ updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
+ });
}
} // namespace resharding
@@ -541,17 +652,19 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
.then([this, executor] { return _awaitAllRecipientsFinishedApplying(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
- .then([this](const ReshardingCoordinatorDocument& updatedStateDoc) {
- return _commit(updatedStateDoc);
+ .then([this](const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
+ return _commit(updatedCoordinatorDoc);
})
.then([this] {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kRenaming) {
return;
}
- this->_runUpdates(CoordinatorStateEnum::kRenaming, _coordinatorDoc);
+ _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kRenaming,
+ _coordinatorDoc);
return;
})
+ .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); })
@@ -564,7 +677,8 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
}
- _runUpdates(CoordinatorStateEnum::kError, _coordinatorDoc);
+ _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kError,
+ _coordinatorDoc);
LOGV2(4956902,
"Resharding failed",
@@ -662,11 +776,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
+ .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
auto highestMinFetchTimestamp =
- getHighestMinFetchTimestamp(updatedStateDoc.getDonorShards());
- this->_runUpdates(
- CoordinatorStateEnum::kCloning, updatedStateDoc, highestMinFetchTimestamp);
+ getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards());
+ _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kCloning,
+ coordinatorDocChangedOnDisk,
+ highestMinFetchTimestamp);
});
}
@@ -679,8 +794,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
- this->_runUpdates(CoordinatorStateEnum::kApplying, updatedStateDoc);
+ .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying,
+ coordinatorDocChangedOnDisk);
});
}
@@ -693,8 +809,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
- this->_runUpdates(CoordinatorStateEnum::kMirroring, updatedStateDoc);
+ .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kMirroring,
+ coordinatorDocChangedOnDisk);
});
}
@@ -710,12 +827,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrict
}
Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit(
- const ReshardingCoordinatorDocument& updatedDoc) {
+ const ReshardingCoordinatorDocument& coordinatorDoc) {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kMirroring) {
return Status::OK();
}
- ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedDoc;
+ ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kCommitted);
// Get the number of initial chunks and new zones that we inserted during initialization in
@@ -759,8 +876,9 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsRenamedC
return _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
- this->_runUpdates(CoordinatorStateEnum::kDropping, updatedStateDoc);
+ .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDropping,
+ coordinatorDocChangedOnDisk);
});
}
@@ -773,22 +891,24 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsDroppedOrigi
return _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
- this->_runUpdates(CoordinatorStateEnum::kDone, updatedStateDoc);
+ .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDone,
+ coordinatorDocChangedOnDisk);
});
}
-void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates(
- CoordinatorStateEnum nextState,
- ReshardingCoordinatorDocument updatedStateDoc,
- boost::optional<Timestamp> fetchTimestamp) {
+void ReshardingCoordinatorService::ReshardingCoordinator::
+ _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum nextState,
+ ReshardingCoordinatorDocument coordinatorDoc,
+ boost::optional<Timestamp> fetchTimestamp) {
// Build new state doc for coordinator state update
- ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedStateDoc;
+ ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(nextState);
emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp));
auto opCtx = cc().makeOperationContext();
- resharding::persistStateTransition(opCtx.get(), updatedCoordinatorDoc);
+ resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(),
+ updatedCoordinatorDoc);
// Update in-memory coordinator doc
_coordinatorDoc = updatedCoordinatorDoc;
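[Editorial note, not part of the patch] For readers unfamiliar with what "bumping the shard version" entails here: persistStateTransitionAndCatalogUpdatesThenBumpShardVersions has the config server rewrite one chunk per affected shard so that its version jumps past the current collection version, which makes the next versioned request notice stale routing information and trigger a metadata refresh. The sketch below only illustrates that version arithmetic, assuming the standard ChunkVersion helpers; it is not code from this commit.

// Illustration only: the bump takes the current collection version and produces
// the version written onto one chunk of the shard being bumped:
// major + 1, minor reset to 0, same epoch.
ChunkVersion computeBumpedShardVersion(const ChunkVersion& collectionVersion) {
    ChunkVersion bumped = collectionVersion;
    bumped.incMajor();  // e.g. (3, 7, epoch) -> (4, 0, epoch)
    return bumped;
}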
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index cfcfb5badc8..adb161a710c 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -59,8 +59,8 @@ void persistCommittedState(OperationContext* opCtx,
boost::optional<int> expectedNumChunksModified,
boost::optional<int> expectedNumZonesModified);
-void persistStateTransition(OperationContext* opCtx,
- const ReshardingCoordinatorDocument& coordinatorDoc);
+void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
+ OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc);
void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc);
@@ -201,9 +201,10 @@ private:
* Updates the entry for this resharding operation in config.reshardingOperations and the
* catalog entries for the original and temporary namespaces in config.collections.
*/
- void _runUpdates(CoordinatorStateEnum nextState,
- ReshardingCoordinatorDocument updatedStateDoc,
- boost::optional<Timestamp> fetchTimestamp = boost::none);
+ void _updateCoordinatorDocStateAndCatalogEntries(
+ CoordinatorStateEnum nextState,
+ ReshardingCoordinatorDocument coordinatorDoc,
+ boost::optional<Timestamp> fetchTimestamp = boost::none);
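[Editorial note, not part of the patch] As the call sites earlier in this diff show, the optional fetchTimestamp argument is only supplied for the transition into kCloning; every other transition omits it. A condensed usage sketch, taken from those call sites:

// Illustrative usage, condensed from the coordinator code in this diff:
_updateCoordinatorDocStateAndCatalogEntries(
    CoordinatorStateEnum::kCloning, coordinatorDocChangedOnDisk, highestMinFetchTimestamp);
_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying,
                                            coordinatorDocChangedOnDisk);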
/**
* Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the temporary namespace to all
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 4ee4d193ff1..0b423a7dbf2 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
#include "mongo/platform/basic.h"
#include <boost/optional.hpp>
@@ -41,6 +43,7 @@
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/unittest/unittest.h"
@@ -118,6 +121,34 @@ protected:
return collType;
}
+ // Returns the chunk for the donor shard.
+ ChunkType makeAndInsertChunksForDonorShard(const NamespaceString& nss,
+ OID epoch,
+ const ShardKeyPattern& shardKey,
+ std::vector<OID> ids) {
+ auto chunks = makeChunks(nss, epoch, shardKey, ids);
+
+ // Only the chunk corresponding to shard0000 is stored as a donor in the coordinator state
+ // document constructed.
+ auto donorChunk = chunks[0];
+ insertChunkAndZoneEntries({donorChunk}, {});
+ return donorChunk;
+ }
+
+ // Returns the chunk for the recipient shard.
+ ChunkType makeAndInsertChunksForRecipientShard(const NamespaceString& nss,
+ OID epoch,
+ const ShardKeyPattern& shardKey,
+ std::vector<OID> ids) {
+ auto chunks = makeChunks(nss, epoch, shardKey, ids);
+
+ // Only the chunk corresponding to shard0001 is stored as a recipient in the coordinator
+ // state document constructed.
+ auto recipientChunk = chunks[1];
+ insertChunkAndZoneEntries({recipientChunk}, {});
+ return recipientChunk;
+ }
+
std::vector<ChunkType> makeChunks(const NamespaceString& nss,
OID epoch,
const ShardKeyPattern& shardKey,
@@ -446,7 +477,8 @@ protected:
void persistStateTransitionUpdateExpectSuccess(
OperationContext* opCtx, ReshardingCoordinatorDocument expectedCoordinatorDoc) {
- resharding::persistStateTransition(opCtx, expectedCoordinatorDoc);
+ resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
+ opCtx, expectedCoordinatorDoc);
// Check that config.reshardingOperations and config.collections entries are updated
// correctly
@@ -503,6 +535,22 @@ protected:
opCtx, collType, true);
}
+ void assertChunkVersionDidNotIncreaseAfterStateTransition(
+ const ChunkType& chunk, const ChunkVersion& collectionVersion) {
+ auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin());
+ ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK());
+ ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(),
+ collectionVersion.majorVersion());
+ }
+
+ void assertChunkVersionIncreasedAfterStateTransition(const ChunkType& chunk,
+ const ChunkVersion& collectionVersion) {
+ auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin());
+ ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK());
+ ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(),
+ collectionVersion.majorVersion() + 1);
+ }
+
NamespaceString _originalNss = NamespaceString("db.foo");
UUID _originalUUID = UUID::gen();
OID _originalEpoch = OID::gen();
@@ -528,6 +576,13 @@ protected:
TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) {
auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kInitializing);
+
+ // Ensure the chunks for the original namespace exist since they will be bumped as a product of
+ // the state transition to kPreparingToDonate.
+ auto donorChunk = makeAndInsertChunksForDonorShard(
+ _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
+ auto collectionVersion = donorChunk.getVersion();
+
auto initialChunks =
makeChunks(_tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
auto newZones = makeZones(_tempNss, _newShardKey);
@@ -538,28 +593,47 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) {
persistInitialStateAndCatalogUpdatesExpectSuccess(
operationContext(), expectedCoordinatorDoc, initialChunks, newZones);
+
+ // Confirm the shard version was increased for the donor shard.
+ auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin());
+ ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK());
+ ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(),
+ collectionVersion.majorVersion() + 1);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistBasicStateTransitionSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch);
+ // Ensure the chunks for the original namespace exist since they will be bumped as a product of
+ // the state transition to kMirroring.
+ auto donorChunk = makeAndInsertChunksForDonorShard(
+ _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
+ auto collectionVersion = donorChunk.getVersion();
+
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kMirroring);
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
+ assertChunkVersionIncreasedAfterStateTransition(donorChunk, collectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistFetchTimestampStateTransitionSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch);
+ auto recipientChunk = makeAndInsertChunksForRecipientShard(
+ _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
+ auto collectionVersion = recipientChunk.getVersion();
+
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning);
emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
+
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
+ assertChunkVersionIncreasedAfterStateTransition(recipientChunk, collectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) {
@@ -567,8 +641,11 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) {
auto coordinatorDoc = insertStateAndCatalogEntries(
CoordinatorStateEnum::kMirroring, _originalEpoch, fetchTimestamp);
auto initialChunksIds = std::vector{OID::gen(), OID::gen()};
- insertChunkAndZoneEntries(makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds),
- makeZones(_tempNss, _newShardKey));
+
+ auto tempNssChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds);
+ auto recipientChunk = tempNssChunks[1];
+ insertChunkAndZoneEntries(tempNssChunks, makeZones(_tempNss, _newShardKey));
+
insertChunkAndZoneEntries(
makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}),
makeZones(_originalNss, _oldShardKey));
@@ -584,6 +661,9 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) {
persistCommittedStateExpectSuccess(
operationContext(), expectedCoordinatorDoc, fetchTimestamp, updatedChunks, updatedZones);
+
+ assertChunkVersionDidNotIncreaseAfterStateTransition(recipientChunk,
+ recipientChunk.getVersion());
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistTransitionToErrorSucceeds) {
@@ -614,9 +694,10 @@ TEST_F(ReshardingCoordinatorPersistenceTest,
// Do not insert initial entry into config.reshardingOperations. Attempt to update coordinator
// state documents.
auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kCloning, Timestamp(1, 1));
- ASSERT_THROWS_CODE(resharding::persistStateTransition(operationContext(), coordinatorDoc),
+ ASSERT_THROWS_CODE(resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
+ operationContext(), coordinatorDoc),
AssertionException,
- 5030400);
+ 50577);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFails) {
@@ -645,7 +726,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFail
resharding::persistCommittedState(
operationContext(), expectedCoordinatorDoc, _finalEpoch, updatedChunks.size(), 0),
AssertionException,
- 5030400);
+ 5030401);
}
TEST_F(ReshardingCoordinatorPersistenceTest,
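[Editorial note, not part of the patch] The new persistence tests above share one structure: seed a chunk for the shard expected to be bumped, capture its version, drive the state transition, then assert the on-disk chunk's major version moved. A condensed sketch of that pattern, reusing the helpers added in this file (abbreviated, for illustration only):

// Condensed illustration of the test pattern added in this file (not a new test).
auto donorChunk = makeAndInsertChunksForDonorShard(
    _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
auto versionBeforeTransition = donorChunk.getVersion();

auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kMirroring);
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);

// The donor shard's version must have been bumped alongside the state transition.
assertChunkVersionIncreasedAfterStateTransition(donorChunk, versionBeforeTransition);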
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 1c8eeab3a90..d7b20524d75 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -49,6 +49,7 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -151,54 +152,6 @@ NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourc
sourceUuid.toString()));
}
-BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) {
- BatchedCommandRequest request([&] {
- write_ops::Insert insertOp(nss);
- insertOp.setDocuments(docs);
- return insertOp;
- }());
-
- return request;
-}
-
-BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi) {
- BatchedCommandRequest request([&] {
- write_ops::Update updateOp(nss);
- updateOp.setUpdates({[&] {
- write_ops::UpdateOpEntry entry;
- entry.setQ(query);
- entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
- entry.setUpsert(upsert);
- entry.setMulti(multi);
- return entry;
- }()});
- return updateOp;
- }());
-
- return request;
-}
-
-BatchedCommandRequest buildDeleteOp(const NamespaceString& nss,
- const BSONObj& query,
- bool multiDelete) {
- BatchedCommandRequest request([&] {
- write_ops::Delete deleteOp(nss);
- deleteOp.setDeletes({[&] {
- write_ops::DeleteOpEntry entry;
- entry.setQ(query);
- entry.setMulti(multiDelete);
- return entry;
- }()});
- return deleteOp;
- }());
-
- return request;
-}
-
void tellShardsToRefresh(OperationContext* opCtx,
const std::vector<ShardId>& shardIds,
const NamespaceString& nss,
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 33c491a4d23..bec052dc1e0 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -102,27 +102,6 @@ UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkMan
NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid);
/**
- * Constructs a BatchedCommandRequest with batch type 'Insert'.
- */
-BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs);
-
-/**
- * Constructs a BatchedCommandRequest with batch type 'Update'.
- */
-BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi);
-
-/**
- * Constructs a BatchedCommandRequest with batch type 'Delete'.
- */
-BatchedCommandRequest buildDeleteOp(const NamespaceString& nss,
- const BSONObj& query,
- bool multiDelete);
-
-/**
* Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the
* shards fails to refresh.
*/