summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/s/SConscript3
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp10
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h5
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service_test.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp144
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h34
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp165
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_external_state.h146
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service_external_state_test.cpp (renamed from src/mongo/db/s/resharding/resharding_recipient_service_test.cpp)66
9 files changed, 391 insertions, 187 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 4d04c6efc51..bd03fc9e93c 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -82,6 +82,7 @@ env.Library(
'resharding/resharding_oplog_fetcher.cpp',
'resharding/resharding_oplog_session_application.cpp',
'resharding/resharding_recipient_service.cpp',
+ 'resharding/resharding_recipient_service_external_state.cpp',
'resharding/resharding_server_parameters.idl',
'resharding/resharding_txn_cloner.cpp',
'resharding/resharding_txn_cloner_progress.idl',
@@ -482,7 +483,7 @@ env.CppUnitTest(
'resharding/resharding_oplog_crud_application_test.cpp',
'resharding/resharding_oplog_fetcher_test.cpp',
'resharding/resharding_oplog_session_application_test.cpp',
- 'resharding/resharding_recipient_service_test.cpp',
+ 'resharding/resharding_recipient_service_external_state_test.cpp',
'resharding/resharding_donor_service_test.cpp',
'resharding/resharding_txn_cloner_test.cpp',
'session_catalog_migration_destination_test.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 791128053b5..2ed925efb80 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -185,16 +185,12 @@ public:
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance(
BSONObj initialState) {
- return std::make_shared<DonorStateMachine>(std::move(initialState),
- std::make_unique<ExternalStateImpl>());
+ return std::make_shared<DonorStateMachine>(
+ ReshardingDonorDocument::parse({"DonorStateMachine"}, initialState),
+ std::make_unique<ExternalStateImpl>());
}
ReshardingDonorService::DonorStateMachine::DonorStateMachine(
- const BSONObj& donorDoc, std::unique_ptr<DonorStateMachineExternalState> externalState)
- : DonorStateMachine(ReshardingDonorDocument::parse({"DonorStateMachine"}, donorDoc),
- std::move(externalState)) {}
-
-ReshardingDonorService::DonorStateMachine::DonorStateMachine(
const ReshardingDonorDocument& donorDoc,
std::unique_ptr<DonorStateMachineExternalState> externalState)
: repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(),
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 11addfc69bc..ab74589d603 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -73,7 +73,7 @@ public:
class ReshardingDonorService::DonorStateMachine final
: public repl::PrimaryOnlyService::TypedInstance<DonorStateMachine> {
public:
- explicit DonorStateMachine(const BSONObj& donorDoc,
+ explicit DonorStateMachine(const ReshardingDonorDocument& donorDoc,
std::unique_ptr<DonorStateMachineExternalState> externalState);
~DonorStateMachine();
@@ -104,9 +104,6 @@ public:
const ReshardingDonorDocument& donorDoc);
private:
- DonorStateMachine(const ReshardingDonorDocument& donorDoc,
- std::unique_ptr<DonorStateMachineExternalState> externalState);
-
/**
* Runs up until the donor is either in state kBlockingWrites or encountered an error.
*/
diff --git a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
index 2636d00f0b2..fdac7743862 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service_test.cpp
@@ -167,8 +167,9 @@ public:
: ReshardingDonorService(serviceContext) {}
std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(BSONObj initialState) override {
- return std::make_shared<DonorStateMachine>(std::move(initialState),
- std::make_unique<ExternalStateForTest>());
+ return std::make_shared<DonorStateMachine>(
+ ReshardingDonorDocument::parse({"ReshardingDonorServiceForTest"}, initialState),
+ std::make_unique<ExternalStateForTest>());
}
};
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 29d50b16c3f..1167c7ed61a 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_applier.h"
+#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/s/shard_key_util.h"
@@ -93,82 +94,19 @@ void ensureFulfilledPromise(WithLock lk, SharedPromise<T>& sp, T value) {
} // namespace
-namespace resharding {
-
-void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
- const NamespaceString& originalNss,
- const NamespaceString& reshardingNss,
- const UUID& reshardingUUID,
- const UUID& existingUUID,
- Timestamp fetchTimestamp) {
- LOGV2_DEBUG(
- 5002300, 1, "Creating temporary resharding collection", "originalNss"_attr = originalNss);
-
- auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- // Load the original collection's options from the database's primary shard.
- auto [collOptions, uuid] = shardVersionRetry(
- opCtx,
- catalogCache,
- reshardingNss,
- "loading collection options to create temporary resharding collection"_sd,
- [&]() -> MigrationDestinationManager::CollectionOptionsAndUUID {
- auto originalCm = uassertStatusOK(
- catalogCache->getShardedCollectionRoutingInfoWithRefresh(opCtx, originalNss));
- return MigrationDestinationManager::getCollectionOptions(
- opCtx,
- NamespaceStringOrUUID(originalNss.db().toString(), existingUUID),
- originalCm.dbPrimary(),
- originalCm,
- fetchTimestamp);
- });
-
- // Load the original collection's indexes from the shard that owns the global minimum chunk.
- auto [indexes, idIndex] =
- shardVersionRetry(opCtx,
- catalogCache,
- reshardingNss,
- "loading indexes to create temporary resharding collection"_sd,
- [&]() -> MigrationDestinationManager::IndexesAndIdIndex {
- auto originalCm =
- catalogCache->getShardedCollectionRoutingInfo(opCtx, originalNss);
- auto indexShardId = originalCm.getMinKeyShardIdWithSimpleCollation();
- return MigrationDestinationManager::getCollectionIndexes(
- opCtx,
- NamespaceStringOrUUID(originalNss.db().toString(), existingUUID),
- indexShardId,
- originalCm,
- fetchTimestamp);
- });
-
- // Set the temporary resharding collection's UUID to the resharding UUID. Note that
- // BSONObj::addFields() replaces any fields that already exist.
- collOptions = collOptions.addFields(BSON("uuid" << reshardingUUID));
- CollectionOptionsAndIndexes optionsAndIndexes = {reshardingUUID, indexes, idIndex, collOptions};
- MigrationDestinationManager::cloneCollectionIndexesAndOptions(
- opCtx, reshardingNss, optionsAndIndexes);
-}
-
-} // namespace resharding
-
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::constructInstance(
BSONObj initialState) {
return std::make_shared<RecipientStateMachine>(
- this, std::move(initialState), ReshardingDataReplication::make);
+ this,
+ ReshardingRecipientDocument::parse({"RecipientStateMachine"}, initialState),
+ std::make_unique<RecipientStateMachineExternalStateImpl>(),
+ ReshardingDataReplication::make);
}
ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
const ReshardingRecipientService* recipientService,
- const BSONObj& recipientDoc,
- ReshardingDataReplicationFactory dataReplicationFactory)
- : RecipientStateMachine(
- recipientService,
- ReshardingRecipientDocument::parse({"RecipientStateMachine"}, recipientDoc),
- std::move(dataReplicationFactory)) {}
-
-ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
- const ReshardingRecipientService* recipientService,
const ReshardingRecipientDocument& recipientDoc,
+ std::unique_ptr<RecipientStateMachineExternalState> externalState,
ReshardingDataReplicationFactory dataReplicationFactory)
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
_recipientService{recipientService},
@@ -177,6 +115,7 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
_recipientCtx{recipientDoc.getMutableState()},
_donorShards{recipientDoc.getDonorShards()},
_cloneTimestamp{recipientDoc.getCloneTimestamp()},
+ _externalState{std::move(externalState)},
_markKilledExecutor(std::make_shared<ThreadPool>([] {
ThreadPool::Options options;
options.poolName = "RecipientStateMachineCancelableOpCtxPool";
@@ -184,7 +123,9 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
options.maxThreads = 1;
return options;
}())),
- _dataReplicationFactory{std::move(dataReplicationFactory)} {}
+ _dataReplicationFactory{std::move(dataReplicationFactory)} {
+ invariant(_externalState);
+}
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
@@ -388,29 +329,22 @@ void ReshardingRecipientService::RecipientStateMachine::
{
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- resharding::createTemporaryReshardingCollectionLocally(opCtx.get(),
- _metadata.getSourceNss(),
- _metadata.getTempReshardingNss(),
- _metadata.getReshardingUUID(),
- _metadata.getSourceUUID(),
- *_cloneTimestamp);
-
- ShardKeyPattern shardKeyPattern{_metadata.getReshardingKey()};
-
- auto catalogCache = Grid::get(opCtx.get())->catalogCache();
- shardVersionRetry(opCtx.get(),
- catalogCache,
- _metadata.getTempReshardingNss(),
- "validating shard key index for reshardCollection"_sd,
- [&] {
- shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
- opCtx.get(),
- _metadata.getTempReshardingNss(),
- shardKeyPattern,
- CollationSpec::kSimpleSpec,
- false,
- shardkeyutil::ValidationBehaviorsShardCollection(opCtx.get()));
- });
+ _externalState->ensureTempReshardingCollectionExistsWithIndexes(
+ opCtx.get(), _metadata, *_cloneTimestamp);
+
+ _externalState->withShardVersionRetry(
+ opCtx.get(),
+ _metadata.getTempReshardingNss(),
+ "validating shard key index for reshardCollection"_sd,
+ [&] {
+ shardkeyutil::validateShardKeyIndexExistsOrCreateIfPossible(
+ opCtx.get(),
+ _metadata.getTempReshardingNss(),
+ ShardKeyPattern{_metadata.getReshardingKey()},
+ CollationSpec::kSimpleSpec,
+ false /* unique */,
+ shardkeyutil::ValidationBehaviorsShardCollection(opCtx.get()));
+ });
}
_transitionState(RecipientStateEnum::kCloning);
@@ -421,10 +355,9 @@ ReshardingRecipientService::RecipientStateMachine::_makeDataReplication(Operatio
bool cloningDone) {
invariant(_cloneTimestamp);
- auto myShardId = ShardingState::get(opCtx->getServiceContext())->shardId();
- auto catalogCache = Grid::get(opCtx)->catalogCache();
+ auto myShardId = _externalState->myShardId(opCtx->getServiceContext());
auto sourceChunkMgr =
- catalogCache->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss());
+ _externalState->getShardedCollectionRoutingInfo(opCtx, _metadata.getSourceNss());
return _dataReplicationFactory(opCtx,
_metrics(),
@@ -562,7 +495,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
_transitionState(RecipientStateEnum::kStrictConsistency);
const bool isAlsoDonor = [&] {
- auto myShardId = ShardingState::get(cc().getServiceContext())->shardId();
+ auto myShardId = _externalState->myShardId(cc().getServiceContext());
return std::find_if(_donorShards.begin(),
_donorShards.end(),
[&](const DonorShardFetchTimestamp& donor) {
@@ -596,7 +529,7 @@ void ReshardingRecipientService::RecipientStateMachine::_renameTemporaryReshardi
}
const bool isAlsoDonor = [&] {
- auto myShardId = ShardingState::get(cc().getServiceContext())->shardId();
+ auto myShardId = _externalState->myShardId(cc().getServiceContext());
return std::find_if(_donorShards.begin(),
_donorShards.end(),
[&](const DonorShardFetchTimestamp& donor) {
@@ -750,8 +683,7 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateC
.thenRunOn(**executor)
.then([this] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
-
- auto shardId = ShardingState::get(opCtx.get())->shardId();
+ auto shardId = _externalState->myShardId(opCtx->getServiceContext());
BSONObjBuilder updateBuilder;
{
@@ -763,16 +695,10 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::_updateC
}
}
- uassertStatusOK(
- Grid::get(opCtx.get())
- ->catalogClient()
- ->updateConfigDocument(
- opCtx.get(),
- NamespaceString::kConfigReshardingOperationsNamespace,
- _makeQueryForCoordinatorUpdate(shardId, _recipientCtx.getState()),
- updateBuilder.done(),
- false /* upsert */,
- ShardingCatalogClient::kMajorityWriteConcern));
+ _externalState->updateCoordinatorDocument(
+ opCtx.get(),
+ _makeQueryForCoordinatorUpdate(shardId, _recipientCtx.getState()),
+ updateBuilder.done());
});
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e0c9fae5c97..c4a4d57df0d 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -39,23 +39,7 @@
namespace mongo {
-namespace resharding {
-
-/**
- * Creates the temporary resharding collection locally by loading the collection options and
- * collection indexes from the original collection's primary and MinKey owning chunk shards,
- * respectively.
- */
-void createTemporaryReshardingCollectionLocally(OperationContext* opCtx,
- const NamespaceString& originalNss,
- const NamespaceString& reshardingNss,
- const UUID& reshardingUUID,
- const UUID& existingUUID,
- Timestamp fetchTimestamp);
-
-} // namespace resharding
-
-class ReshardingRecipientService final : public repl::PrimaryOnlyService {
+class ReshardingRecipientService : public repl::PrimaryOnlyService {
public:
static constexpr StringData kServiceName = "ReshardingRecipientService"_sd;
@@ -65,6 +49,8 @@ public:
class RecipientStateMachine;
+ class RecipientStateMachineExternalState;
+
StringData getServiceName() const override {
return kServiceName;
}
@@ -107,9 +93,11 @@ public:
}
};
- explicit RecipientStateMachine(const ReshardingRecipientService* recipientService,
- const BSONObj& recipientDoc,
- ReshardingDataReplicationFactory dataReplicationFactory);
+ explicit RecipientStateMachine(
+ const ReshardingRecipientService* recipientService,
+ const ReshardingRecipientDocument& recipientDoc,
+ std::unique_ptr<RecipientStateMachineExternalState> externalState,
+ ReshardingDataReplicationFactory dataReplicationFactory);
~RecipientStateMachine();
@@ -137,10 +125,6 @@ public:
const ReshardingRecipientDocument& recipientDoc);
private:
- RecipientStateMachine(const ReshardingRecipientService* recipientService,
- const ReshardingRecipientDocument& recipientDoc,
- ReshardingDataReplicationFactory dataReplicationFactory);
-
// The following functions correspond to the actions to take at a particular recipient state.
ExecutorFuture<void> _awaitAllDonorsPreparedToDonateThenTransitionToCreatingCollection(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
@@ -221,6 +205,8 @@ private:
std::vector<DonorShardFetchTimestamp> _donorShards;
boost::optional<Timestamp> _cloneTimestamp;
+ const std::unique_ptr<RecipientStateMachineExternalState> _externalState;
+
// ThreadPool used by CancelableOperationContext.
// CancelableOperationContext must have a thread that is always available to it to mark its
// opCtx as killed when the cancelToken has been cancelled.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp
new file mode 100644
index 00000000000..f160955a050
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.cpp
@@ -0,0 +1,165 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kResharding
+
+#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
+
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/catalog/sharding_catalog_client.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/resharding/common_types_gen.h"
+#include "mongo/s/stale_shard_version_helpers.h"
+
+namespace mongo {
+
+void ReshardingRecipientService::RecipientStateMachineExternalState::
+ ensureTempReshardingCollectionExistsWithIndexes(OperationContext* opCtx,
+ const CommonReshardingMetadata& metadata,
+ Timestamp cloneTimestamp) {
+ LOGV2_DEBUG(5002300,
+ 1,
+ "Creating temporary resharding collection",
+ "sourceNamespace"_attr = metadata.getSourceNss());
+
+ // The CatalogCache may have a collection entry from before the source collection was sharded.
+ // We force a refresh to have this shard realize the collection is now sharded.
+ refreshCatalogCache(opCtx, metadata.getSourceNss());
+
+ auto [collOptions, unusedUUID] = getCollectionOptions(
+ opCtx,
+ metadata.getSourceNss(),
+ metadata.getSourceUUID(),
+ cloneTimestamp,
+ "loading collection options to create temporary resharding collection"_sd);
+
+ auto [indexes, idIndex] =
+ getCollectionIndexes(opCtx,
+ metadata.getSourceNss(),
+ metadata.getSourceUUID(),
+ cloneTimestamp,
+ "loading indexes to create temporary resharding collection"_sd);
+
+ // Set the temporary resharding collection's UUID to the resharding UUID. Note that
+ // BSONObj::addFields() replaces any fields that already exist.
+ collOptions = collOptions.addFields(BSON("uuid" << metadata.getReshardingUUID()));
+ MigrationDestinationManager::cloneCollectionIndexesAndOptions(
+ opCtx,
+ metadata.getTempReshardingNss(),
+ CollectionOptionsAndIndexes{metadata.getReshardingUUID(),
+ std::move(indexes),
+ std::move(idIndex),
+ std::move(collOptions)});
+}
+
+template <typename Callable>
+auto RecipientStateMachineExternalStateImpl::_withShardVersionRetry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ StringData reason,
+ Callable&& callable) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ return shardVersionRetry(opCtx, catalogCache, nss, reason, std::move(callable));
+}
+
+ShardId RecipientStateMachineExternalStateImpl::myShardId(ServiceContext* serviceContext) const {
+ return ShardingState::get(serviceContext)->shardId();
+}
+
+void RecipientStateMachineExternalStateImpl::refreshCatalogCache(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ uassertStatusOK(catalogCache->getShardedCollectionRoutingInfoWithRefresh(opCtx, nss));
+}
+
+ChunkManager RecipientStateMachineExternalStateImpl::getShardedCollectionRoutingInfo(
+ OperationContext* opCtx, const NamespaceString& nss) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ return catalogCache->getShardedCollectionRoutingInfo(opCtx, nss);
+}
+
+MigrationDestinationManager::CollectionOptionsAndUUID
+RecipientStateMachineExternalStateImpl::getCollectionOptions(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionUUID& uuid,
+ Timestamp afterClusterTime,
+ StringData reason) {
+ // Load the collection options from the primary shard for the database.
+ return _withShardVersionRetry(opCtx, nss, reason, [&] {
+ auto cm = getShardedCollectionRoutingInfo(opCtx, nss);
+ return MigrationDestinationManager::getCollectionOptions(
+ opCtx,
+ NamespaceStringOrUUID{nss.db().toString(), uuid},
+ cm.dbPrimary(),
+ cm,
+ afterClusterTime);
+ });
+}
+
+MigrationDestinationManager::IndexesAndIdIndex
+RecipientStateMachineExternalStateImpl::getCollectionIndexes(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionUUID& uuid,
+ Timestamp afterClusterTime,
+ StringData reason) {
+ // Load the list of indexes from the shard which owns the global minimum chunk.
+ return _withShardVersionRetry(opCtx, nss, reason, [&] {
+ auto cm = getShardedCollectionRoutingInfo(opCtx, nss);
+ return MigrationDestinationManager::getCollectionIndexes(
+ opCtx,
+ NamespaceStringOrUUID{nss.db().toString(), uuid},
+ cm.getMinKeyShardIdWithSimpleCollation(),
+ cm,
+ afterClusterTime);
+ });
+}
+
+void RecipientStateMachineExternalStateImpl::withShardVersionRetry(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ StringData reason,
+ unique_function<void()> callback) {
+ _withShardVersionRetry(opCtx, nss, reason, std::move(callback));
+}
+
+void RecipientStateMachineExternalStateImpl::updateCoordinatorDocument(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update) {
+ auto catalogClient = Grid::get(opCtx)->catalogClient();
+ uassertStatusOK(
+ catalogClient->updateConfigDocument(opCtx,
+ NamespaceString::kConfigReshardingOperationsNamespace,
+ query,
+ update,
+ false, /* upsert */
+ ShardingCatalogClient::kMajorityWriteConcern));
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_external_state.h b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.h
new file mode 100644
index 00000000000..7a414229125
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state.h
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/util/functional.h"
+
+#include "mongo/base/string_data.h"
+#include "mongo/bson/timestamp.h"
+#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/s/migration_destination_manager.h"
+#include "mongo/db/s/resharding/resharding_recipient_service.h"
+#include "mongo/s/chunk_manager.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+
+class BSONObj;
+class CommonReshardingMetadata;
+class NamespaceString;
+class OperationContext;
+class ServiceContext;
+
+/**
+ * Represents the interface that RecipientStateMachine uses to interact with the rest of the
+ * sharding codebase.
+ *
+ * In particular, RecipientStateMachine must not directly use Grid, ShardingState, or
+ * ShardingCatalogClient. RecipientStateMachine must instead access those types through the
+ * RecipientStateMachineExternalState interface. Having it behind an interface makes it more
+ * straightforward to unit test RecipientStateMachine.
+ */
+class ReshardingRecipientService::RecipientStateMachineExternalState {
+public:
+ virtual ~RecipientStateMachineExternalState() = default;
+
+ virtual ShardId myShardId(ServiceContext* serviceContext) const = 0;
+
+ virtual void refreshCatalogCache(OperationContext* opCtx, const NamespaceString& nss) = 0;
+
+ virtual ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) = 0;
+
+ virtual MigrationDestinationManager::CollectionOptionsAndUUID getCollectionOptions(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionUUID& uuid,
+ Timestamp afterClusterTime,
+ StringData reason) = 0;
+
+ virtual MigrationDestinationManager::IndexesAndIdIndex getCollectionIndexes(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionUUID& uuid,
+ Timestamp afterClusterTime,
+ StringData reason) = 0;
+
+ virtual void withShardVersionRetry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ StringData reason,
+ unique_function<void()> callback) = 0;
+
+ virtual void updateCoordinatorDocument(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update) = 0;
+
+ /**
+ * Creates the temporary resharding collection locally.
+ *
+ * The collection options are taken from the primary shard for the source database and the
+ * collection indexes are taken from the shard which owns the global minimum chunk.
+ *
+ * This function won't automatically create an index on the new shard key pattern.
+ */
+ void ensureTempReshardingCollectionExistsWithIndexes(OperationContext* opCtx,
+ const CommonReshardingMetadata& metadata,
+ Timestamp cloneTimestamp);
+};
+
+class RecipientStateMachineExternalStateImpl
+ : public ReshardingRecipientService::RecipientStateMachineExternalState {
+public:
+ ShardId myShardId(ServiceContext* serviceContext) const override;
+
+ void refreshCatalogCache(OperationContext* opCtx, const NamespaceString& nss) override;
+
+ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx,
+ const NamespaceString& nss) override;
+
+ MigrationDestinationManager::CollectionOptionsAndUUID getCollectionOptions(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionUUID& uuid,
+ Timestamp afterClusterTime,
+ StringData reason) override;
+
+ MigrationDestinationManager::IndexesAndIdIndex getCollectionIndexes(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionUUID& uuid,
+ Timestamp afterClusterTime,
+ StringData reason) override;
+
+ void withShardVersionRetry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ StringData reason,
+ unique_function<void()> callback) override;
+
+ void updateCoordinatorDocument(OperationContext* opCtx,
+ const BSONObj& query,
+ const BSONObj& update) override;
+
+private:
+ template <typename Callable>
+ auto _withShardVersionRetry(OperationContext* opCtx,
+ const NamespaceString& nss,
+ StringData reason,
+ Callable&& callable);
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp b/src/mongo/db/s/resharding/resharding_recipient_service_external_state_test.cpp
index 8d118974d5b..eada2183685 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service_external_state_test.cpp
@@ -39,9 +39,8 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
-#include "mongo/db/s/migration_destination_manager.h"
#include "mongo/db/s/resharding/resharding_oplog_applier_progress_gen.h"
-#include "mongo/db/s/resharding/resharding_recipient_service.h"
+#include "mongo/db/s/resharding/resharding_recipient_service_external_state.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -53,8 +52,8 @@
namespace mongo {
namespace {
-class ReshardingRecipientServiceTest : public CatalogCacheTestFixture,
- public ServiceContextMongoDTest {
+class RecipientServiceExternalStateTest : public CatalogCacheTestFixture,
+ public ServiceContextMongoDTest {
public:
const ShardKeyPattern kShardKey = ShardKeyPattern(BSON("oldKey" << 1));
const OID kOrigEpoch = OID::gen();
@@ -66,6 +65,8 @@ public:
const NamespaceString kReshardingNss = NamespaceString(
str::stream() << "db." << NamespaceString::kTemporaryReshardingCollectionPrefix
<< kOrigUUID);
+ const CommonReshardingMetadata kMetadata{
+ kReshardingUUID, kOrigNss, kOrigUUID, kReshardingNss, kReshardingKey.getKeyPattern()};
const Timestamp kDefaultFetchTimestamp = Timestamp(200, 1);
void setUp() override {
@@ -244,7 +245,7 @@ public:
}
};
-TEST_F(ReshardingRecipientServiceTest, CreateLocalReshardingCollectionBasic) {
+TEST_F(RecipientServiceExternalStateTest, CreateLocalReshardingCollectionBasic) {
auto shards = setupNShards(2);
// Shard kOrigNss by _id with chunks [minKey, 0), [0, maxKey] on shards "0" and "1"
@@ -282,19 +283,16 @@ TEST_F(ReshardingRecipientServiceTest, CreateLocalReshardingCollectionBasic) {
expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost()));
});
- resharding::createTemporaryReshardingCollectionLocally(operationContext(),
- kOrigNss,
- kReshardingNss,
- kReshardingUUID,
- kOrigUUID,
- kDefaultFetchTimestamp);
+ RecipientStateMachineExternalStateImpl externalState;
+ externalState.ensureTempReshardingCollectionExistsWithIndexes(
+ operationContext(), kMetadata, kDefaultFetchTimestamp);
future.default_timed_get();
verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes);
}
-TEST_F(ReshardingRecipientServiceTest,
+TEST_F(RecipientServiceExternalStateTest,
CreatingLocalReshardingCollectionRetriesOnStaleVersionErrors) {
auto shards = setupNShards(2);
@@ -324,7 +322,6 @@ TEST_F(ReshardingRecipientServiceTest,
expectRefreshReturnForOriginalColl(kOrigNss, kShardKey, kOrigUUID, kOrigEpoch);
expectStaleDbVersionError(kOrigNss, "listCollections");
expectGetDatabase(kOrigNss, shards[1].getHost());
- expectRefreshReturnForOriginalColl(kOrigNss, kShardKey, kOrigUUID, kOrigEpoch);
expectListCollections(
kOrigNss,
kOrigUUID,
@@ -335,22 +332,20 @@ TEST_F(ReshardingRecipientServiceTest,
HostAndPort(shards[1].getHost()));
expectStaleEpochError(kOrigNss, "listIndexes");
+ expectRefreshReturnForOriginalColl(kOrigNss, kShardKey, kOrigUUID, kOrigEpoch);
expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost()));
});
- resharding::createTemporaryReshardingCollectionLocally(operationContext(),
- kOrigNss,
- kReshardingNss,
- kReshardingUUID,
- kOrigUUID,
- kDefaultFetchTimestamp);
+ RecipientStateMachineExternalStateImpl externalState;
+ externalState.ensureTempReshardingCollectionExistsWithIndexes(
+ operationContext(), kMetadata, kDefaultFetchTimestamp);
future.default_timed_get();
verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes);
}
-TEST_F(ReshardingRecipientServiceTest,
+TEST_F(RecipientServiceExternalStateTest,
CreateLocalReshardingCollectionCollectionAlreadyExistsWithNoIndexes) {
auto shards = setupNShards(2);
@@ -404,19 +399,16 @@ TEST_F(ReshardingRecipientServiceTest,
expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost()));
});
- resharding::createTemporaryReshardingCollectionLocally(operationContext(),
- kOrigNss,
- kReshardingNss,
- kReshardingUUID,
- kOrigUUID,
- kDefaultFetchTimestamp);
+ RecipientStateMachineExternalStateImpl externalState;
+ externalState.ensureTempReshardingCollectionExistsWithIndexes(
+ operationContext(), kMetadata, kDefaultFetchTimestamp);
future.default_timed_get();
verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes);
}
-TEST_F(ReshardingRecipientServiceTest,
+TEST_F(RecipientServiceExternalStateTest,
CreateLocalReshardingCollectionCollectionAlreadyExistsWithSomeIndexes) {
auto shards = setupNShards(2);
@@ -472,19 +464,16 @@ TEST_F(ReshardingRecipientServiceTest,
expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost()));
});
- resharding::createTemporaryReshardingCollectionLocally(operationContext(),
- kOrigNss,
- kReshardingNss,
- kReshardingUUID,
- kOrigUUID,
- kDefaultFetchTimestamp);
+ RecipientStateMachineExternalStateImpl externalState;
+ externalState.ensureTempReshardingCollectionExistsWithIndexes(
+ operationContext(), kMetadata, kDefaultFetchTimestamp);
future.default_timed_get();
verifyCollectionAndIndexes(kReshardingNss, kReshardingUUID, indexes);
}
-TEST_F(ReshardingRecipientServiceTest,
+TEST_F(RecipientServiceExternalStateTest,
CreateLocalReshardingCollectionCollectionAlreadyExistsWithAllIndexes) {
auto shards = setupNShards(2);
@@ -530,12 +519,9 @@ TEST_F(ReshardingRecipientServiceTest,
expectListIndexes(kOrigNss, kOrigUUID, indexes, HostAndPort(shards[0].getHost()));
});
- resharding::createTemporaryReshardingCollectionLocally(operationContext(),
- kOrigNss,
- kReshardingNss,
- kReshardingUUID,
- kOrigUUID,
- kDefaultFetchTimestamp);
+ RecipientStateMachineExternalStateImpl externalState;
+ externalState.ensureTempReshardingCollectionExistsWithIndexes(
+ operationContext(), kMetadata, kDefaultFetchTimestamp);
future.default_timed_get();