summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2020-09-29 13:02:29 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-09-29 13:25:30 +0000
commit5bb70750163b5b78fbcf8a84cf0ed4352cd73a25 (patch)
tree4aab818990fb3f7d1bcd3fff7071c62ca07ea3d9
parentf1cbeb3ebf1e112339a3c8da47bd5a553b80b23d (diff)
downloadmongo-5bb70750163b5b78fbcf8a84cf0ed4352cd73a25.tar.gz
SERVER-50801 Rearrange resharding PrimaryOnlyService instances to be forward-declared nested classes inside corresponding service classes
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp5
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.h293
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp31
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h5
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp54
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h4
6 files changed, 207 insertions, 185 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 27a4552ae79..4fde7935a0f 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -488,6 +488,11 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx,
}
} // namespace resharding
+std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingCoordinatorService::constructInstance(
+ BSONObj initialState) const {
+ return std::make_shared<ReshardingCoordinator>(std::move(initialState));
+}
+
ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const BSONObj& state)
: PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(),
_id(state["_id"].wrap().getOwned()),
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index 78f7cb88197..df5c0aa4fd0 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -78,6 +78,8 @@ public:
: PrimaryOnlyService(serviceContext) {}
~ReshardingCoordinatorService() = default;
+ class ReshardingCoordinator;
+
StringData getServiceName() const override {
return kReshardingCoordinatorServiceName;
}
@@ -90,156 +92,153 @@ public:
return ThreadPool::Limits();
}
std::shared_ptr<PrimaryOnlyService::Instance> constructInstance(
- BSONObj initialState) const override {
- return std::make_shared<ReshardingCoordinatorService::ReshardingCoordinator>(
- std::move(initialState));
+ BSONObj initialState) const override;
+};
+
+class ReshardingCoordinatorService::ReshardingCoordinator final
+ : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> {
+public:
+ explicit ReshardingCoordinator(const BSONObj& state);
+ ~ReshardingCoordinator();
+
+ void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
+
+ void interrupt(Status status) override;
+
+ /**
+ * Returns a Future that will be resolved when all work associated with this Instance has
+ * completed running.
+ */
+ SharedSemiFuture<void> getCompletionFuture() const {
+ return _completionPromise.getFuture();
+ }
+
+ /**
+ * TODO(SERVER-50976) Report ReshardingCoordinators in currentOp().
+ */
+ boost::optional<BSONObj> reportForCurrentOp(
+ MongoProcessInterface::CurrentOpConnectionsMode connMode,
+ MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override {
+ return boost::none;
}
- class ReshardingCoordinator final
- : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> {
- public:
- explicit ReshardingCoordinator(const BSONObj& state);
- ~ReshardingCoordinator();
-
- void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
-
- void interrupt(Status status) override;
-
- /**
- * Returns a Future that will be resolved when all work associated with this Instance has
- * completed running.
- */
- SharedSemiFuture<void> getCompletionFuture() const {
- return _completionPromise.getFuture();
- }
-
- /**
- * TODO(SERVER-50976) Report ReshardingCoordinators in currentOp().
- */
- boost::optional<BSONObj> reportForCurrentOp(
- MongoProcessInterface::CurrentOpConnectionsMode connMode,
- MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override {
- return boost::none;
- }
-
- void setInitialChunksAndZones(std::vector<ChunkType> initialChunks,
- std::vector<TagsType> newZones);
-
- std::shared_ptr<ReshardingCoordinatorObserver> getObserver();
-
- private:
- struct ChunksAndZones {
- std::vector<ChunkType> initialChunks;
- std::vector<TagsType> newZones;
- };
-
- /**
- * Does the following writes:
- * 1. Inserts coordinator state document into config.reshardingOperations
- * 2. Adds reshardingFields to the config.collections entry for the original collection
- * 3. Inserts an entry into config.collections for the temporary collection
- * 4. Inserts entries into config.chunks for ranges based on the new shard key
- * 5. Upserts entries into config.tags for any zones associated with the new shard key
- *
- * Transitions to 'kInitialized'.
- */
- ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the
- * temporary collection. Transitions to 'kPreparingToDonate'.
- */
- ExecutorFuture<void> _awaitAllRecipientsCreatedCollection(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Waits on _reshardingCoordinatorObserver to notify that all donors have picked a
- * minFetchTimestamp and are ready to donate. Transitions to 'kCloning'.
- */
- ExecutorFuture<void> _awaitAllDonorsReadyToDonate(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Waits on _reshardingCoordinatorObserver to notify that all recipients have finished
- * cloning. Transitions to 'kMirroring'.
- */
- ExecutorFuture<void> _awaitAllRecipientsFinishedCloning(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered
- * strict-consistency.
- */
- SharedSemiFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Does the following writes:
- * 1. Updates the config.collections entry for the new sharded collection
- * 2. Updates config.chunks entries for the new sharded collection
- * 3. Updates config.tags for the new sharded collection
- *
- * Transitions to 'kCommitted'.
- */
- Future<void> _commit(const ReshardingCoordinatorDocument& updatedDoc);
-
- /**
- * Waits on _reshardingCoordinatorObserver to notify that all recipients have renamed the
- * temporary collection to the original collection namespace. Transitions to 'kDropping'.
- */
- ExecutorFuture<void> _awaitAllRecipientsRenamedCollection(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Waits on _reshardingCoordinatorObserver to notify that all donors have dropped the
- * original collection. Transitions to 'kDone'.
- */
- ExecutorFuture<void> _awaitAllDonorsDroppedOriginalCollection(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
-
- /**
- * Updates the entry for this resharding operation in config.reshardingOperations and the
- * catalog entries for the original and temporary namespaces in config.collections.
- */
- void _runUpdates(CoordinatorStateEnum nextState,
- ReshardingCoordinatorDocument updatedStateDoc,
- boost::optional<Timestamp> fetchTimestamp = boost::none);
-
- /**
- * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient
- * shards.
- */
- void _tellAllRecipientsToRefresh();
-
- /**
- * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards.
- */
- void _tellAllDonorsToRefresh();
-
- // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The
- // value of this is the UUID that will be used as the collection UUID for the new sharded
- // collection. The object looks like: {_id: 'reshardingUUID'}
- const InstanceID _id;
-
- // Observes writes that indicate state changes for this resharding operation and notifies
- // 'this' when all donors/recipients have entered some state so that 'this' can transition
- // states.
- std::shared_ptr<ReshardingCoordinatorObserver> _reshardingCoordinatorObserver;
-
- // The updated coordinator state document.
- ReshardingCoordinatorDocument _coordinatorDoc;
-
- // Protects promises below.
- mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex");
-
- // Promise containing the initial chunks and new zones based on the new shard key. These are
- // not a part of the state document, so must be set by configsvrReshardCollection after
- // construction.
- SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise;
-
- // Promise that is resolved when the chain of work kicked off by run() has completed.
- SharedPromise<void> _completionPromise;
+ void setInitialChunksAndZones(std::vector<ChunkType> initialChunks,
+ std::vector<TagsType> newZones);
+
+ std::shared_ptr<ReshardingCoordinatorObserver> getObserver();
+
+private:
+ struct ChunksAndZones {
+ std::vector<ChunkType> initialChunks;
+ std::vector<TagsType> newZones;
};
+
+ /**
+ * Does the following writes:
+ * 1. Inserts coordinator state document into config.reshardingOperations
+ * 2. Adds reshardingFields to the config.collections entry for the original collection
+ * 3. Inserts an entry into config.collections for the temporary collection
+ * 4. Inserts entries into config.chunks for ranges based on the new shard key
+ * 5. Upserts entries into config.tags for any zones associated with the new shard key
+ *
+ * Transitions to 'kInitialized'.
+ */
+ ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the
+ * temporary collection. Transitions to 'kPreparingToDonate'.
+ */
+ ExecutorFuture<void> _awaitAllRecipientsCreatedCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all donors have picked a
+ * minFetchTimestamp and are ready to donate. Transitions to 'kCloning'.
+ */
+ ExecutorFuture<void> _awaitAllDonorsReadyToDonate(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have finished
+ * cloning. Transitions to 'kMirroring'.
+ */
+ ExecutorFuture<void> _awaitAllRecipientsFinishedCloning(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered
+ * strict-consistency.
+ */
+ SharedSemiFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Does the following writes:
+ * 1. Updates the config.collections entry for the new sharded collection
+ * 2. Updates config.chunks entries for the new sharded collection
+ * 3. Updates config.tags for the new sharded collection
+ *
+ * Transitions to 'kCommitted'.
+ */
+ Future<void> _commit(const ReshardingCoordinatorDocument& updatedDoc);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all recipients have renamed the
+ * temporary collection to the original collection namespace. Transitions to 'kDropping'.
+ */
+ ExecutorFuture<void> _awaitAllRecipientsRenamedCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Waits on _reshardingCoordinatorObserver to notify that all donors have dropped the
+ * original collection. Transitions to 'kDone'.
+ */
+ ExecutorFuture<void> _awaitAllDonorsDroppedOriginalCollection(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor);
+
+ /**
+ * Updates the entry for this resharding operation in config.reshardingOperations and the
+ * catalog entries for the original and temporary namespaces in config.collections.
+ */
+ void _runUpdates(CoordinatorStateEnum nextState,
+ ReshardingCoordinatorDocument updatedStateDoc,
+ boost::optional<Timestamp> fetchTimestamp = boost::none);
+
+ /**
+ * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient
+ * shards.
+ */
+ void _tellAllRecipientsToRefresh();
+
+ /**
+ * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards.
+ */
+ void _tellAllDonorsToRefresh();
+
+ // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The
+ // value of this is the UUID that will be used as the collection UUID for the new sharded
+ // collection. The object looks like: {_id: 'reshardingUUID'}
+ const InstanceID _id;
+
+ // Observes writes that indicate state changes for this resharding operation and notifies
+ // 'this' when all donors/recipients have entered some state so that 'this' can transition
+ // states.
+ std::shared_ptr<ReshardingCoordinatorObserver> _reshardingCoordinatorObserver;
+
+ // The updated coordinator state document.
+ ReshardingCoordinatorDocument _coordinatorDoc;
+
+ // Protects promises below.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex");
+
+ // Promise containing the initial chunks and new zones based on the new shard key. These are
+ // not a part of the state document, so must be set by configsvrReshardCollection after
+ // construction.
+ SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise;
+
+ // Promise that is resolved when the chain of work kicked off by run() has completed.
+ SharedPromise<void> _completionPromise;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 76590d50bb0..3a8acf65dcd 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -86,20 +86,21 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::cons
return std::make_shared<DonorStateMachine>(std::move(initialState));
}
-DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc)
+ReshardingDonorService::DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc)
: repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(),
_donorDoc(ReshardingDonorDocument::parse(IDLParserErrorContext("ReshardingDonorDocument"),
donorDoc)),
_id(_donorDoc.getCommonReshardingMetadata().get_id()) {}
-DonorStateMachine::~DonorStateMachine() {
+ReshardingDonorService::DonorStateMachine::~DonorStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
invariant(_allRecipientsDoneApplying.getFuture().isReady());
invariant(_coordinatorHasCommitted.getFuture().isReady());
invariant(_completionPromise.getFuture().isReady());
}
-void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
+void ReshardingDonorService::DonorStateMachine::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
.then([this] { _transitionState(DonorStateEnum::kPreparingToDonate); })
.then([this] { _onPreparingToDonateCalculateMinFetchTimestampThenBeginDonating(); })
@@ -136,7 +137,7 @@ void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> execut
});
}
-void DonorStateMachine::interrupt(Status status) {
+void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {
// Resolve any unresolved promises to avoid hanging.
stdx::lock_guard<Latch> lg(_mutex);
if (!_allRecipientsDoneApplying.getFuture().isReady()) {
@@ -152,10 +153,11 @@ void DonorStateMachine::interrupt(Status status) {
}
}
-void DonorStateMachine::onReshardingFieldsChanges(
+void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
-void DonorStateMachine::_onPreparingToDonateCalculateMinFetchTimestampThenBeginDonating() {
+void ReshardingDonorService::DonorStateMachine::
+ _onPreparingToDonateCalculateMinFetchTimestampThenBeginDonating() {
if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) {
invariant(_donorDoc.getMinFetchTimestamp());
return;
@@ -165,7 +167,8 @@ void DonorStateMachine::_onPreparingToDonateCalculateMinFetchTimestampThenBeginD
_transitionState(DonorStateEnum::kDonating, minFetchTimestamp);
}
-ExecutorFuture<void> DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStartMirroring(
+ExecutorFuture<void>
+ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStartMirroring(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_donorDoc.getState() > DonorStateEnum::kDonating) {
return ExecutorFuture<void>(**executor, Status::OK());
@@ -176,7 +179,8 @@ ExecutorFuture<void> DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStart
});
}
-ExecutorFuture<void> DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping(
+ExecutorFuture<void>
+ReshardingDonorService::DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_donorDoc.getState() > DonorStateEnum::kMirroring) {
return ExecutorFuture<void>(**executor, Status::OK());
@@ -187,7 +191,7 @@ ExecutorFuture<void> DonorStateMachine::_awaitCoordinatorHasCommittedThenTransit
});
}
-void DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() {
+void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() {
if (_donorDoc.getState() > DonorStateEnum::kDropping) {
return;
}
@@ -195,8 +199,8 @@ void DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() {
_transitionState(DonorStateEnum::kDone);
}
-void DonorStateMachine::_transitionState(DonorStateEnum endState,
- boost::optional<Timestamp> minFetchTimestamp) {
+void ReshardingDonorService::DonorStateMachine::_transitionState(
+ DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) {
ReshardingDonorDocument replacementDoc(_donorDoc);
replacementDoc.setState(endState);
if (minFetchTimestamp) {
@@ -211,13 +215,14 @@ void DonorStateMachine::_transitionState(DonorStateEnum endState,
_updateDonorDocument(std::move(replacementDoc));
}
-void DonorStateMachine::_transitionStateToError(const Status& status) {
+void ReshardingDonorService::DonorStateMachine::_transitionStateToError(const Status& status) {
ReshardingDonorDocument replacementDoc(_donorDoc);
replacementDoc.setState(DonorStateEnum::kError);
_updateDonorDocument(std::move(replacementDoc));
}
-void DonorStateMachine::_updateDonorDocument(ReshardingDonorDocument&& replacementDoc) {
+void ReshardingDonorService::DonorStateMachine::_updateDonorDocument(
+ ReshardingDonorDocument&& replacementDoc) {
auto opCtx = cc().makeOperationContext();
PersistentTaskStore<ReshardingDonorDocument> store(
NamespaceString::kDonorReshardingOperationsNamespace);
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 09578ef2611..f5df94ae523 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -42,6 +42,8 @@ public:
: PrimaryOnlyService(serviceContext) {}
~ReshardingDonorService() = default;
+ class DonorStateMachine;
+
StringData getServiceName() const override {
return kReshardingDonorServiceName;
}
@@ -63,7 +65,8 @@ public:
* Represents the current state of a resharding donor operation on this shard. This class drives
* state transitions and updates to underlying on-disk metadata.
*/
-class DonorStateMachine final : public repl::PrimaryOnlyService::TypedInstance<DonorStateMachine> {
+class ReshardingDonorService::DonorStateMachine final
+ : public repl::PrimaryOnlyService::TypedInstance<DonorStateMachine> {
public:
explicit DonorStateMachine(const BSONObj& donorDoc);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index b80775b6c46..73256ff0fab 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -42,13 +42,14 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService::
return std::make_shared<RecipientStateMachine>(std::move(initialState));
}
-RecipientStateMachine::RecipientStateMachine(const BSONObj& recipientDoc)
+ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
+ const BSONObj& recipientDoc)
: repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(),
_recipientDoc(ReshardingRecipientDocument::parse(
IDLParserErrorContext("ReshardingRecipientDocument"), recipientDoc)),
_id(_recipientDoc.getCommonReshardingMetadata().get_id()) {}
-RecipientStateMachine::~RecipientStateMachine() {
+ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
invariant(_allDonorsPreparedToDonate.getFuture().isReady());
invariant(_allDonorsMirroring.getFuture().isReady());
@@ -56,7 +57,8 @@ RecipientStateMachine::~RecipientStateMachine() {
invariant(_completionPromise.getFuture().isReady());
}
-void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
+void ReshardingRecipientService::RecipientStateMachine::run(
+ std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
.then([this] { _createTemporaryReshardingCollectionThenTransitionToInitialized(); })
.then([this, executor] {
@@ -98,7 +100,7 @@ void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> ex
});
}
-void RecipientStateMachine::interrupt(Status status) {
+void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) {
// Resolve any unresolved promises to avoid hanging.
stdx::lock_guard<Latch> lg(_mutex);
if (!_allDonorsPreparedToDonate.getFuture().isReady()) {
@@ -118,11 +120,11 @@ void RecipientStateMachine::interrupt(Status status) {
}
}
-void onReshardingFieldsChanges(boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
+void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
+ boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
-void onDonorReportsMirroring(const ShardId& donor) {}
-
-void RecipientStateMachine::_createTemporaryReshardingCollectionThenTransitionToInitialized() {
+void ReshardingRecipientService::RecipientStateMachine::
+ _createTemporaryReshardingCollectionThenTransitionToInitialized() {
if (_recipientDoc.getState() > RecipientStateEnum::kInitializing) {
return;
}
@@ -130,8 +132,9 @@ void RecipientStateMachine::_createTemporaryReshardingCollectionThenTransitionTo
_transitionState(RecipientStateEnum::kInitialized);
}
-ExecutorFuture<void> RecipientStateMachine::_awaitAllDonorsPreparedToDonateThenTransitionToCloning(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
+ _awaitAllDonorsPreparedToDonateThenTransitionToCloning(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_recipientDoc.getState() > RecipientStateEnum::kInitialized) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -143,7 +146,7 @@ ExecutorFuture<void> RecipientStateMachine::_awaitAllDonorsPreparedToDonateThenT
});
}
-void RecipientStateMachine::_cloneThenTransitionToApplying() {
+void ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying() {
if (_recipientDoc.getState() > RecipientStateEnum::kCloning) {
return;
}
@@ -151,7 +154,7 @@ void RecipientStateMachine::_cloneThenTransitionToApplying() {
_transitionState(RecipientStateEnum::kApplying);
}
-void RecipientStateMachine::_applyThenTransitionToSteadyState() {
+void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSteadyState() {
if (_recipientDoc.getState() > RecipientStateEnum::kApplying) {
return;
}
@@ -159,9 +162,9 @@ void RecipientStateMachine::_applyThenTransitionToSteadyState() {
_transitionState(RecipientStateEnum::kSteadyState);
}
-ExecutorFuture<void>
-RecipientStateMachine::_awaitAllDonorsMirroringThenTransitionToStrictConsistency(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
+ _awaitAllDonorsMirroringThenTransitionToStrictConsistency(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_recipientDoc.getState() > RecipientStateEnum::kSteadyState) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -171,8 +174,9 @@ RecipientStateMachine::_awaitAllDonorsMirroringThenTransitionToStrictConsistency
});
}
-ExecutorFuture<void> RecipientStateMachine::_awaitCoordinatorHasCommittedThenTransitionToRenaming(
- const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
+ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
+ _awaitCoordinatorHasCommittedThenTransitionToRenaming(
+ const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
if (_recipientDoc.getState() > RecipientStateEnum::kStrictConsistency) {
return ExecutorFuture<void>(**executor, Status::OK());
}
@@ -182,7 +186,8 @@ ExecutorFuture<void> RecipientStateMachine::_awaitCoordinatorHasCommittedThenTra
});
}
-void RecipientStateMachine::_renameTemporaryReshardingCollectionThenDeleteLocalState() {
+void ReshardingRecipientService::RecipientStateMachine::
+ _renameTemporaryReshardingCollectionThenDeleteLocalState() {
if (_recipientDoc.getState() > RecipientStateEnum::kRenaming) {
return;
}
@@ -190,12 +195,13 @@ void RecipientStateMachine::_renameTemporaryReshardingCollectionThenDeleteLocalS
_transitionState(RecipientStateEnum::kDone);
}
-void RecipientStateMachine::_fulfillAllDonorsPreparedToDonate(Timestamp fetchTimestamp) {
+void ReshardingRecipientService::RecipientStateMachine::_fulfillAllDonorsPreparedToDonate(
+ Timestamp fetchTimestamp) {
_allDonorsPreparedToDonate.emplaceValue(fetchTimestamp);
}
-void RecipientStateMachine::_transitionState(RecipientStateEnum endState,
- boost::optional<Timestamp> fetchTimestamp) {
+void ReshardingRecipientService::RecipientStateMachine::_transitionState(
+ RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp) {
ReshardingRecipientDocument replacementDoc(_recipientDoc);
replacementDoc.setState(endState);
if (fetchTimestamp) {
@@ -211,13 +217,15 @@ void RecipientStateMachine::_transitionState(RecipientStateEnum endState,
_updateRecipientDocument(std::move(replacementDoc));
}
-void RecipientStateMachine::_transitionStateToError(const Status& status) {
+void ReshardingRecipientService::RecipientStateMachine::_transitionStateToError(
+ const Status& status) {
ReshardingRecipientDocument replacementDoc(_recipientDoc);
replacementDoc.setState(RecipientStateEnum::kError);
_updateRecipientDocument(std::move(replacementDoc));
}
-void RecipientStateMachine::_updateRecipientDocument(ReshardingRecipientDocument&& replacementDoc) {
+void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument(
+ ReshardingRecipientDocument&& replacementDoc) {
auto opCtx = cc().makeOperationContext();
PersistentTaskStore<ReshardingRecipientDocument> store(
NamespaceString::kRecipientReshardingOperationsNamespace);
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 8435e15071b..ec38bbbdde8 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -42,6 +42,8 @@ public:
: PrimaryOnlyService(serviceContext) {}
~ReshardingRecipientService() = default;
+ class RecipientStateMachine;
+
StringData getServiceName() const override {
return kReshardingRecipientServiceName;
}
@@ -63,7 +65,7 @@ public:
* Represents the current state of a resharding recipient operation on this shard. This class
* drives state transitions and updates to underlying on-disk metadata.
*/
-class RecipientStateMachine final
+class ReshardingRecipientService::RecipientStateMachine final
: public repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine> {
public:
explicit RecipientStateMachine(const BSONObj& recipientDoc);