summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2020-10-07 23:09:20 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-15 21:19:28 +0000
commit235371574161936f50f712de54bad7c806f4cbd6 (patch)
treee6e90ab204bd04a54c633f7bde5b611e44393525
parentb370bdde808ed5791fe37c3d760a9f356cef8dc5 (diff)
downloadmongo-235371574161936f50f712de54bad7c806f4cbd6.tar.gz
SERVER-51212 Handle resharding fields from shard version mismatch function
-rw-r--r--src/mongo/db/mongod_main.cpp10
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp16
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_test.cpp17
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp184
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common.h80
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp374
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h5
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp10
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h5
-rw-r--r--src/mongo/db/s/resharding_util.cpp25
-rw-r--r--src/mongo/db/s/resharding_util.h31
-rw-r--r--src/mongo/db/s/shard_filtering_metadata_refresh.cpp11
14 files changed, 708 insertions, 67 deletions
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 88dde2eb787..ce8b7f477e7 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -137,7 +137,9 @@
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/periodic_sharded_index_consistency_checker.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
+#include "mongo/db/s/resharding/resharding_donor_service.h"
#include "mongo/db/s/resharding/resharding_op_observer.h"
+#include "mongo/db/s/resharding/resharding_recipient_service.h"
#include "mongo/db/s/shard_server_op_observer.h"
#include "mongo/db/s/sharding_initialization_mongod.h"
#include "mongo/db/s/sharding_state_recovery.h"
@@ -309,7 +311,13 @@ void registerPrimaryOnlyServices(ServiceContext* serviceContext) {
std::vector<std::unique_ptr<repl::PrimaryOnlyService>> services;
services.push_back(std::make_unique<TenantMigrationDonorService>(serviceContext));
services.push_back(std::make_unique<repl::TenantMigrationRecipientService>(serviceContext));
- services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext));
+
+ if (serverGlobalParams.clusterRole == ClusterRole::ConfigServer) {
+ services.push_back(std::make_unique<ReshardingCoordinatorService>(serviceContext));
+ } else if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
+ services.push_back(std::make_unique<ReshardingDonorService>(serviceContext));
+ services.push_back(std::make_unique<ReshardingRecipientService>(serviceContext));
+ }
for (auto& service : services) {
registry->registerService(std::move(service));
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index ddbc7ab1f0b..dc0100104da 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -67,6 +67,7 @@ env.Library(
'periodic_sharded_index_consistency_checker.cpp',
'range_deletion_util.cpp',
'read_only_catalog_cache_loader.cpp',
+ 'resharding/resharding_donor_recipient_common.cpp',
'resharding/resharding_op_observer.cpp',
'resharding/resharding_coordinator_observer.cpp',
'resharding/resharding_coordinator_service.cpp',
@@ -461,6 +462,7 @@ env.CppUnitTest(
'migration_session_id_test.cpp',
'migration_util_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
+ 'resharding/resharding_donor_recipient_common_test.cpp',
'resharding/resharding_recipient_service_test.cpp',
'resharding_collection_test.cpp',
'resharding_destined_recipient_test.cpp',
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index d024f4f1e4e..d24eaba115d 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -127,7 +127,8 @@ public:
std::set<ShardId> recipientShardIds;
std::vector<ChunkType> initialChunks;
ChunkVersion version(1, 0, OID::gen());
- auto tempReshardingNss = constructTemporaryReshardingNss(nss, cm);
+ auto tempReshardingNss = constructTemporaryReshardingNss(
+ nss.db(), getCollectionUUIDFromChunkManger(nss, cm));
if (presetReshardedChunksSpecified) {
const auto chunks = request().get_presetReshardedChunks().get();
@@ -211,7 +212,7 @@ public:
// This promise will currently be falsely fulfilled by a call to interrupt() inside
// the ReshardingCoordinatorService. This is to enable jsTests to pass while code
// is still being committed.
- // TODO SERVER-51212 Change this comment and assess the current call to .wait().
+ // TODO SERVER-51398 Change this comment and assess the current call to .wait().
instance->getObserver()->awaitAllDonorsReadyToDonate().wait();
}
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index e17e86635bd..b0d30953211 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -391,9 +391,7 @@ CollectionType createTempReshardingCollectionType(
TypeCollectionRecipientFields recipient(
std::move(donorShardIds), coordinatorDoc.getExistingUUID(), coordinatorDoc.getNss());
- if (coordinatorDoc.getFetchTimestampStruct().getFetchTimestamp()) {
- recipient.setFetchTimestampStruct(coordinatorDoc.getFetchTimestampStruct());
- }
+ emplaceFetchTimestampIfExists(recipient, coordinatorDoc.getFetchTimestamp());
tempEntryReshardingFields.setRecipientFields(recipient);
collType.setReshardingFields(std::move(tempEntryReshardingFields));
@@ -650,7 +648,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
return ExecutorFuture<void>(**executor, Status::OK());
}
- // TODO SERVER-51212 Remove this call.
+ // TODO SERVER-51398 Remove this call.
interrupt({ErrorCodes::InternalError, "Early exit to support jsTesting"});
return _reshardingCoordinatorObserver->awaitAllDonorsReadyToDonate()
@@ -764,15 +762,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_runUpdates(
// Build new state doc for coordinator state update
ReshardingCoordinatorDocument updatedCoordinatorDoc = updatedStateDoc;
updatedCoordinatorDoc.setState(nextState);
- if (fetchTimestamp) {
- auto& fetchTimestampStruct = updatedCoordinatorDoc.getFetchTimestampStruct();
- if (fetchTimestampStruct.getFetchTimestamp())
- invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get());
-
- invariant(!fetchTimestamp->isNull());
-
- fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
- }
+ emplaceFetchTimestampIfExists(updatedCoordinatorDoc, std::move(fetchTimestamp));
auto opCtx = cc().makeOperationContext();
resharding::persistStateTransition(opCtx.get(), updatedCoordinatorDoc);
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
index 66256efcac6..e6a7daeafb6 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/s/config/config_server_test_fixture.h"
#include "mongo/db/s/resharding/resharding_coordinator_service.h"
+#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/transaction_coordinator_service.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/s/catalog/type_collection.h"
@@ -86,15 +87,7 @@ protected:
{DonorShardEntry(ShardId("shard0000"))},
{RecipientShardEntry(ShardId("shard0001"))});
doc.setCommonReshardingMetadata(meta);
- if (fetchTimestamp) {
- auto fetchTimestampStruct = doc.getFetchTimestampStruct();
- if (fetchTimestampStruct.getFetchTimestamp())
- invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get());
-
- fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
- doc.setFetchTimestampStruct(fetchTimestampStruct);
- }
-
+ emplaceFetchTimestampIfExists(doc, std::move(fetchTimestamp));
return doc;
}
@@ -565,11 +558,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, PersistFetchTimestampStateTransitio
// Persist the updates on disk
auto expectedCoordinatorDoc = coordinatorDoc;
expectedCoordinatorDoc.setState(CoordinatorStateEnum::kCloning);
- Timestamp fetchTimestamp = Timestamp(1, 1);
- auto fetchTimestampStruct = expectedCoordinatorDoc.getFetchTimestampStruct();
- fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
- expectedCoordinatorDoc.setFetchTimestampStruct(fetchTimestampStruct);
-
+ emplaceFetchTimestampIfExists(expectedCoordinatorDoc, Timestamp(1, 1));
persistStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc);
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
new file mode 100644
index 00000000000..8b848a95271
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.cpp
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
+
+namespace mongo {
+namespace resharding {
+
+using DonorStateMachine = ReshardingDonorService::DonorStateMachine;
+using RecipientStateMachine = ReshardingRecipientService::RecipientStateMachine;
+
+namespace {
+// Produces one DonorShardMirroringEntry per element of 'shardIds', in the same order, each with
+// its 'mirroring' flag set to false.
+std::vector<DonorShardMirroringEntry> createDonorShardMirroringEntriesFromDonorShardIds(
+    const std::vector<ShardId>& shardIds) {
+    std::vector<DonorShardMirroringEntry> entries;
+    entries.reserve(shardIds.size());
+    for (const auto& donorId : shardIds) {
+        entries.emplace_back(donorId, false /* mirroring */);
+    }
+    return entries;
+}
+
+/*
+ * Resolves the PrimaryOnlyService registered under Service::kServiceName and passes it the
+ * serialized form of 'doc' through StateMachine::getOrCreate(). Callers are expected to have
+ * established beforehand that no state machine exists yet for this document.
+ */
+template <class Service, class StateMachine, class ReshardingDocument>
+void createReshardingStateMachine(OperationContext* opCtx, const ReshardingDocument& doc) {
+    auto primaryOnlyService =
+        repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+            ->lookupServiceByName(Service::kServiceName);
+    StateMachine::getOrCreate(opCtx, primaryOnlyService, doc.toBSON());
+}
+
+/*
+ * Forwards 'reshardingFields' to the donor state machine registered for its resharding UUID. If
+ * there is none, creates one, but only while the coordinator has not advanced beyond
+ * kPreparingToDonate.
+ */
+void processReshardingFieldsForDonorCollection(OperationContext* opCtx,
+                                               const NamespaceString& nss,
+                                               const CollectionMetadata& metadata,
+                                               const ReshardingFields& reshardingFields) {
+    auto existingMachine =
+        tryGetReshardingStateMachine<ReshardingDonorService,
+                                     DonorStateMachine,
+                                     ReshardingDonorDocument>(opCtx, reshardingFields.getUuid());
+    if (existingMachine) {
+        (*existingMachine)->onReshardingFieldsChanges(reshardingFields);
+        return;
+    }
+
+    // A coordinator state later than kPreparingToDonate with no in-memory donor instance means
+    // the donor document is going to be recovered by the ReshardingDonorService, which will read
+    // the latest 'reshardingFields' at that point. Nothing to do here.
+    const bool pastCreationWindow =
+        reshardingFields.getState() > CoordinatorStateEnum::kPreparingToDonate;
+    if (pastCreationWindow) {
+        return;
+    }
+
+    createReshardingStateMachine<ReshardingDonorService,
+                                 DonorStateMachine,
+                                 ReshardingDonorDocument>(
+        opCtx, constructDonorDocumentFromReshardingFields(nss, metadata, reshardingFields));
+}
+
+/*
+ * Forwards 'reshardingFields' to the recipient state machine registered for its resharding UUID.
+ * If there is none, creates one, but only while the coordinator has not advanced beyond kCloning.
+ */
+void processReshardingFieldsForRecipientCollection(OperationContext* opCtx,
+                                                   const CollectionMetadata& metadata,
+                                                   const ReshardingFields& reshardingFields) {
+    auto existingMachine =
+        tryGetReshardingStateMachine<ReshardingRecipientService,
+                                     RecipientStateMachine,
+                                     ReshardingRecipientDocument>(opCtx,
+                                                                  reshardingFields.getUuid());
+    if (existingMachine) {
+        (*existingMachine)->onReshardingFieldsChanges(reshardingFields);
+        return;
+    }
+
+    // A coordinator state later than kCloning with no in-memory recipient instance means the
+    // recipient document is going to be recovered by the ReshardingRecipientService, which will
+    // read the latest 'reshardingFields' at that point. Nothing to do here.
+    const bool pastCreationWindow = reshardingFields.getState() > CoordinatorStateEnum::kCloning;
+    if (pastCreationWindow) {
+        return;
+    }
+
+    createReshardingStateMachine<ReshardingRecipientService,
+                                 RecipientStateMachine,
+                                 ReshardingRecipientDocument>(
+        opCtx, constructRecipientDocumentFromReshardingFields(opCtx, metadata, reshardingFields));
+}
+
+} // namespace
+
+// Builds the initial donor state document (state kPreparingToDonate) for the collection 'nss'.
+// The existing collection UUID comes from the chunk manager held by 'metadata'; the resharding
+// key comes from the donor fields, which must be set on 'reshardingFields'.
+ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
+    const NamespaceString& nss,
+    const CollectionMetadata& metadata,
+    const ReshardingFields& reshardingFields) {
+    ReshardingDonorDocument donorDoc(DonorStateEnum::kPreparingToDonate);
+
+    donorDoc.setCommonReshardingMetadata(
+        CommonReshardingMetadata(reshardingFields.getUuid(),
+                                 nss,
+                                 getCollectionUUIDFromChunkManger(nss, *metadata.getChunkManager()),
+                                 reshardingFields.getDonorFields()->getReshardingKey().toBSON()));
+
+    return donorDoc;
+}
+
+// Builds the initial recipient state document (state kCloning). Donor shards, original namespace,
+// existing UUID and fetch timestamp are taken from the recipient fields, which must be set on
+// 'reshardingFields'; the resharding key is the shard key pattern of 'metadata'.
+ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
+    OperationContext* opCtx,
+    const CollectionMetadata& metadata,
+    const ReshardingFields& reshardingFields) {
+    const auto& recipientFields = reshardingFields.getRecipientFields();
+
+    ReshardingRecipientDocument recipientDoc(
+        RecipientStateEnum::kCloning,
+        createDonorShardMirroringEntriesFromDonorShardIds(recipientFields->getDonorShardIds()));
+
+    recipientDoc.setCommonReshardingMetadata(
+        CommonReshardingMetadata(reshardingFields.getUuid(),
+                                 recipientFields->getOriginalNamespace(),
+                                 recipientFields->getExistingUUID(),
+                                 metadata.getShardKeyPattern().toBSON()));
+
+    emplaceFetchTimestampIfExists(recipientDoc, recipientFields->getFetchTimestamp());
+
+    return recipientDoc;
+}
+
+// Dispatches 'reshardingFields' to the donor or the recipient handling, depending on which of the
+// two sub-documents is present. Exactly one of them is expected to be set.
+void processReshardingFieldsForCollection(OperationContext* opCtx,
+                                          const NamespaceString& nss,
+                                          const CollectionMetadata& metadata,
+                                          const ReshardingFields& reshardingFields) {
+    if (reshardingFields.getDonorFields()) {
+        invariant(!reshardingFields.getRecipientFields());
+        processReshardingFieldsForDonorCollection(opCtx, nss, metadata, reshardingFields);
+    } else if (reshardingFields.getRecipientFields()) {
+        invariant(!reshardingFields.getDonorFields());
+        processReshardingFieldsForRecipientCollection(opCtx, metadata, reshardingFields);
+    } else {
+        // Neither donor nor recipient fields are set.
+        MONGO_UNREACHABLE
+    }
+}
+
+} // namespace resharding
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common.h b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
new file mode 100644
index 00000000000..309be7d5230
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common.h
@@ -0,0 +1,80 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/resharding/resharding_donor_service.h"
+#include "mongo/db/s/resharding/resharding_recipient_service.h"
+#include "mongo/db/s/resharding_util.h"
+
+namespace mongo {
+namespace resharding {
+
+using ReshardingFields = TypeCollectionReshardingFields;
+
+/**
+ * Returns the StateMachine instance whose id is {ReshardingDocument::k_idFieldName:
+ * reshardingUUID} on the service registered as Service::kServiceName, or boost::none when no such
+ * instance exists.
+ */
+template <class Service, class StateMachine, class ReshardingDocument>
+boost::optional<std::shared_ptr<StateMachine>> tryGetReshardingStateMachine(
+    OperationContext* opCtx, const UUID& reshardingUUID) {
+    auto primaryOnlyService =
+        repl::PrimaryOnlyServiceRegistry::get(opCtx->getServiceContext())
+            ->lookupServiceByName(Service::kServiceName);
+    return StateMachine::lookup(
+        opCtx, primaryOnlyService, BSON(ReshardingDocument::k_idFieldName << reshardingUUID));
+}
+
+/**
+ * The following functions construct a ReshardingDocument from the given 'reshardingFields'.
+ */
+
+/**
+ * Returns a donor document in state kPreparingToDonate for the collection 'nss'. The existing
+ * collection UUID is read from the chunk manager of 'metadata' and the resharding key from the
+ * donor fields of 'reshardingFields', which are dereferenced without a presence check and so must
+ * be set by the caller.
+ */
+ReshardingDonorDocument constructDonorDocumentFromReshardingFields(
+ const NamespaceString& nss,
+ const CollectionMetadata& metadata,
+ const ReshardingFields& reshardingFields);
+
+/**
+ * Returns a recipient document in state kCloning. Donor shard ids, original namespace, existing
+ * UUID and (if present) the fetch timestamp are read from the recipient fields of
+ * 'reshardingFields', which are dereferenced without a presence check and so must be set by the
+ * caller. The resharding key is the shard key pattern of 'metadata'.
+ */
+ReshardingRecipientDocument constructRecipientDocumentFromReshardingFields(
+ OperationContext* opCtx,
+ const CollectionMetadata& metadata,
+ const ReshardingFields& reshardingFields);
+
+/**
+ * Takes in the reshardingFields from a collection's config.collections entry and gives the
+ * corresponding ReshardingDonorStateMachine or ReshardingRecipientStateMachine the updated
+ * information. Will construct a ReshardingDonorStateMachine or ReshardingRecipientStateMachine if:
+ * 1. The reshardingFields state indicates that the resharding operation is new, and
+ * 2. A state machine does not exist on this node for the resharding operation (existing state
+ * machines are looked up by the resharding UUID carried in 'reshardingFields').
+ *
+ * Exactly one of the donor fields and recipient fields must be set on 'reshardingFields'; having
+ * both set trips an invariant and having neither set is treated as unreachable.
+ */
+void processReshardingFieldsForCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionMetadata& metadata,
+ const ReshardingFields& reshardingFields);
+
+} // namespace resharding
+
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
new file mode 100644
index 00000000000..f479d02942e
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_donor_recipient_common_test.cpp
@@ -0,0 +1,374 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/wait_for_majority_service.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
+#include "mongo/db/s/shard_server_test_fixture.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/util/fail_point.h"
+
+namespace mongo {
+namespace {
+
+using namespace fmt::literals;
+
+/**
+ * Fixture providing canned namespaces, UUIDs, epochs, shard ids and shard keys, together with
+ * helpers that build CollectionMetadata / ReshardingFields and verify the donor and recipient
+ * documents derived from them.
+ */
+class ReshardingDonorRecipientCommonInternalsTest : public ShardServerTestFixture {
+public:
+    const UUID kExistingUUID = UUID::gen();
+    const NamespaceString kOriginalNss = NamespaceString("db", "foo");
+
+    const NamespaceString kTemporaryReshardingNss =
+        constructTemporaryReshardingNss("db", kExistingUUID);
+    const std::string kOriginalShardKey = "oldKey";
+    const BSONObj kOriginalShardKeyPattern = BSON(kOriginalShardKey << 1);
+    const std::string kReshardingKey = "newKey";
+    const BSONObj kReshardingKeyPattern = BSON(kReshardingKey << 1);
+    const OID kOriginalEpoch = OID::gen();
+    const OID kReshardingEpoch = OID::gen();
+    const UUID kReshardingUUID = UUID::gen();
+
+    const ShardId kShardOne = ShardId("shardOne");
+    const ShardId kShardTwo = ShardId("shardTwo");
+
+    const std::vector<ShardId> kShardIds = {kShardOne, kShardTwo};
+
+    const Timestamp kFetchTimestamp = Timestamp(1, 0);
+
+protected:
+    /**
+     * Metadata for the original collection: kOriginalNss, sharded on kOriginalShardKey.
+     */
+    CollectionMetadata makeShardedMetadataForOriginalCollection(OperationContext* opCtx) {
+        return makeShardedMetadata(opCtx,
+                                   kOriginalNss,
+                                   kOriginalShardKey,
+                                   kOriginalShardKeyPattern,
+                                   kExistingUUID,
+                                   kOriginalEpoch);
+    }
+
+    /**
+     * Metadata for the temporary resharding collection: kTemporaryReshardingNss, sharded on
+     * kReshardingKey.
+     */
+    CollectionMetadata makeShardedMetadataForTemporaryReshardingCollection(
+        OperationContext* opCtx) {
+        return makeShardedMetadata(opCtx,
+                                   kTemporaryReshardingNss,
+                                   kReshardingKey,
+                                   kReshardingKeyPattern,
+                                   kReshardingUUID,
+                                   kReshardingEpoch);
+    }
+
+    /**
+     * Builds CollectionMetadata (as seen from kShardOne) for 'nss' with a single chunk covering
+     * [MINKEY, MAXKEY) of 'shardKey', assigned to kShardTwo. If 'opCtx' is not yet versioned, its
+     * OperationShardingState is initialized with the chunk manager's version for kShardOne.
+     */
+    CollectionMetadata makeShardedMetadata(OperationContext* opCtx,
+                                           const NamespaceString& nss,
+                                           const std::string& shardKey,
+                                           const BSONObj& shardKeyPattern,
+                                           const UUID& uuid,
+                                           const OID& epoch) {
+        auto range = ChunkRange(BSON(shardKey << MINKEY), BSON(shardKey << MAXKEY));
+        auto chunk = ChunkType(nss, std::move(range), ChunkVersion(1, 0, epoch), kShardTwo);
+        ChunkManager cm(
+            kShardOne,
+            DatabaseVersion(uuid, 1),
+            makeStandaloneRoutingTableHistory(RoutingTableHistory::makeNew(nss,
+                                                                           uuid,
+                                                                           shardKeyPattern,
+                                                                           nullptr,
+                                                                           false,
+                                                                           epoch,
+                                                                           boost::none,
+                                                                           {std::move(chunk)})),
+            boost::none);
+
+        if (!OperationShardingState::isOperationVersioned(opCtx)) {
+            const auto version = cm.getVersion(kShardOne);
+            BSONObjBuilder builder;
+            version.appendToCommand(&builder);
+
+            auto& oss = OperationShardingState::get(opCtx);
+            oss.initializeClientRoutingVersionsFromCommand(nss, builder.obj());
+        }
+
+        return CollectionMetadata(std::move(cm), kShardOne);
+    }
+
+    /**
+     * Returns ReshardingFields carrying only the resharding UUID and the coordinator state;
+     * neither donor nor recipient fields are set.
+     */
+    ReshardingFields createCommonReshardingFields(const UUID& reshardingUUID,
+                                                  CoordinatorStateEnum state) {
+        auto fields = ReshardingFields(reshardingUUID);
+        fields.setState(state);
+        return fields;
+    }
+
+    /**
+     * Sets the donor sub-document of 'fields' from 'reshardingKey'.
+     */
+    void appendDonorFieldsToReshardingFields(ReshardingFields& fields,
+                                             const BSONObj& reshardingKey) {
+        fields.setDonorFields(TypeCollectionDonorFields(reshardingKey));
+    }
+
+    /**
+     * Sets the recipient sub-document of 'fields'. 'fetchTimestamp' is forwarded to
+     * emplaceFetchTimestampIfExists() and may be boost::none.
+     */
+    void appendRecipientFieldsToReshardingFields(
+        ReshardingFields& fields,
+        const std::vector<ShardId>& donorShardIds,
+        const UUID& existingUUID,
+        const NamespaceString& originalNss,
+        const boost::optional<Timestamp>& fetchTimestamp = boost::none) {
+        auto recipientFields =
+            TypeCollectionRecipientFields(donorShardIds, existingUUID, originalNss);
+        emplaceFetchTimestampIfExists(recipientFields, fetchTimestamp);
+        fields.setRecipientFields(std::move(recipientFields));
+    }
+
+    /**
+     * Asserts that the common metadata of 'reshardingDoc' (_id, namespace, existing UUID and
+     * resharding key) equals the given expected values.
+     */
+    template <class ReshardingDocument>
+    void assertCommonDocFieldsMatchReshardingFields(const NamespaceString& nss,
+                                                    const UUID& reshardingUUID,
+                                                    const UUID& existingUUID,
+                                                    const BSONObj& reshardingKey,
+                                                    const ReshardingDocument& reshardingDoc) {
+        ASSERT_EQ(reshardingDoc.get_id(), reshardingUUID);
+        ASSERT_EQ(reshardingDoc.getNss(), nss);
+        ASSERT_EQ(reshardingDoc.getExistingUUID(), existingUUID);
+        ASSERT_BSONOBJ_EQ(reshardingDoc.getReshardingKey(), reshardingKey);
+    }
+
+    /**
+     * Asserts that 'donorDoc' is a freshly constructed donor document for 'reshardingFields':
+     * matching common metadata, state kPreparingToDonate and no minFetchTimestamp.
+     */
+    void assertDonorDocMatchesReshardingFields(const NamespaceString& nss,
+                                               const UUID& existingUUID,
+                                               const ReshardingFields& reshardingFields,
+                                               const ReshardingDonorDocument& donorDoc) {
+        assertCommonDocFieldsMatchReshardingFields<ReshardingDonorDocument>(
+            nss,
+            reshardingFields.getUuid(),
+            existingUUID,
+            reshardingFields.getDonorFields()->getReshardingKey().toBSON(),
+            donorDoc);
+        ASSERT(donorDoc.getState() == DonorStateEnum::kPreparingToDonate);
+        ASSERT(donorDoc.getMinFetchTimestamp() == boost::none);
+    }
+
+    /**
+     * Asserts that 'recipientDoc' is a freshly constructed recipient document for
+     * 'reshardingFields': matching common metadata, state kCloning, the same fetch timestamp, and
+     * exactly one non-mirroring entry per donor shard id.
+     */
+    void assertRecipientDocMatchesReshardingFields(
+        const CollectionMetadata& metadata,
+        const ReshardingFields& reshardingFields,
+        const ReshardingRecipientDocument& recipientDoc) {
+        assertCommonDocFieldsMatchReshardingFields<ReshardingRecipientDocument>(
+            reshardingFields.getRecipientFields()->getOriginalNamespace(),
+            reshardingFields.getUuid(),
+            reshardingFields.getRecipientFields()->getExistingUUID(),
+            metadata.getShardKeyPattern().toBSON(),
+            recipientDoc);
+
+        ASSERT(recipientDoc.getState() == RecipientStateEnum::kCloning);
+        ASSERT(recipientDoc.getFetchTimestamp() ==
+               reshardingFields.getRecipientFields()->getFetchTimestamp());
+
+        auto donorShardIds = reshardingFields.getRecipientFields()->getDonorShardIds();
+        auto donorShardIdsSet = std::set<ShardId>(donorShardIds.begin(), donorShardIds.end());
+
+        // Tick off every mirroring entry against the expected ids; anything left over at the end
+        // is a donor shard that is missing from the document.
+        for (const auto& donorShardMirroringEntry : recipientDoc.getDonorShardsMirroring()) {
+            ASSERT_EQ(donorShardMirroringEntry.getMirroring(), false);
+            auto reshardingFieldsDonorShardId =
+                donorShardIdsSet.find(donorShardMirroringEntry.getId());
+            ASSERT(reshardingFieldsDonorShardId != donorShardIdsSet.end());
+            donorShardIdsSet.erase(reshardingFieldsDonorShardId);
+        }
+
+        ASSERT(donorShardIdsSet.empty());
+    }
+};
+
+// A donor document built from donor-flavoured resharding fields mirrors those fields.
+TEST_F(ReshardingDonorRecipientCommonInternalsTest, ConstructDonorDocumentFromReshardingFields) {
+    const auto metadata = makeShardedMetadataForOriginalCollection(operationContext());
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
+    appendDonorFieldsToReshardingFields(reshardingFields, kReshardingKeyPattern);
+
+    assertDonorDocMatchesReshardingFields(
+        kOriginalNss,
+        kExistingUUID,
+        reshardingFields,
+        resharding::constructDonorDocumentFromReshardingFields(
+            kOriginalNss, metadata, reshardingFields));
+}
+
+// A recipient document built from recipient-flavoured resharding fields mirrors those fields.
+TEST_F(ReshardingDonorRecipientCommonInternalsTest,
+       ConstructRecipientDocumentFromReshardingFields) {
+    auto* const opCtx = operationContext();
+    const auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+    appendRecipientFieldsToReshardingFields(
+        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+
+    assertRecipientDocMatchesReshardingFields(
+        metadata,
+        reshardingFields,
+        resharding::constructRecipientDocumentFromReshardingFields(
+            opCtx, metadata, reshardingFields));
+}
+
+/**
+ * Extends the internals fixture with a PrimaryOnlyServiceRegistry that has the resharding donor
+ * and recipient services registered and a node that has been stepped up, so that the tests can
+ * create and look up state machine instances.
+ */
+class ReshardingDonorRecipientCommonTest : public ReshardingDonorRecipientCommonInternalsTest {
+public:
+ // Registers both resharding services, starts the registry and steps the node up.
+ void setUp() override {
+ ShardServerTestFixture::setUp();
+
+ WaitForMajorityService::get(getServiceContext()).setUp(getServiceContext());
+
+ _registry = repl::PrimaryOnlyServiceRegistry::get(getServiceContext());
+
+ std::unique_ptr<ReshardingDonorService> donorService =
+ std::make_unique<ReshardingDonorService>(getServiceContext());
+ _registry->registerService(std::move(donorService));
+
+ std::unique_ptr<ReshardingRecipientService> recipientService =
+ std::make_unique<ReshardingRecipientService>(getServiceContext());
+ _registry->registerService(std::move(recipientService));
+ _registry->onStartup(operationContext());
+
+ stepUp();
+ }
+
+ // Shuts down the majority-wait service and the registry before the base fixture is torn down.
+ void tearDown() override {
+ WaitForMajorityService::get(getServiceContext()).shutDown();
+
+ _registry->onShutdown();
+
+ ShardServerTestFixture::tearDown();
+ }
+
+ // Bumps the term, puts the replication coordinator into RS_PRIMARY with that term, and tells
+ // the registry that step-up has completed.
+ void stepUp() {
+ auto replCoord = repl::ReplicationCoordinator::get(getServiceContext());
+
+ // Advance term
+ _term++;
+
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ ASSERT_OK(replCoord->updateTerm(operationContext(), _term));
+ replCoord->setMyLastAppliedOpTimeAndWallTime(
+ repl::OpTimeAndWallTime(repl::OpTime(Timestamp(1, 1), _term), Date_t()));
+
+ _registry->onStepUpComplete(operationContext(), _term);
+ }
+
+protected:
+ // Assigned in setUp(); not valid before then.
+ repl::PrimaryOnlyServiceRegistry* _registry;
+ // Replication term used by stepUp(); incremented on every call.
+ long long _term = 0;
+};
+
+// Processing donor fields in state kPreparingToDonate creates a donor state machine.
+TEST_F(ReshardingDonorRecipientCommonTest, CreateDonorServiceInstance) {
+    auto* const opCtx = operationContext();
+    const auto metadata = makeShardedMetadataForOriginalCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kPreparingToDonate);
+    appendDonorFieldsToReshardingFields(reshardingFields, kReshardingKeyPattern);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kOriginalNss, metadata, reshardingFields);
+
+    using DonorStateMachine = ReshardingDonorService::DonorStateMachine;
+    auto donorStateMachine = resharding::tryGetReshardingStateMachine<ReshardingDonorService,
+                                                                      DonorStateMachine,
+                                                                      ReshardingDonorDocument>(
+        opCtx, kReshardingUUID);
+
+    ASSERT(donorStateMachine != boost::none);
+}
+
+// Processing recipient fields in state kCloning creates a recipient state machine.
+TEST_F(ReshardingDonorRecipientCommonTest, CreateRecipientServiceInstance) {
+    auto* const opCtx = operationContext();
+    const auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+    appendRecipientFieldsToReshardingFields(
+        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
+
+    using RecipientStateMachine = ReshardingRecipientService::RecipientStateMachine;
+    auto recipientStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingRecipientService,
+                                                 RecipientStateMachine,
+                                                 ReshardingRecipientDocument>(opCtx,
+                                                                              kReshardingUUID);
+
+    ASSERT(recipientStateMachine != boost::none);
+}
+
+// Processing donor fields whose coordinator state is already past kPreparingToDonate must not
+// create a donor state machine.
+TEST_F(ReshardingDonorRecipientCommonTest,
+       CreateDonorServiceInstanceWithIncorrectCoordinatorState) {
+    OperationContext* opCtx = operationContext();
+    auto metadata = makeShardedMetadataForOriginalCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCommitted);
+    appendDonorFieldsToReshardingFields(reshardingFields, kReshardingKeyPattern);
+
+    // Actually run the code under test; without this call the lookup below trivially finds
+    // nothing and the test cannot fail.
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kOriginalNss, metadata, reshardingFields);
+
+    auto donorStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingDonorService,
+                                                 ReshardingDonorService::DonorStateMachine,
+                                                 ReshardingDonorDocument>(opCtx, kReshardingUUID);
+
+    ASSERT(donorStateMachine == boost::none);
+}
+
+// Processing recipient fields whose coordinator state is already past kCloning must not create a
+// recipient state machine.
+TEST_F(ReshardingDonorRecipientCommonTest,
+       CreateRecipientServiceInstanceWithIncorrectCoordinatorState) {
+    auto* const opCtx = operationContext();
+    const auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCommitted);
+    appendRecipientFieldsToReshardingFields(
+        reshardingFields, kShardIds, kExistingUUID, kOriginalNss, kFetchTimestamp);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
+
+    using RecipientStateMachine = ReshardingRecipientService::RecipientStateMachine;
+    auto recipientStateMachine =
+        resharding::tryGetReshardingStateMachine<ReshardingRecipientService,
+                                                 RecipientStateMachine,
+                                                 ReshardingRecipientDocument>(opCtx,
+                                                                              kReshardingUUID);
+
+    ASSERT(recipientStateMachine == boost::none);
+}
+
+// Resharding fields carrying neither donor nor recipient fields must abort processing.
+DEATH_TEST_F(ReshardingDonorRecipientCommonTest,
+             ProcessReshardingFieldsWithoutDonorOrRecipientFields,
+             "invariant") {
+    auto* const opCtx = operationContext();
+    const auto metadata = makeShardedMetadataForTemporaryReshardingCollection(opCtx);
+
+    // Deliberately leave both the donor and the recipient sub-documents unset.
+    const auto reshardingFields =
+        createCommonReshardingFields(kReshardingUUID, CoordinatorStateEnum::kCloning);
+
+    resharding::processReshardingFieldsForCollection(
+        opCtx, kTemporaryReshardingNss, metadata, reshardingFields);
+}
+
+} // namespace
+
+} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index e26ef0aa737..90e63a6e2c1 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -34,10 +34,11 @@
#include "mongo/s/resharding/type_collection_fields_gen.h"
namespace mongo {
-constexpr StringData kReshardingDonorServiceName = "ReshardingDonorService"_sd;
class ReshardingDonorService final : public repl::PrimaryOnlyService {
public:
+ static constexpr StringData kServiceName = "ReshardingDonorService"_sd;
+
explicit ReshardingDonorService(ServiceContext* serviceContext)
: PrimaryOnlyService(serviceContext) {}
~ReshardingDonorService() = default;
@@ -45,7 +46,7 @@ public:
class DonorStateMachine;
StringData getServiceName() const override {
- return kReshardingDonorServiceName;
+ return kServiceName;
}
NamespaceString getStateDocumentsNS() const override {
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 6fa3ca44f4d..07980cd9740 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -279,15 +279,7 @@ void ReshardingRecipientService::RecipientStateMachine::_transitionState(
return;
}
- if (fetchTimestamp) {
- auto& fetchTimestampStruct = replacementDoc.getFetchTimestampStruct();
-
- // If the recipient is recovering and already knows the fetchTimestamp, it cannot change
- if (fetchTimestampStruct.getFetchTimestamp())
- invariant(fetchTimestampStruct.getFetchTimestamp().get() == fetchTimestamp.get());
-
- fetchTimestampStruct.setFetchTimestamp(std::move(fetchTimestamp));
- }
+ emplaceFetchTimestampIfExists(replacementDoc, std::move(fetchTimestamp));
_updateRecipientDocument(std::move(replacementDoc));
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 6b13827e31b..61a8e1e3a12 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -34,7 +34,6 @@
#include "mongo/s/resharding/type_collection_fields_gen.h"
namespace mongo {
-constexpr StringData kReshardingRecipientServiceName = "ReshardingRecipientService"_sd;
namespace resharding {
@@ -51,6 +50,8 @@ void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
class ReshardingRecipientService final : public repl::PrimaryOnlyService {
public:
+ static constexpr StringData kServiceName = "ReshardingRecipientService"_sd;
+
explicit ReshardingRecipientService(ServiceContext* serviceContext)
: PrimaryOnlyService(serviceContext) {}
~ReshardingRecipientService() = default;
@@ -58,7 +59,7 @@ public:
class RecipientStateMachine;
StringData getServiceName() const override {
- return kReshardingRecipientServiceName;
+ return kServiceName;
}
NamespaceString getStateDocumentsNS() const override {
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 8cc41078863..6c3db032662 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -72,15 +72,6 @@ UUID getCollectionUuid(OperationContext* opCtx, const NamespaceString& nss) {
return *uuid;
}
-// Assembles the namespace string for the temporary resharding collection based on the source
-// namespace components.
-NamespaceString getTempReshardingNss(StringData db, const UUID& sourceUuid) {
- return NamespaceString(db,
- fmt::format("{}{}",
- NamespaceString::kTemporaryReshardingCollectionPrefix,
- sourceUuid.toString()));
-}
-
// Ensure that this shard owns the document. This must be called after verifying that we
// are in a resharding operation so that we are guaranteed that migrations are suspended.
bool documentBelongsToMe(OperationContext* opCtx,
@@ -152,14 +143,12 @@ UUID getCollectionUUIDFromChunkManger(const NamespaceString& originalNss, const
return collectionUUID.get();
}
-NamespaceString constructTemporaryReshardingNss(const NamespaceString& originalNss,
- const ChunkManager& cm) {
- auto collectionUUID = getCollectionUUIDFromChunkManger(originalNss, cm);
- NamespaceString tempReshardingNss(
- originalNss.db(),
- "{}{}"_format(NamespaceString::kTemporaryReshardingCollectionPrefix,
- collectionUUID.toString()));
- return tempReshardingNss;
+
+NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid) {
+    // Collection part: the temporary resharding prefix followed by the source collection's UUID.
+    const auto tempCollName = fmt::format(
+        "{}{}", NamespaceString::kTemporaryReshardingCollectionPrefix, sourceUuid.toString());
+    return NamespaceString(db, tempCollName);
}
BatchedCommandRequest buildInsertOp(const NamespaceString& nss, std::vector<BSONObj> docs) {
@@ -716,7 +705,7 @@ boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
bool allowLocks = true;
auto tempNssRoutingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
opCtx,
- getTempReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)),
+ constructTemporaryReshardingNss(sourceNss.db(), getCollectionUuid(opCtx, sourceNss)),
allowLocks);
uassert(ShardInvalidatedForTargetingInfo(sourceNss),
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 0ef4d258815..58f4fd5d0a6 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -50,6 +50,28 @@ namespace mongo {
constexpr auto kReshardingOplogPrePostImageOps = "prePostImageOps"_sd;
/**
+ * Stores 'fetchTimestamp' on 'c' when the optional holds a value; does nothing otherwise. The
+ * value must be non-null and must equal any fetch timestamp that 'c' already carries.
+ */
+template <class ClassWithFetchTimestamp>
+void emplaceFetchTimestampIfExists(ClassWithFetchTimestamp& c,
+                                   boost::optional<Timestamp> fetchTimestamp) {
+    if (fetchTimestamp) {
+        invariant(!fetchTimestamp->isNull());
+
+        // A fetch timestamp that was recorded earlier is not allowed to change.
+        const auto& alreadyExistingFetchTimestamp = c.getFetchTimestamp();
+        if (alreadyExistingFetchTimestamp) {
+            invariant(fetchTimestamp == alreadyExistingFetchTimestamp);
+        }
+
+        FetchTimestamp replacementStruct;
+        replacementStruct.setFetchTimestamp(std::move(fetchTimestamp));
+        c.setFetchTimestampStruct(std::move(replacementStruct));
+    }
+}
+
+/**
* Helper method to construct a DonorShardEntry with the fields specified.
*/
DonorShardEntry makeDonorShard(ShardId shardId,
@@ -72,15 +94,12 @@ RecipientShardEntry makeRecipientShard(
UUID getCollectionUUIDFromChunkManger(const NamespaceString& nss, const ChunkManager& cm);
/**
- * Constructs the temporary resharding collection's namespace provided the original collection's
- * namespace and chunk manager.
+ * Assembles the namespace string for the temporary resharding collection based on the source
+ * namespace components.
*
* <db>.system.resharding.<existing collection's UUID>
- *
- * Note: throws if the original collection does not have a UUID.
*/
-NamespaceString constructTemporaryReshardingNss(const NamespaceString& originalNss,
- const ChunkManager& cm);
+NamespaceString constructTemporaryReshardingNss(StringData db, const UUID& sourceUuid);
/**
* Constructs a BatchedCommandRequest with batch type 'Insert'.
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 614032749bb..05f6e2bb3f2 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/s/database_sharding_state.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/resharding/resharding_donor_recipient_common.h"
#include "mongo/db/s/sharding_runtime_d_params_gen.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/s/sharding_statistics.h"
@@ -130,6 +131,16 @@ SharedSemiFuture<void> recoverRefreshShardVersion(ServiceContext* serviceContext
}
currentMetadata = forceGetCurrentMetadata(opCtx, nss);
+
+ if (currentMetadata && currentMetadata->isSharded()) {
+ // If the collection metadata after a refresh has 'reshardingFields', then pass it
+ // to the resharding subsystem to process.
+ const auto& reshardingFields = currentMetadata->getReshardingFields();
+ if (reshardingFields) {
+ resharding::processReshardingFieldsForCollection(
+ opCtx, nss, *currentMetadata, *reshardingFields);
+ }
+ }
})
.semi()
.share();