Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/s/SConscript                                                                   |   3
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.cpp                                          |  93
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager.h                                            |  46
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp | 221
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp                        | 122
-rw-r--r--  src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp                   |  69
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.cpp                               | 486
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_service.h                                 |  11
-rw-r--r--  src/mongo/db/s/resharding/resharding_coordinator_test.cpp                                  |  93
-rw-r--r--  src/mongo/db/s/resharding_util.cpp                                                          |  49
-rw-r--r--  src/mongo/db/s/resharding_util.h                                                            |  21
11 files changed, 369 insertions, 845 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 4aad6f8db57..1a438554288 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -162,7 +162,7 @@ env.Library(
'$BUILD_DIR/mongo/s/common_s',
'$BUILD_DIR/mongo/s/grid',
'sharding_api_d',
- 'sharding_catalog_manager',
+
],
)
@@ -561,7 +561,6 @@ env.CppUnitTest(
'config/sharding_catalog_manager_add_shard_test.cpp',
'config/sharding_catalog_manager_add_shard_to_zone_test.cpp',
'config/sharding_catalog_manager_assign_key_range_to_zone_test.cpp',
- 'config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp',
'config/sharding_catalog_manager_clear_jumbo_flag_test.cpp',
'config/sharding_catalog_manager_commit_chunk_migration_test.cpp',
'config/sharding_catalog_manager_config_initialization_test.cpp',
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp
index ff8e7c04f8d..8d0b36cac78 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp
@@ -35,9 +35,7 @@
#include "mongo/db/auth/authorization_session_impl.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/query/query_request.h"
#include "mongo/db/s/balancer/type_migration.h"
-#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/config_server_version.h"
#include "mongo/s/catalog/sharding_catalog_client.h"
@@ -88,24 +86,9 @@ OpMsg runCommandInLocalTxn(OperationContext* opCtx,
.response);
}
-void startTransactionWithNoopFind(OperationContext* opCtx,
- const NamespaceString& nss,
- TxnNumber txnNumber) {
- BSONObjBuilder findCmdBuilder;
- QueryRequest qr(nss);
- qr.setBatchSize(0);
- qr.setWantMore(false);
- qr.asFindCommand(&findCmdBuilder);
-
- auto res = runCommandInLocalTxn(
- opCtx, nss.db(), true /*startTransaction*/, txnNumber, findCmdBuilder.done())
- .body;
- uassertStatusOK(getStatusFromCommandResult(res));
-}
-
-BSONObj commitOrAbortTransaction(OperationContext* opCtx,
- TxnNumber txnNumber,
- std::string cmdName) {
+BSONObj runCommitOrAbortTxnForConfigDocument(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::string cmdName) {
// Swap out the clients in order to get a fresh opCtx. Previous operations in this transaction
// that have been run on this opCtx would have set the timeout in the locker on the opCtx, but
// commit should not have a lock timeout.
@@ -141,24 +124,6 @@ BSONObj commitOrAbortTransaction(OperationContext* opCtx,
return replyOpMsg.body;
}
-// Runs commit for the transaction with 'txnNumber'.
-void commitTransaction(OperationContext* opCtx, TxnNumber txnNumber) {
- auto response = commitOrAbortTransaction(opCtx, txnNumber, "commitTransaction");
- uassertStatusOK(getStatusFromCommandResult(response));
- uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
-}
-
-// Runs abort for the transaction with 'txnNumber'.
-void abortTransaction(OperationContext* opCtx, TxnNumber txnNumber) {
- auto response = commitOrAbortTransaction(opCtx, txnNumber, "abortTransaction");
-
- auto status = getStatusFromCommandResult(response);
- if (status.code() != ErrorCodes::NoSuchTransaction) {
- uassertStatusOK(status);
- uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
- }
-}
-
} // namespace
void ShardingCatalogManager::create(ServiceContext* serviceContext,
@@ -501,21 +466,17 @@ StatusWith<bool> ShardingCatalogManager::_isShardRequiredByZoneStillInUse(
BSONObj ShardingCatalogManager::writeToConfigDocumentInTxn(OperationContext* opCtx,
const NamespaceString& nss,
const BatchedCommandRequest& request,
+ bool startTransaction,
TxnNumber txnNumber) {
invariant(nss.db() == NamespaceString::kConfigDb);
- auto response = runCommandInLocalTxn(
- opCtx, nss.db(), false /* startTransaction */, txnNumber, request.toBSON())
- .body;
-
- uassertStatusOK(getStatusFromCommandResult(response));
- uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
-
- return response;
+ return runCommandInLocalTxn(opCtx, nss.db(), startTransaction, txnNumber, request.toBSON())
+ .body;
}
void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
const NamespaceString& nss,
std::vector<BSONObj> docs,
+ bool startTransaction,
TxnNumber txnNumber) {
invariant(nss.db() == NamespaceString::kConfigDb);
@@ -530,7 +491,8 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
return insertOp;
}());
- writeToConfigDocumentInTxn(opCtx, nss, request, txnNumber);
+ uassertStatusOK(getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(opCtx, nss, request, startTransaction, txnNumber)));
};
while (!docs.empty()) {
@@ -558,29 +520,22 @@ void ShardingCatalogManager::insertConfigDocumentsInTxn(OperationContext* opCtx,
doBatchInsert();
}
-void ShardingCatalogManager::withTransaction(
- OperationContext* opCtx,
- const NamespaceString& namespaceForInitialFind,
- unique_function<void(OperationContext*, TxnNumber)> func) {
- AlternativeSessionRegion asr(opCtx);
- AuthorizationSession::get(asr.opCtx()->getClient())
- ->grantInternalAuthorization(asr.opCtx()->getClient());
- TxnNumber txnNumber = 0;
-
- auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] {
- try {
- abortTransaction(opCtx, txnNumber);
- } catch (DBException& e) {
- LOGV2_WARNING(5192100,
- "Failed to abort transaction in AlternativeSessionRegion",
- "error"_attr = redact(e));
- }
- });
+void ShardingCatalogManager::commitTxnForConfigDocument(OperationContext* opCtx,
+ TxnNumber txnNumber) {
+ auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "commitTransaction");
+ uassertStatusOK(getStatusFromCommandResult(response));
+ uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+}
+
+void ShardingCatalogManager::abortTxnForConfigDocument(OperationContext* opCtx,
+ TxnNumber txnNumber) {
+ auto response = runCommitOrAbortTxnForConfigDocument(opCtx, txnNumber, "abortTransaction");
- startTransactionWithNoopFind(asr.opCtx(), namespaceForInitialFind, txnNumber);
- func(asr.opCtx(), txnNumber);
- commitTransaction(asr.opCtx(), txnNumber);
- guard.dismiss();
+ auto status = getStatusFromCommandResult(response);
+ if (status.code() != ErrorCodes::NoSuchTransaction) {
+ uassertStatusOK(status);
+ uassertStatusOK(getWriteConcernStatusFromCommandResult(response));
+ }
}
} // namespace mongo
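
With withTransaction() removed, callers now drive the transaction lifecycle themselves: the first statement opens the transaction via the new startTransaction flag, and commit/abort become explicit calls on the manager. A minimal sketch of the resulting pattern, assuming an OperationContext and a prepared BatchedCommandRequest (the scaffolding here is illustrative, not code from this commit):

    // Swap to a fresh session so the txnNumber does not collide with any
    // transaction state on the caller's own session.
    AlternativeSessionRegion asr(opCtx);
    AuthorizationSession::get(asr.opCtx()->getClient())
        ->grantInternalAuthorization(asr.opCtx()->getClient());
    TxnNumber txnNumber = 0;

    auto* scm = ShardingCatalogManager::get(asr.opCtx());

    // The first write opens the transaction. The helper now returns the raw
    // response body, so the caller asserts whichever statuses it cares about.
    auto response = scm->writeToConfigDocumentInTxn(
        asr.opCtx(), CollectionType::ConfigNS, request, true /* startTransaction */, txnNumber);
    uassertStatusOK(getStatusFromWriteCommandReply(response));

    // Explicit commit replaces the implicit commit withTransaction() performed.
    scm->commitTxnForConfigDocument(asr.opCtx(), txnNumber);
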
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.h b/src/mongo/db/s/config/sharding_catalog_manager.h
index fafcbd62897..a6f97372025 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.h
+++ b/src/mongo/db/s/config/sharding_catalog_manager.h
@@ -30,9 +30,7 @@
#pragma once
#include "mongo/base/status_with.h"
-#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/concurrency/d_concurrency.h"
-#include "mongo/db/logical_session_cache.h"
#include "mongo/db/repl/optime_with.h"
#include "mongo/db/s/config/namespace_serializer.h"
#include "mongo/executor/task_executor.h"
@@ -185,28 +183,14 @@ public:
*/
Lock::ExclusiveLock lockZoneMutex(OperationContext* opCtx);
- //
- // General utilities related to the ShardingCatalogManager
- //
-
- /**
- * Starts and commits a transaction on the config server, with a no-op find on the specified
- * namespace in order to internally start the transaction. All writes done inside the
- * passed-in function must assume that they are run inside a transaction that will be committed
- * after the function itself has completely finished.
- */
- static void withTransaction(OperationContext* opCtx,
- const NamespaceString& namespaceForInitialFind,
- unique_function<void(OperationContext*, TxnNumber)> func);
-
/**
* Runs the write 'request' on namespace 'nss' in a transaction with 'txnNumber'. Write must be
- * on a collection in the config database. If expectedNumModified is specified, the number of
- * documents modified must match expectedNumModified - throws otherwise.
+ * on a collection in the config database.
*/
BSONObj writeToConfigDocumentInTxn(OperationContext* opCtx,
const NamespaceString& nss,
const BatchedCommandRequest& request,
+ bool startTransaction,
TxnNumber txnNumber);
/**
@@ -217,8 +201,19 @@ public:
void insertConfigDocumentsInTxn(OperationContext* opCtx,
const NamespaceString& nss,
std::vector<BSONObj> docs,
+ bool startTransaction,
TxnNumber txnNumber);
+ /**
+ * Runs commit for the transaction with 'txnNumber'.
+ */
+ void commitTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
+
+ /**
+ * Runs abort for the transaction with 'txnNumber'.
+ */
+ void abortTxnForConfigDocument(OperationContext* opCtx, TxnNumber txnNumber);
+
//
// Chunk Operations
//
@@ -288,20 +283,6 @@ public:
const BSONObj& maxKey,
const ChunkVersion& version);
- /**
- * In a single transaction, effectively bumps the shard version for each shard in the collection
- * to be the current collection version's major version + 1 inside an already-running
- * transaction.
- *
- * Note: it's the responsibility of the caller to ensure that the list of shards is stable,
- * as any shards added after the shard ids have been passed in will be missed.
- */
- void bumpCollShardVersionsAndChangeMetadataInTxn(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<ShardId>& shardIds,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc);
-
//
// Database Operations
//
@@ -400,6 +381,7 @@ public:
const NamespaceString& nss,
const CollectionType& coll,
const bool upsert,
+ const bool startTransaction,
TxnNumber txnNumber);
/**
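
The header changes make the transaction boundary part of the write/insert API surface itself. A usage sketch for batched config inserts under the new signature (the document contents and txnNumber setup are hypothetical):

    std::vector<BSONObj> docs{BSON("_id" << 1), BSON("_id" << 2)};
    auto* scm = ShardingCatalogManager::get(opCtx);

    // startTransaction = true makes the first insert batch open the transaction;
    // the caller must then commit (or abort) explicitly.
    scm->insertConfigDocumentsInTxn(
        opCtx, ChunkType::ConfigNS, std::move(docs), true /* startTransaction */, txnNumber);
    scm->commitTxnForConfigDocument(opCtx, txnNumber);
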
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
deleted file mode 100644
index 7df7df0e6c7..00000000000
--- a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Copyright (C) 2020-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/logical_session_cache_noop.h"
-#include "mongo/db/repl/wait_for_majority_service.h"
-#include "mongo/db/s/config/config_server_test_fixture.h"
-#include "mongo/db/s/config/sharding_catalog_manager.h"
-#include "mongo/db/s/transaction_coordinator_service.h"
-#include "mongo/db/session_catalog_mongod.h"
-#include "mongo/db/transaction_participant.h"
-#include "mongo/logv2/log.h"
-
-namespace mongo {
-namespace {
-
-const NamespaceString kNss("TestDB", "TestColl");
-const ShardType kShard0("shard0000", "shard0000:1234");
-const ShardType kShard1("shard0001", "shard0001:1234");
-
-class ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest
- : public ConfigServerTestFixture {
- void setUp() {
- ConfigServerTestFixture::setUp();
- setupShards({kShard0, kShard1});
-
- // Create config.transactions collection.
- auto opCtx = operationContext();
- DBDirectClient client(opCtx);
- client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
- client.createCollection(CollectionType::ConfigNS.ns());
-
- LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
- TransactionCoordinatorService::get(operationContext())
- ->onShardingInitialization(operationContext(), true);
- }
-
- void tearDown() {
- TransactionCoordinatorService::get(operationContext())->onStepDown();
- ConfigServerTestFixture::tearDown();
- }
-
-protected:
- ChunkType generateChunkType(const NamespaceString& nss,
- const ChunkVersion& chunkVersion,
- const ShardId& shardId,
- const BSONObj& minKey,
- const BSONObj& maxKey) {
- ChunkType chunkType;
- chunkType.setName(OID::gen());
- chunkType.setNS(nss);
- chunkType.setVersion(chunkVersion);
- chunkType.setShard(shardId);
- chunkType.setMin(minKey);
- chunkType.setMax(maxKey);
- chunkType.setHistory({ChunkHistory(Timestamp(100, 0), shardId)});
- return chunkType;
- }
-
- /**
- * Determines if the chunk's version has been bumped to the targetChunkVersion.
- */
- bool chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
- const ChunkType& chunkTypeBefore,
- const StatusWith<ChunkType> swChunkTypeAfter,
- const ChunkVersion& targetChunkVersion) {
- ASSERT_OK(swChunkTypeAfter.getStatus());
- auto chunkTypeAfter = swChunkTypeAfter.getValue();
-
- // Regardless of whether the major version was bumped, the chunk's other fields should be
- // unchanged.
- ASSERT_EQ(chunkTypeBefore.getName(), chunkTypeAfter.getName());
- ASSERT_EQ(chunkTypeBefore.getNS(), chunkTypeAfter.getNS());
- ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMin(), chunkTypeAfter.getMin());
- ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMax(), chunkTypeAfter.getMax());
- ASSERT(chunkTypeBefore.getHistory() == chunkTypeAfter.getHistory());
-
- return chunkTypeAfter.getVersion().majorVersion() == targetChunkVersion.majorVersion();
- }
-
- /**
- * If there are multiple chunks per shard, the chunk whose version gets bumped is not
- * deterministic.
- *
- * Asserts that only one chunk per shard has its major version increased.
- */
- void assertOnlyOneChunkVersionBumped(OperationContext* opCtx,
- std::vector<ChunkType> originalChunkTypes,
- const ChunkVersion& targetChunkVersion) {
- auto aChunkVersionWasBumped = false;
- for (auto originalChunkType : originalChunkTypes) {
- auto swChunkTypeAfter = getChunkDoc(opCtx, originalChunkType.getMin());
- auto wasBumped = chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
- originalChunkType, swChunkTypeAfter, targetChunkVersion);
- if (aChunkVersionWasBumped) {
- ASSERT_FALSE(wasBumped);
- } else {
- aChunkVersionWasBumped = wasBumped;
- }
- }
-
- ASSERT_TRUE(aChunkVersionWasBumped);
- }
-};
-
-TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
- BumpChunkVersionOneChunkPerShard) {
- const auto epoch = OID::gen();
- const auto shard0Chunk0 = generateChunkType(
- kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
- const auto shard1Chunk0 = generateChunkType(
- kNss, ChunkVersion(11, 2, epoch), kShard1.getName(), BSON("a" << 11), BSON("a" << 20));
-
- const auto collectionVersion = shard1Chunk0.getVersion();
- ChunkVersion targetChunkVersion(
- collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
-
- setupChunks({shard0Chunk0, shard1Chunk0});
-
- auto opCtx = operationContext();
-
- std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
-
- ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
- shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion));
-
- ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
- shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion));
-}
-
-TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
- BumpChunkVersionTwoChunksOnOneShard) {
- const auto epoch = OID::gen();
- const auto shard0Chunk0 = generateChunkType(
- kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
- const auto shard0Chunk1 = generateChunkType(
- kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20));
- const auto shard1Chunk0 = generateChunkType(
- kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100));
-
- const auto collectionVersion = shard0Chunk1.getVersion();
- ChunkVersion targetChunkVersion(
- collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
-
- setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0});
-
- auto opCtx = operationContext();
- std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
-
- assertOnlyOneChunkVersionBumped(
- operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
-
- ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged(
- shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion));
-}
-
-TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest,
- BumpChunkVersionTwoChunksOnTwoShards) {
- const auto epoch = OID::gen();
- const auto shard0Chunk0 = generateChunkType(
- kNss, ChunkVersion(10, 1, epoch), kShard0.getName(), BSON("a" << 1), BSON("a" << 10));
- const auto shard0Chunk1 = generateChunkType(
- kNss, ChunkVersion(11, 2, epoch), kShard0.getName(), BSON("a" << 11), BSON("a" << 20));
- const auto shard1Chunk0 = generateChunkType(
- kNss, ChunkVersion(8, 1, epoch), kShard1.getName(), BSON("a" << 21), BSON("a" << 100));
- const auto shard1Chunk1 = generateChunkType(
- kNss, ChunkVersion(12, 1, epoch), kShard1.getName(), BSON("a" << 101), BSON("a" << 200));
-
- const auto collectionVersion = shard1Chunk1.getVersion();
- ChunkVersion targetChunkVersion(
- collectionVersion.majorVersion() + 1, 0, collectionVersion.epoch());
-
- setupChunks({shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1});
-
- auto opCtx = operationContext();
- std::vector<ShardId> shardIds{kShard0.getName(), kShard1.getName()};
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx, kNss, shardIds, [&](OperationContext*, TxnNumber) {});
-
- assertOnlyOneChunkVersionBumped(
- operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion);
-
- assertOnlyOneChunkVersionBumped(
- operationContext(), {shard1Chunk0, shard1Chunk1}, targetChunkVersion);
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index d1f09247cdb..be40c6a5ed6 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -40,7 +40,6 @@
#include "mongo/client/read_preference.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/logical_session_cache.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -56,7 +55,6 @@
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
-#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/str.h"
@@ -69,27 +67,6 @@ MONGO_FAIL_POINT_DEFINE(skipExpiringOldChunkHistory);
const WriteConcernOptions kNoWaitWriteConcern(1, WriteConcernOptions::SyncMode::UNSET, Seconds(0));
-BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi) {
- BatchedCommandRequest request([&] {
- write_ops::Update updateOp(nss);
- updateOp.setUpdates({[&] {
- write_ops::UpdateOpEntry entry;
- entry.setQ(query);
- entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
- entry.setUpsert(upsert);
- entry.setMulti(multi);
- return entry;
- }()});
- return updateOp;
- }());
-
- return request;
-}
-
/**
* Append min, max and version information from chunk to the buffer for logChange purposes.
*/
@@ -323,9 +300,13 @@ StatusWith<ChunkVersion> getMaxChunkVersionFromQueryResponse(
return ChunkVersion::parseLegacyWithField(chunksVector.front(), ChunkType::lastmod());
}
-// Helper function to get the collection version for nss. Always uses kLocalReadConcern.
-StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const NamespaceString& nss) {
- return getMaxChunkVersionFromQueryResponse(
+// Helper function to get collection version and donor shard version following a merge/move/split
+BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const ShardId& fromShard) {
+ BSONObjBuilder result;
+
+ auto swCollectionVersion = getMaxChunkVersionFromQueryResponse(
nss,
Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
opCtx,
@@ -335,15 +316,7 @@ StatusWith<ChunkVersion> getCollectionVersion(OperationContext* opCtx, const Nam
BSON("ns" << nss.ns()), // Query all chunks for this namespace.
BSON(ChunkType::lastmod << -1), // Sort by version.
1)); // Limit 1.
-}
-
-// Helper function to get collection version and donor shard version following a merge/move/split
-BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
- const NamespaceString& nss,
- const ShardId& fromShard) {
- BSONObjBuilder result;
- auto swCollectionVersion = getCollectionVersion(opCtx, nss);
auto collectionVersion = uassertStatusOKWithContext(
std::move(swCollectionVersion), "Couldn't retrieve collection version from config server");
@@ -386,47 +359,6 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx,
return result.obj();
}
-void bumpMajorVersionOneChunkPerShard(OperationContext* opCtx,
- const NamespaceString& nss,
- TxnNumber txnNumber,
- const std::vector<ShardId>& shardIds) {
- auto curCollectionVersion = uassertStatusOK(getCollectionVersion(opCtx, nss));
- ChunkVersion targetChunkVersion(
- curCollectionVersion.majorVersion() + 1, 0, curCollectionVersion.epoch());
-
- for (const auto& shardId : shardIds) {
- BSONObjBuilder updateBuilder;
- BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set"));
- targetChunkVersion.appendLegacyWithField(&updateVersionClause, ChunkType::lastmod());
- updateVersionClause.doneFast();
- auto chunkUpdate = updateBuilder.obj();
-
- auto request = buildUpdateOp(
- ChunkType::ConfigNS,
- BSON(ChunkType::ns(nss.ns()) << ChunkType::shard(shardId.toString())), // query
- chunkUpdate, // update
- false, // upsert
- false // multi
- );
-
- auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx, ChunkType::ConfigNS, request, txnNumber);
-
- auto numDocsExpectedModified = 1;
- auto numDocsModified = res.getIntField("n");
-
- uassert(5030400,
- str::stream() << "Expected to match " << numDocsExpectedModified
- << " docs, but only matched " << numDocsModified
- << " for write request " << request.toString(),
- numDocsExpectedModified == numDocsModified);
-
- // There exists a constraint that a chunk version must be unique for a given namespace,
- // so the minor version is incremented for each chunk placed.
- targetChunkVersion.incMinor();
- }
-}
-
} // namespace
StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
@@ -443,7 +375,16 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkSplit(
Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
// Get the max chunk version for this namespace.
- auto swCollVersion = getCollectionVersion(opCtx, nss);
+ auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()), // Query all chunks for this namespace.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
if (!swCollVersion.isOK()) {
return swCollVersion.getStatus().withContext(
@@ -654,7 +595,17 @@ StatusWith<BSONObj> ShardingCatalogManager::commitChunkMerge(
}
// Get the max chunk version for this namespace.
- auto swCollVersion = getCollectionVersion(opCtx, nss);
+ auto swCollVersion = getMaxChunkVersionFromQueryResponse(
+ nss,
+ Grid::get(opCtx)->shardRegistry()->getConfigShard()->exhaustiveFindOnConfig(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ repl::ReadConcernLevel::kLocalReadConcern,
+ ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()), // Query all chunks for this namespace.
+ BSON(ChunkType::lastmod << -1), // Sort by version.
+ 1)); // Limit 1.
+
if (!swCollVersion.isOK()) {
return swCollVersion.getStatus().withContext(str::stream()
<< "mergeChunk cannot merge chunks.");
@@ -1231,21 +1182,4 @@ void ShardingCatalogManager::ensureChunkVersionIsGreaterThan(OperationContext* o
}
}
-void ShardingCatalogManager::bumpCollShardVersionsAndChangeMetadataInTxn(
- OperationContext* opCtx,
- const NamespaceString& nss,
- const std::vector<ShardId>& shardIds,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
-
- // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and
- // migrations
- Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock);
- withTransaction(opCtx,
- NamespaceString::kConfigReshardingOperationsNamespace,
- [&](OperationContext* opCtx, TxnNumber txnNumber) {
- bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, shardIds);
- changeMetadataFunc(opCtx, txnNumber);
- });
-}
-
} // namespace mongo
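
Both commitChunkSplit and commitChunkMerge now inline the query that the deleted getCollectionVersion() helper wrapped: the collection version is simply the highest chunk 'lastmod' for the namespace. The query shape, as a standalone sketch (configShard stands in for Grid::get(opCtx)->shardRegistry()->getConfigShard()):

    auto swCollVersion = getMaxChunkVersionFromQueryResponse(
        nss,
        configShard->exhaustiveFindOnConfig(
            opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            repl::ReadConcernLevel::kLocalReadConcern,
            ChunkType::ConfigNS,
            BSON("ns" << nss.ns()),          // all chunks for this namespace
            BSON(ChunkType::lastmod << -1),  // sort by version, descending
            1));                             // only the max is needed
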
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
index a216412912c..50470cf2a80 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_collection_operations.cpp
@@ -141,7 +141,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
const BSONObj& query,
const BSONObj& update,
bool upsert,
- bool multi) {
+ bool useMultiUpdate) {
BatchedCommandRequest request([&] {
write_ops::Update updateOp(nss);
updateOp.setUpdates({[&] {
@@ -149,7 +149,7 @@ BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
entry.setQ(query);
entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
entry.setUpsert(upsert);
- entry.setMulti(multi);
+ entry.setMulti(useMultiUpdate);
return entry;
}()});
return updateOp;
@@ -637,10 +637,15 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
collType.setEpoch(newEpoch);
collType.setKeyPattern(newShardKeyPattern.getKeyPattern());
- auto updateCollectionAndChunksFn = [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ {
// Update the config.collections entry for the given namespace.
+ AlternativeSessionRegion asr(opCtx);
+ AuthorizationSession::get(asr.opCtx()->getClient())
+ ->grantInternalAuthorization(asr.opCtx()->getClient());
+ TxnNumber txnNumber = 0;
+
updateShardingCatalogEntryForCollectionInTxn(
- opCtx, nss, collType, false /* upsert */, txnNumber);
+ asr.opCtx(), nss, collType, false /* upsert */, true /* startTransaction */, txnNumber);
LOGV2(21933,
"refineCollectionShardKey updated collection entry for {namespace}: took "
@@ -662,15 +667,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
// to the newly-generated objectid, (ii) their bounds for each new field in the refined
// key to MinKey (except for the global max chunk where the max bounds are set to
// MaxKey), and unsetting (iii) their jumbo field.
- writeToConfigDocumentInTxn(opCtx,
- ChunkType::ConfigNS,
- buildPipelineUpdateOp(ChunkType::ConfigNS,
- BSON("ns" << nss.ns()),
- chunkUpdates,
- false, // upsert
- true // useMultiUpdate
- ),
- txnNumber);
+ uassertStatusOK(getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(asr.opCtx(),
+ ChunkType::ConfigNS,
+ buildPipelineUpdateOp(ChunkType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ chunkUpdates,
+ false, // upsert
+ true // useMultiUpdate
+ ),
+ false, // startTransaction
+ txnNumber)));
LOGV2(21935,
"refineCollectionShardKey: updated chunk entries for {namespace}: took "
@@ -684,15 +691,17 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
// Update all config.tags entries for the given namespace by setting their bounds for
// each new field in the refined key to MinKey (except for the global max tag where the
// max bounds are set to MaxKey).
- writeToConfigDocumentInTxn(opCtx,
- TagsType::ConfigNS,
- buildPipelineUpdateOp(TagsType::ConfigNS,
- BSON("ns" << nss.ns()),
- tagUpdates,
- false, // upsert
- true // useMultiUpdate
- ),
- txnNumber);
+ uassertStatusOK(getStatusFromWriteCommandReply(
+ writeToConfigDocumentInTxn(asr.opCtx(),
+ TagsType::ConfigNS,
+ buildPipelineUpdateOp(TagsType::ConfigNS,
+ BSON("ns" << nss.ns()),
+ tagUpdates,
+ false, // upsert
+ true // useMultiUpdate
+ ),
+ false, // startTransaction
+ txnNumber)));
LOGV2(21936,
@@ -707,9 +716,10 @@ void ShardingCatalogManager::refineCollectionShardKey(OperationContext* opCtx,
LOGV2(21937, "Hit hangRefineCollectionShardKeyBeforeCommit failpoint");
hangRefineCollectionShardKeyBeforeCommit.pauseWhileSet(opCtx);
}
- };
- withTransaction(opCtx, nss, std::move(updateCollectionAndChunksFn));
+ // Note this will wait for majority write concern.
+ commitTxnForConfigDocument(asr.opCtx(), txnNumber);
+ }
ShardingLogging::get(opCtx)->logChange(opCtx,
"refineCollectionShardKey.end",
@@ -737,8 +747,9 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
const NamespaceString& nss,
const CollectionType& coll,
const bool upsert,
+ const bool startTransaction,
TxnNumber txnNumber) {
- try {
+ auto status = getStatusFromCommandResult(
writeToConfigDocumentInTxn(opCtx,
CollectionType::ConfigNS,
buildUpdateOp(CollectionType::ConfigNS,
@@ -747,11 +758,9 @@ void ShardingCatalogManager::updateShardingCatalogEntryForCollectionInTxn(
upsert,
false /* multi */
),
- txnNumber);
- } catch (DBException& e) {
- e.addContext("Collection metadata write failed");
- throw;
- }
+ startTransaction,
+ txnNumber));
+ uassertStatusOKWithContext(status, "Collection metadata write failed");
}
} // namespace mongo
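
refineCollectionShardKey now owns its transaction inline rather than delegating to withTransaction(). The skeleton of that block, with the actual catalog writes from the diff elided (this restates the structure only; note there is no abort guard here, unlike the coordinator's withAlternateSession helper later in this change):

    {
        AlternativeSessionRegion asr(opCtx);
        AuthorizationSession::get(asr.opCtx()->getClient())
            ->grantInternalAuthorization(asr.opCtx()->getClient());
        TxnNumber txnNumber = 0;

        // The first statement passes startTransaction = true, the rest false...

        // ...followed by an explicit commit, which waits for majority write
        // concern before returning.
        ShardingCatalogManager::get(asr.opCtx())
            ->commitTxnForConfigDocument(asr.opCtx(), txnNumber);
    }
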
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 1bac5597a72..08f23a349e2 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -50,62 +50,46 @@
namespace mongo {
namespace {
-BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) {
- BatchedCommandRequest request([&] {
- write_ops::Insert insertOp(nss);
- insertOp.setDocuments(docs);
- return insertOp;
- }());
+template <typename Callable>
+void withAlternateSession(OperationContext* opCtx, Callable&& callable) {
+ AlternativeSessionRegion asr(opCtx);
+ AuthorizationSession::get(asr.opCtx()->getClient())
+ ->grantInternalAuthorization(asr.opCtx()->getClient());
+ TxnNumber txnNumber = 0;
+
+ auto guard = makeGuard([opCtx = asr.opCtx(), txnNumber] {
+ try {
+ ShardingCatalogManager::get(opCtx)->abortTxnForConfigDocument(opCtx, txnNumber);
+ } catch (const DBException& ex) {
+ LOGV2(5165900,
+ "Failed to abort transaction to resharding metadata",
+ "error"_attr = redact(ex));
+ }
+ });
- return request;
-}
+ callable(asr.opCtx(), txnNumber);
-BatchedCommandRequest buildDeleteOp(const NamespaceString& nss,
- const BSONObj& query,
- bool multiDelete) {
- BatchedCommandRequest request([&] {
- write_ops::Delete deleteOp(nss);
- deleteOp.setDeletes({[&] {
- write_ops::DeleteOpEntry entry;
- entry.setQ(query);
- entry.setMulti(multiDelete);
- return entry;
- }()});
- return deleteOp;
- }());
-
- return request;
+ guard.dismiss();
}
-BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi) {
- BatchedCommandRequest request([&] {
- write_ops::Update updateOp(nss);
- updateOp.setUpdates({[&] {
- write_ops::UpdateOpEntry entry;
- entry.setQ(query);
- entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
- entry.setUpsert(upsert);
- entry.setMulti(multi);
- return entry;
- }()});
- return updateOp;
- }());
+void writeConfigDocs(OperationContext* opCtx,
+ const NamespaceString& nss,
+ BatchedCommandRequest request,
+ bool startTransaction,
+ TxnNumber txnNumber,
+ boost::optional<int> expectedNumModified) {
+ auto response = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
+ opCtx, nss, request, startTransaction, txnNumber);
+ uassertStatusOK(getStatusFromCommandResult(response));
- return request;
-}
+ if (!expectedNumModified)
+ return;
-void assertNumDocsModifiedMatchesExpected(const BatchedCommandRequest& request,
- const BSONObj& response,
- int expected) {
- auto numDocsModified = response.getIntField("n");
- uassert(5030401,
- str::stream() << "Expected to match " << expected << " docs, but only matched "
- << numDocsModified << " for write request " << request.toString(),
- expected == numDocsModified);
+ uassert(5030400,
+ str::stream() << "Expected to match " << expectedNumModified
+ << " docs, but only matched " << response.getIntField("n")
+ << " for write request " << request.toString(),
+ response.getIntField("n") == expectedNumModified);
}
void writeToCoordinatorStateNss(OperationContext* opCtx,
@@ -138,15 +122,12 @@ void writeToCoordinatorStateNss(OperationContext* opCtx,
auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert)
? boost::none
: boost::make_optional(1);
- auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx,
- NamespaceString::kConfigReshardingOperationsNamespace,
- std::move(request),
- txnNumber);
-
- if (expectedNumModified) {
- assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified);
- }
+ writeConfigDocs(opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ std::move(request),
+ true, // startTransaction
+ txnNumber,
+ expectedNumModified);
}
BSONObj createReshardingFieldsUpdateForOriginalNss(
@@ -203,18 +184,18 @@ void updateConfigCollectionsForOriginalNss(OperationContext* opCtx,
auto writeOp =
createReshardingFieldsUpdateForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch);
- auto request =
+ writeConfigDocs(
+ opCtx,
+ CollectionType::ConfigNS,
buildUpdateOp(CollectionType::ConfigNS,
BSON(CollectionType::kNssFieldName << coordinatorDoc.getNss().ns()), // query
writeOp,
false, // upsert
false // multi
- );
-
- auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx, CollectionType::ConfigNS, request, txnNumber);
-
- assertNumDocsModifiedMatchesExpected(request, res, 1 /* expected */);
+ ),
+ false, // startTransaction
+ txnNumber,
+ 1); // expectedNumModified
}
void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
@@ -274,13 +255,12 @@ void writeToConfigCollectionsForTempNss(OperationContext* opCtx,
auto expectedNumModified = (request.getBatchType() == BatchedCommandRequest::BatchType_Insert)
? boost::none
: boost::make_optional(1);
-
- auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx, CollectionType::ConfigNS, std::move(request), txnNumber);
-
- if (expectedNumModified) {
- assertNumDocsModifiedMatchesExpected(request, res, *expectedNumModified);
- }
+ writeConfigDocs(opCtx,
+ CollectionType::ConfigNS,
+ std::move(request),
+ false, // startTransaction
+ txnNumber,
+ expectedNumModified);
}
void insertChunkAndTagDocsForTempNss(OperationContext* opCtx,
@@ -294,8 +274,11 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx,
initialChunksBSON.begin(),
[](ChunkType chunk) { return chunk.toConfigBSON(); });
- ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(
- opCtx, ChunkType::ConfigNS, std::move(initialChunksBSON), txnNumber);
+ ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx,
+ ChunkType::ConfigNS,
+ std::move(initialChunksBSON),
+ false, // startTransaction
+ txnNumber);
// Insert tag documents for temp nss
std::vector<BSONObj> zonesBSON(newZones.size());
@@ -303,8 +286,11 @@ void insertChunkAndTagDocsForTempNss(OperationContext* opCtx,
return chunk.toBSON();
});
- ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(
- opCtx, TagsType::ConfigNS, std::move(zonesBSON), txnNumber);
+ ShardingCatalogManager::get(opCtx)->insertConfigDocumentsInTxn(opCtx,
+ TagsType::ConfigNS,
+ std::move(zonesBSON),
+ false, // startTransaction
+ txnNumber);
}
void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx,
@@ -312,25 +298,29 @@ void removeChunkAndTagsDocsForOriginalNss(OperationContext* opCtx,
TxnNumber txnNumber) {
// Remove all chunk documents for the original nss. We do not know how many chunk docs currently
// exist, so cannot pass a value for expectedNumModified
- ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx,
- ChunkType::ConfigNS,
- buildDeleteOp(ChunkType::ConfigNS,
- BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
- true // multi
- ),
- txnNumber);
+ writeConfigDocs(opCtx,
+ ChunkType::ConfigNS,
+ buildDeleteOp(ChunkType::ConfigNS,
+ BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
+ true // multi
+ ),
+ false, // startTransaction
+ txnNumber,
+ boost::none // expectedNumModified
+ );
// Remove all tag documents for the original nss. We do not know how many tag docs currently
// exist, so cannot pass a value for expectedNumModified
- ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx,
- TagsType::ConfigNS,
- buildDeleteOp(TagsType::ConfigNS,
- BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
- true // multi
- ),
- txnNumber);
+ writeConfigDocs(opCtx,
+ TagsType::ConfigNS,
+ buildDeleteOp(TagsType::ConfigNS,
+ BSON(ChunkType::ns(coordinatorDoc.getNss().ns())), // query
+ true // multi
+ ),
+ false, // startTransaction
+ txnNumber,
+ boost::none // expectedNumModified
+ );
}
void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
@@ -342,38 +332,34 @@ void updateChunkAndTagsDocsForTempNss(OperationContext* opCtx,
// Update all chunk documents that currently have 'ns' as the temporary collection namespace
// such that 'ns' is now the original collection namespace and 'lastmodEpoch' is
// newCollectionEpoch.
- auto chunksRequest =
+ writeConfigDocs(
+ opCtx,
+ ChunkType::ConfigNS,
buildUpdateOp(ChunkType::ConfigNS,
BSON(ChunkType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query
BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns() << "lastmodEpoch"
<< newCollectionEpoch)), // update
false, // upsert
true // multi
- );
-
- auto chunksRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx, ChunkType::ConfigNS, chunksRequest, txnNumber);
-
- if (expectedNumChunksModified) {
- assertNumDocsModifiedMatchesExpected(chunksRequest, chunksRes, *expectedNumChunksModified);
- }
+ ),
+ false, // startTransaction
+ txnNumber,
+ expectedNumChunksModified);
- auto tagsRequest =
+ // Update the 'ns' field to be the original collection namespace for all tags documents that
+ // currently have 'ns' as the temporary collection namespace
+ writeConfigDocs(
+ opCtx,
+ TagsType::ConfigNS,
buildUpdateOp(TagsType::ConfigNS,
BSON(TagsType::ns(coordinatorDoc.getTempReshardingNss().ns())), // query
BSON("$set" << BSON("ns" << coordinatorDoc.getNss().ns())), // update
false, // upsert
true // multi
- );
-
- // Update the 'ns' field to be the original collection namespace for all tags documents that
- // currently have 'ns' as the temporary collection namespace
- auto tagsRes = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn(
- opCtx, TagsType::ConfigNS, tagsRequest, txnNumber);
-
- if (expectedNumZonesModified) {
- assertNumDocsModifiedMatchesExpected(tagsRequest, tagsRes, *expectedNumZonesModified);
- }
+ ),
+ false, // startTransaction
+ txnNumber,
+ expectedNumZonesModified);
}
/**
@@ -388,95 +374,6 @@ std::vector<ShardId> extractShardIds(const std::vector<T>& participantShardEntri
[](auto& shardEntry) { return shardEntry.getId(); });
return shardIds;
}
-
-//
-// Helper methods for ensuring donors/ recipients are able to notice when certain state transitions
-// occur.
-//
-// Donors/ recipients learn when to transition states by noticing a change in shard versions for one
-// of the two collections involved in the resharding operations.
-//
-// Donors are notified when shard versions spanning the original resharding collection are
-// incremented. Recipients are notified when shard versions spanning the temporary resharding
-// collection are incremented.
-//
-
-/**
- * Maps which participants are to be notified when the coordinator transitions into a given state.
- */
-enum class ParticipantsToNofityEnum { kDonors, kRecipients, kNone };
-stdx::unordered_map<CoordinatorStateEnum, ParticipantsToNofityEnum> notifyForStateTransition{
- {CoordinatorStateEnum::kUnused, ParticipantsToNofityEnum::kNone},
- {CoordinatorStateEnum::kInitializing, ParticipantsToNofityEnum::kNone},
- {CoordinatorStateEnum::kPreparingToDonate, ParticipantsToNofityEnum::kDonors},
- {CoordinatorStateEnum::kCloning, ParticipantsToNofityEnum::kRecipients},
- {CoordinatorStateEnum::kMirroring, ParticipantsToNofityEnum::kDonors},
- {CoordinatorStateEnum::kCommitted, ParticipantsToNofityEnum::kNone},
- {CoordinatorStateEnum::kRenaming, ParticipantsToNofityEnum::kRecipients},
- {CoordinatorStateEnum::kDropping, ParticipantsToNofityEnum::kDonors},
- {CoordinatorStateEnum::kDone, ParticipantsToNofityEnum::kNone},
- {CoordinatorStateEnum::kError, ParticipantsToNofityEnum::kNone},
-};
-
-/**
- * Runs resharding metadata changes in a transaction.
- *
- * This function should only be called if donor and recipient shards DO NOT need to be informed of
- * the updatedCoordinatorDoc's state transition. If donor or recipient shards need to be informed,
- * instead call bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn().
- */
-void executeStateTransitionAndMetadataChangesInTxn(
- OperationContext* opCtx,
- const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
- const auto& state = updatedCoordinatorDoc.getState();
- invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[state] == ParticipantsToNofityEnum::kNone);
-
- // Neither donors nor recipients need to be informed of the transition to
- // updatedCoordinatorDoc's state.
- ShardingCatalogManager::withTransaction(opCtx,
- NamespaceString::kConfigReshardingOperationsNamespace,
- [&](OperationContext* opCtx, TxnNumber txnNumber) {
- changeMetadataFunc(opCtx, txnNumber);
- });
-}
-
-/**
- * In a single transaction, bumps the shard version for each shard spanning the corresponding
- * resharding collection and executes changeMetadataFunc.
- *
- * This function should only be called if donor or recipient shards need to be informed of the
- * updatedCoordinatorDoc's state transition. If donor or recipient shards do not need to be
- * informed, instead call executeStateTransitionAndMetadataChangesInTxn().
- */
-void bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- OperationContext* opCtx,
- const ReshardingCoordinatorDocument& updatedCoordinatorDoc,
- unique_function<void(OperationContext*, TxnNumber)> changeMetadataFunc) {
- const auto& state = updatedCoordinatorDoc.getState();
- invariant(notifyForStateTransition.find(state) != notifyForStateTransition.end());
- invariant(notifyForStateTransition[state] != ParticipantsToNofityEnum::kNone);
-
- auto participantsToNotify = notifyForStateTransition[state];
- if (participantsToNotify == ParticipantsToNofityEnum::kDonors) {
- // Bump the donor shard versions for the original namespace along with updating the
- // metadata.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getNss(),
- extractShardIds(updatedCoordinatorDoc.getDonorShards()),
- std::move(changeMetadataFunc));
- } else if (participantsToNotify == ParticipantsToNofityEnum::kRecipients) {
- // Bump the recipient shard versions for the original namespace along with updating the
- // metadata.
- ShardingCatalogManager::get(opCtx)->bumpCollShardVersionsAndChangeMetadataInTxn(
- opCtx,
- updatedCoordinatorDoc.getTempReshardingNss(),
- extractShardIds(updatedCoordinatorDoc.getRecipientShards()),
- std::move(changeMetadataFunc));
- }
-}
} // namespace
namespace resharding {
@@ -516,25 +413,27 @@ void persistInitialStateAndCatalogUpdates(OperationContext* opCtx,
opCtx, coordinatorDoc.getNss(), repl::ReadConcernLevel::kMajorityReadConcern));
const auto collation = originalCollType.value.getDefaultCollation();
- bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Insert state doc to config.reshardingOperations.
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Insert state doc to config.reshardingOperations
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+
+ // Update the config.collections entry for the original collection to include
+ // 'reshardingFields'
+ updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
- // Update the config.collections entry for the original collection to include
- // 'reshardingFields'
- updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
+ // Insert the config.collections entry for the temporary resharding collection. The chunks
+ // all have the same epoch, so picking the last chunk here is arbitrary.
+ auto chunkVersion = initialChunks.back().getVersion();
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, chunkVersion, collation, txnNumber);
- // Insert the config.collections entry for the temporary resharding collection. The
- // chunks all have the same epoch, so picking the last chunk here is arbitrary.
- auto chunkVersion = initialChunks.back().getVersion();
- writeToConfigCollectionsForTempNss(
- opCtx, coordinatorDoc, chunkVersion, collation, txnNumber);
+ // Insert new initial chunk and tag documents
+ insertChunkAndTagDocsForTempNss(
+ opCtx, std::move(initialChunks), std::move(newZones), txnNumber);
- // Insert new initial chunk and tag documents.
- insertChunkAndTagDocsForTempNss(
- opCtx, std::move(initialChunks), std::move(newZones), txnNumber);
- });
+ // Commit the transaction
+ ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
+ });
}
void persistCommittedState(OperationContext* opCtx,
@@ -542,44 +441,39 @@ void persistCommittedState(OperationContext* opCtx,
OID newCollectionEpoch,
boost::optional<int> expectedNumChunksModified,
boost::optional<int> expectedNumZonesModified) {
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Update the config.reshardingOperations entry
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
-
- // Remove the config.collections entry for the temporary collection
- writeToConfigCollectionsForTempNss(
- opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+ withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Update the config.reshardingOperations entry
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Update the config.collections entry for the original namespace to reflect the new
- // shard key, new epoch, and new UUID
- updateConfigCollectionsForOriginalNss(
- opCtx, coordinatorDoc, newCollectionEpoch, txnNumber);
-
- // Remove all chunk and tag documents associated with the original collection, then
- // update the chunk and tag docs currently associated with the temp nss to be associated
- // with the original nss
- removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber);
- updateChunkAndTagsDocsForTempNss(opCtx,
- coordinatorDoc,
- newCollectionEpoch,
- expectedNumChunksModified,
- expectedNumZonesModified,
- txnNumber);
- });
+ // Remove the config.collections entry for the temporary collection
+ writeToConfigCollectionsForTempNss(
+ opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
+
+ // Update the config.collections entry for the original namespace to reflect the new shard
+ // key, new epoch, and new UUID
+ updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, newCollectionEpoch, txnNumber);
+
+ // Remove all chunk and tag documents associated with the original collection, then update
+ // the chunk and tag docs currently associated with the temp nss to be associated with the
+ // original nss
+ removeChunkAndTagsDocsForOriginalNss(opCtx, coordinatorDoc, txnNumber);
+ updateChunkAndTagsDocsForTempNss(opCtx,
+ coordinatorDoc,
+ newCollectionEpoch,
+ expectedNumChunksModified,
+ expectedNumZonesModified,
+ txnNumber);
+
+ // Commit the transaction
+ ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
+ });
}
-void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
- OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc) {
+void persistStateTransition(OperationContext* opCtx,
+ const ReshardingCoordinatorDocument& coordinatorDoc) {
// Run updates to config.reshardingOperations and config.collections in a transaction
auto nextState = coordinatorDoc.getState();
- invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end());
- // TODO SERVER-51800 Remove special casing for kError.
- invariant(nextState == CoordinatorStateEnum::kError ||
- notifyForStateTransition[nextState] != ParticipantsToNofityEnum::kNone);
-
- // Resharding metadata changes to be executed.
- auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) {
+ withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
// Update the config.reshardingOperations entry
writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
@@ -588,36 +482,31 @@ void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
// Update the config.collections entry for the temporary resharding collection. If we've
// already committed this operation, we've removed the entry for the temporary
- // collection and updated the entry with original namespace to have the new shard key,
- // UUID, and epoch
+ // collection and updated the entry with original namespace to have the new shard key, UUID,
+ // and epoch
if (nextState < CoordinatorStateEnum::kCommitted ||
nextState == CoordinatorStateEnum::kError) {
writeToConfigCollectionsForTempNss(
opCtx, coordinatorDoc, boost::none, boost::none, txnNumber);
}
- };
-
- // TODO SERVER-51800 Remove special casing for kError.
- if (nextState == CoordinatorStateEnum::kError) {
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, std::move(changeMetadataFunc));
- return;
- }
- bumpShardVersionsThenExecuteStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, std::move(changeMetadataFunc));
+ // Commit the transaction
+ ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
+ });
}
void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc) {
- executeStateTransitionAndMetadataChangesInTxn(
- opCtx, coordinatorDoc, [&](OperationContext* opCtx, TxnNumber txnNumber) {
- // Remove entry for this resharding operation from config.reshardingOperations
- writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
+ withAlternateSession(opCtx, [=](OperationContext* opCtx, TxnNumber txnNumber) {
+ // Remove entry for this resharding operation from config.reshardingOperations
+ writeToCoordinatorStateNss(opCtx, coordinatorDoc, txnNumber);
- // Remove the resharding fields from the config.collections entry
- updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
- });
+ // Remove the resharding fields from the config.collections entry
+ updateConfigCollectionsForOriginalNss(opCtx, coordinatorDoc, boost::none, txnNumber);
+
+ // Commit the transaction
+ ShardingCatalogManager::get(opCtx)->commitTxnForConfigDocument(opCtx, txnNumber);
+ });
}
} // namespace resharding
@@ -652,19 +541,17 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
.then([this, executor] { return _awaitAllRecipientsFinishedApplying(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
- .then([this](const ReshardingCoordinatorDocument& updatedCoordinatorDoc) {
- return _commit(updatedCoordinatorDoc);
+ .then([this](const ReshardingCoordinatorDocument& updatedStateDoc) {
+ return _commit(updatedStateDoc);
})
.then([this] {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kRenaming) {
return;
}
- _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kRenaming,
- _coordinatorDoc);
+ this->_runUpdates(CoordinatorStateEnum::kRenaming, _coordinatorDoc);
return;
})
- .then([this, executor] { _tellAllRecipientsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsRenamedCollection(executor); })
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllDonorsDroppedOriginalCollection(executor); })
@@ -677,8 +564,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
}
- _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kError,
- _coordinatorDoc);
+ _runUpdates(CoordinatorStateEnum::kError, _coordinatorDoc);
LOGV2(4956902,
"Resharding failed",
@@ -776,12 +662,11 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
+ .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
auto highestMinFetchTimestamp =
- getHighestMinFetchTimestamp(coordinatorDocChangedOnDisk.getDonorShards());
- _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kCloning,
- coordinatorDocChangedOnDisk,
- highestMinFetchTimestamp);
+ getHighestMinFetchTimestamp(updatedStateDoc.getDonorShards());
+ this->_runUpdates(
+ CoordinatorStateEnum::kCloning, updatedStateDoc, highestMinFetchTimestamp);
});
}
@@ -794,9 +679,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedCloning()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
- this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kApplying,
- coordinatorDocChangedOnDisk);
+ .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
+ this->_runUpdates(CoordinatorStateEnum::kApplying, updatedStateDoc);
});
}
@@ -809,9 +693,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
return _reshardingCoordinatorObserver->awaitAllRecipientsFinishedApplying()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
- this->_updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kMirroring,
- coordinatorDocChangedOnDisk);
+ .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
+ this->_runUpdates(CoordinatorStateEnum::kMirroring, updatedStateDoc);
});
}
@@ -827,12 +710,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsInStrict
}
Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_commit(
- const ReshardingCoordinatorDocument& coordinatorDoc) {
+ const ReshardingCoordinatorDocument& updatedDoc) {
if (_coordinatorDoc.getState() > CoordinatorStateEnum::kMirroring) {
return Status::OK();
}
- ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
+ ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kCommitted);
// Get the number of initial chunks and new zones that we inserted during initialization in
@@ -876,9 +759,8 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsRenamedC
return _reshardingCoordinatorObserver->awaitAllRecipientsRenamedCollection()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
- _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDropping,
- coordinatorDocChangedOnDisk);
+ .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
+ this->_runUpdates(CoordinatorStateEnum::kDropping, updatedStateDoc);
});
}
@@ -891,24 +773,22 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsDroppedOrigi
return _reshardingCoordinatorObserver->awaitAllDonorsDroppedOriginalCollection()
.thenRunOn(**executor)
- .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
- _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum::kDone,
- coordinatorDocChangedOnDisk);
+ .then([this](ReshardingCoordinatorDocument updatedStateDoc) {
+ this->_runUpdates(CoordinatorStateEnum::kDone, updatedStateDoc);
});
}
-void ReshardingCoordinatorService::ReshardingCoordinator::
-    _updateCoordinatorDocStateAndCatalogEntries(CoordinatorStateEnum nextState,
-                                                ReshardingCoordinatorDocument coordinatorDoc,
-                                                boost::optional<Timestamp> fetchTimestamp) {
+void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates(
+    CoordinatorStateEnum nextState,
+    ReshardingCoordinatorDocument updatedStateDoc,
+    boost::optional<Timestamp> fetchTimestamp) {
// Build new state doc for coordinator state update
- ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
+ ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedStateDoc;
updatedCoordinatorDoc.setState(nextState);
emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp));
auto opCtx = cc().makeOperationContext();
- resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(),
- updatedCoordinatorDoc);
+ resharding::persistStateTransition(opCtx.get(), updatedCoordinatorDoc);
// Update in-memory coordinator doc
_coordinatorDoc = updatedCoordinatorDoc;
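
The renamed _runUpdates() keeps a strict copy-modify-persist-swap order: it mutates a copy of the state document, persists it via resharding::persistStateTransition(), and only then replaces the in-memory _coordinatorDoc, so memory never runs ahead of the durable catalog state. A small model of that ordering, using stand-in types rather than the server's:

    #include <iostream>
    #include <optional>

    // Stand-ins for the server types; only the update ordering is modeled.
    enum class State { kCloning, kApplying };

    struct CoordinatorDoc {
        State state = State::kCloning;
        std::optional<long long> fetchTimestamp;
    };

    // Placeholder for resharding::persistStateTransition(); in the server
    // this writes the config documents transactionally and throws on failure.
    void persistStateTransition(const CoordinatorDoc&) {}

    class Coordinator {
    public:
        void runUpdates(State nextState,
                        CoordinatorDoc updatedStateDoc,
                        std::optional<long long> fetchTimestamp = std::nullopt) {
            updatedStateDoc.state = nextState;
            if (fetchTimestamp)
                updatedStateDoc.fetchTimestamp = fetchTimestamp;  // emplaceFetchTimestampIfExists
            persistStateTransition(updatedStateDoc);  // durable write first...
            _doc = updatedStateDoc;                   // ...then swap the in-memory copy
        }

    private:
        CoordinatorDoc _doc;
    };

    int main() {
        Coordinator coordinator;
        coordinator.runUpdates(State::kApplying, CoordinatorDoc{}, 42);
        std::cout << "state transition persisted, then applied in memory\n";
        return 0;
    }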
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index adb161a710c..cfcfb5badc8 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -59,8 +59,8 @@ void persistCommittedState(OperationContext* opCtx,
boost::optional<int> expectedNumChunksModified,
boost::optional<int> expectedNumZonesModified);
-void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
-    OperationContext* opCtx, const ReshardingCoordinatorDocument& coordinatorDoc);
+void persistStateTransition(OperationContext* opCtx,
+                            const ReshardingCoordinatorDocument& coordinatorDoc);
void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
const ReshardingCoordinatorDocument& coordinatorDoc);
@@ -201,10 +201,9 @@ private:
* Updates the entry for this resharding operation in config.reshardingOperations and the
* catalog entries for the original and temporary namespaces in config.collections.
*/
-    void _updateCoordinatorDocStateAndCatalogEntries(
-        CoordinatorStateEnum nextState,
-        ReshardingCoordinatorDocument coordinatorDoc,
-        boost::optional<Timestamp> fetchTimestamp = boost::none);
+    void _runUpdates(CoordinatorStateEnum nextState,
+                     ReshardingCoordinatorDocument updatedStateDoc,
+                     boost::optional<Timestamp> fetchTimestamp = boost::none);
/**
* Sends 'flushRoutingTableCacheUpdatesWithWriteConcern' for the temporary namespace to all
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 0b423a7dbf2..4ee4d193ff1 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -27,8 +27,6 @@
* it in the license file.
*/
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
-
#include "mongo/platform/basic.h"
#include <boost/optional.hpp>
@@ -43,7 +41,6 @@
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/session_catalog_mongod.h"
-#include "mongo/logv2/log.h"
#include "mongo/s/catalog/type_collection.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/unittest/unittest.h"
@@ -121,34 +118,6 @@ protected:
return collType;
}
- // Returns the chunk for the donor shard.
- ChunkType makeAndInsertChunksForDonorShard(const NamespaceString& nss,
- OID epoch,
- const ShardKeyPattern& shardKey,
- std::vector<OID> ids) {
- auto chunks = makeChunks(nss, epoch, shardKey, ids);
-
- // Only the chunk corresponding to shard0000 is stored as a donor in the coordinator state
- // document constructed.
- auto donorChunk = chunks[0];
- insertChunkAndZoneEntries({donorChunk}, {});
- return donorChunk;
- }
-
- // Returns the chunk for the recipient shard.
- ChunkType makeAndInsertChunksForRecipientShard(const NamespaceString& nss,
- OID epoch,
- const ShardKeyPattern& shardKey,
- std::vector<OID> ids) {
- auto chunks = makeChunks(nss, epoch, shardKey, ids);
-
- // Only the chunk corresponding to shard0001 is stored as a recipient in the coordinator
- // state document constructed.
- auto recipientChunk = chunks[1];
- insertChunkAndZoneEntries({recipientChunk}, {});
- return recipientChunk;
- }
-
std::vector<ChunkType> makeChunks(const NamespaceString& nss,
OID epoch,
const ShardKeyPattern& shardKey,
@@ -477,8 +446,7 @@ protected:
void persistStateTransitionUpdateExpectSuccess(
OperationContext* opCtx, ReshardingCoordinatorDocument expectedCoordinatorDoc) {
- resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
- opCtx, expectedCoordinatorDoc);
+ resharding::persistStateTransition(opCtx, expectedCoordinatorDoc);
// Check that config.reshardingOperations and config.collections entries are updated
// correctly
@@ -535,22 +503,6 @@ protected:
opCtx, collType, true);
}
- void assertChunkVersionDidNotIncreaseAfterStateTransition(
- const ChunkType& chunk, const ChunkVersion& collectionVersion) {
- auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin());
- ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK());
- ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(),
- collectionVersion.majorVersion());
- }
-
- void assertChunkVersionIncreasedAfterStateTransition(const ChunkType& chunk,
- const ChunkVersion& collectionVersion) {
- auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin());
- ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK());
- ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(),
- collectionVersion.majorVersion() + 1);
- }
-
NamespaceString _originalNss = NamespaceString("db.foo");
UUID _originalUUID = UUID::gen();
OID _originalEpoch = OID::gen();
@@ -576,13 +528,6 @@ protected:
TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) {
auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kInitializing);
-
- // Ensure the chunks for the original namespace exist since they will be bumped as a product of
- // the state transition to kPreparingToDonate.
- auto donorChunk = makeAndInsertChunksForDonorShard(
- _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
- auto collectionVersion = donorChunk.getVersion();
-
auto initialChunks =
makeChunks(_tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
auto newZones = makeZones(_tempNss, _newShardKey);
@@ -593,47 +538,28 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistInitialInfoSucceeds) {
persistInitialStateAndCatalogUpdatesExpectSuccess(
operationContext(), expectedCoordinatorDoc, initialChunks, newZones);
-
- // Confirm the shard version was increased for the donor shard.
- auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin());
- ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK());
- ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(),
- collectionVersion.majorVersion() + 1);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistBasicStateTransitionSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kCloning, _originalEpoch);
- // Ensure the chunks for the original namespace exist since they will be bumped as a product of
- // the state transition to kPreparingToDonate.
- auto donorChunk = makeAndInsertChunksForDonorShard(
- _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()});
- auto collectionVersion = donorChunk.getVersion();
-
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kMirroring);
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
- assertChunkVersionIncreasedAfterStateTransition(donorChunk, collectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistFetchTimestampStateTransitionSucceeds) {
auto coordinatorDoc =
insertStateAndCatalogEntries(CoordinatorStateEnum::kPreparingToDonate, _originalEpoch);
- auto recipientChunk = makeAndInsertChunksForRecipientShard(
- _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()});
- auto collectionVersion = recipientChunk.getVersion();
-
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning);
emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
-
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
- assertChunkVersionIncreasedAfterStateTransition(recipientChunk, collectionVersion);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) {
@@ -641,11 +567,8 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) {
auto coordinatorDoc = insertStateAndCatalogEntries(
CoordinatorStateEnum::kMirroring, _originalEpoch, fetchTimestamp);
auto initialChunksIds = std::vector{OID::gen(), OID::gen()};
-
- auto tempNssChunks = makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds);
- auto recipientChunk = tempNssChunks[1];
- insertChunkAndZoneEntries(tempNssChunks, makeZones(_tempNss, _newShardKey));
-
+ insertChunkAndZoneEntries(makeChunks(_tempNss, _tempEpoch, _newShardKey, initialChunksIds),
+ makeZones(_tempNss, _newShardKey));
insertChunkAndZoneEntries(
makeChunks(_originalNss, OID::gen(), _oldShardKey, std::vector{OID::gen(), OID::gen()}),
makeZones(_originalNss, _oldShardKey));
@@ -661,9 +584,6 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitSucceeds) {
persistCommittedStateExpectSuccess(
operationContext(), expectedCoordinatorDoc, fetchTimestamp, updatedChunks, updatedZones);
-
- assertChunkVersionDidNotIncreaseAfterStateTransition(recipientChunk,
- recipientChunk.getVersion());
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistTransitionToErrorSucceeds) {
@@ -694,10 +614,9 @@ TEST_F(ReshardingCoordinatorPersistenceTest,
// Do not insert initial entry into config.reshardingOperations. Attempt to update coordinator
// state documents.
auto coordinatorDoc = makeCoordinatorDoc(CoordinatorStateEnum::kCloning, Timestamp(1, 1));
- ASSERT_THROWS_CODE(resharding::persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
- operationContext(), coordinatorDoc),
+ ASSERT_THROWS_CODE(resharding::persistStateTransition(operationContext(), coordinatorDoc),
AssertionException,
- 50577);
+ 5030400);
}
TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFails) {
@@ -726,7 +645,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistCommitDoesNotMatchChunksFail
resharding::persistCommittedState(
operationContext(), expectedCoordinatorDoc, _finalEpoch, updatedChunks.size(), 0),
AssertionException,
- 5030401);
+ 5030400);
}
TEST_F(ReshardingCoordinatorPersistenceTest,
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index d7b20524d75..1c8eeab3a90 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -49,7 +49,6 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/pipeline/document_source_unwind.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/config/sharding_catalog_manager.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -152,6 +151,54 @@ NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourc
sourceUuid.toString()));
}
+BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) {
+    BatchedCommandRequest request([&] {
+        write_ops::Insert insertOp(nss);
+        insertOp.setDocuments(docs);
+        return insertOp;
+    }());
+
+    return request;
+}
+
+BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
+                                    const BSONObj& query,
+                                    const BSONObj& update,
+                                    bool upsert,
+                                    bool multi) {
+    BatchedCommandRequest request([&] {
+        write_ops::Update updateOp(nss);
+        updateOp.setUpdates({[&] {
+            write_ops::UpdateOpEntry entry;
+            entry.setQ(query);
+            entry.setU(write_ops::UpdateModification::parseFromClassicUpdate(update));
+            entry.setUpsert(upsert);
+            entry.setMulti(multi);
+            return entry;
+        }()});
+        return updateOp;
+    }());
+
+    return request;
+}
+
+BatchedCommandRequest buildDeleteOp(const NamespaceString& nss,
+                                    const BSONObj& query,
+                                    bool multiDelete) {
+    BatchedCommandRequest request([&] {
+        write_ops::Delete deleteOp(nss);
+        deleteOp.setDeletes({[&] {
+            write_ops::DeleteOpEntry entry;
+            entry.setQ(query);
+            entry.setMulti(multiDelete);
+            return entry;
+        }()});
+        return deleteOp;
+    }());
+
+    return request;
+}
+
void tellShardsToRefresh(OperationContext* opCtx,
const std::vector<ShardId>& shardIds,
const NamespaceString& nss,
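
The three build*Op helpers configure each write_ops object inside an immediately-invoked lambda, so the BatchedCommandRequest is assembled from a single expression with no partially-built temporaries escaping. The same idiom in self-contained form, with toy types standing in for write_ops and BatchedCommandRequest:

    #include <iostream>
    #include <string>
    #include <vector>

    // Toy stand-ins for write_ops::Update / BatchedCommandRequest.
    struct UpdateEntry {
        std::string query;
        std::string update;
        bool upsert = false;
        bool multi = false;
    };

    struct UpdateOp {
        std::string nss;
        std::vector<UpdateEntry> updates;
    };

    // Same shape as buildUpdateOp() in the diff: every sub-object is fully
    // configured inside an immediately-invoked lambda, so the caller receives
    // a complete request from a single expression.
    UpdateOp buildUpdateOp(const std::string& nss,
                           const std::string& query,
                           const std::string& update,
                           bool upsert,
                           bool multi) {
        return [&] {
            UpdateOp op;
            op.nss = nss;
            op.updates.push_back([&] {
                UpdateEntry entry;
                entry.query = query;
                entry.update = update;
                entry.upsert = upsert;
                entry.multi = multi;
                return entry;
            }());
            return op;
        }();
    }

    int main() {
        auto op = buildUpdateOp("config.collections",
                                "{_id: 'db.foo'}",
                                "{$set: {reshardingFields: null}}",
                                false /*upsert*/,
                                false /*multi*/);
        std::cout << op.nss << ": " << op.updates.size() << " update entry\n";
        return 0;
    }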
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index bec052dc1e0..33c491a4d23 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -102,6 +102,27 @@ UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkMan
NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid);
/**
+ * Constructs a BatchedCommandRequest with batch type 'Insert'.
+ */
+BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs);
+
+/**
+ * Constructs a BatchedCommandRequest with batch type 'Update'.
+ */
+BatchedCommandRequest buildUpdateOp(const NamespaceString& nss,
+                                    const BSONObj& query,
+                                    const BSONObj& update,
+                                    bool upsert,
+                                    bool multi);
+
+/**
+ * Constructs a BatchedCommandRequest with batch type 'Delete'.
+ */
+BatchedCommandRequest buildDeleteOp(const NamespaceString& nss,
+                                    const BSONObj& query,
+                                    bool multiDelete);
+
+/**
* Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the
* shards fails to refresh.
*/