diff options
author | Blake Oler <blake.oler@mongodb.com> | 2020-09-29 13:02:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-29 13:25:30 +0000 |
commit | 5bb70750163b5b78fbcf8a84cf0ed4352cd73a25 (patch) | |
tree | 4aab818990fb3f7d1bcd3fff7071c62ca07ea3d9 | |
parent | f1cbeb3ebf1e112339a3c8da47bd5a553b80b23d (diff) | |
download | mongo-5bb70750163b5b78fbcf8a84cf0ed4352cd73a25.tar.gz |
SERVER-50801 Rearrange resharding PrimaryOnlyService instances to be forward-declared nested classes inside corresponding service classes
6 files changed, 207 insertions, 185 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 27a4552ae79..4fde7935a0f 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -488,6 +488,11 @@ void removeCoordinatorDocAndReshardingFields(OperationContext* opCtx, } } // namespace resharding +std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingCoordinatorService::constructInstance( + BSONObj initialState) const { + return std::make_shared<ReshardingCoordinator>(std::move(initialState)); +} + ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const BSONObj& state) : PrimaryOnlyService::TypedInstance<ReshardingCoordinator>(), _id(state["_id"].wrap().getOwned()), diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index 78f7cb88197..df5c0aa4fd0 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -78,6 +78,8 @@ public: : PrimaryOnlyService(serviceContext) {} ~ReshardingCoordinatorService() = default; + class ReshardingCoordinator; + StringData getServiceName() const override { return kReshardingCoordinatorServiceName; } @@ -90,156 +92,153 @@ public: return ThreadPool::Limits(); } std::shared_ptr<PrimaryOnlyService::Instance> constructInstance( - BSONObj initialState) const override { - return std::make_shared<ReshardingCoordinatorService::ReshardingCoordinator>( - std::move(initialState)); + BSONObj initialState) const override; +}; + +class ReshardingCoordinatorService::ReshardingCoordinator final + : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> { +public: + explicit ReshardingCoordinator(const BSONObj& state); + ~ReshardingCoordinator(); + + void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; + + void interrupt(Status status) override; + + /** + * Returns a Future that will be resolved when all work associated with this Instance has + * completed running. + */ + SharedSemiFuture<void> getCompletionFuture() const { + return _completionPromise.getFuture(); + } + + /** + * TODO(SERVER-50976) Report ReshardingCoordinators in currentOp(). + */ + boost::optional<BSONObj> reportForCurrentOp( + MongoProcessInterface::CurrentOpConnectionsMode connMode, + MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override { + return boost::none; } - class ReshardingCoordinator final - : public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> { - public: - explicit ReshardingCoordinator(const BSONObj& state); - ~ReshardingCoordinator(); - - void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; - - void interrupt(Status status) override; - - /** - * Returns a Future that will be resolved when all work associated with this Instance has - * completed running. - */ - SharedSemiFuture<void> getCompletionFuture() const { - return _completionPromise.getFuture(); - } - - /** - * TODO(SERVER-50976) Report ReshardingCoordinators in currentOp(). - */ - boost::optional<BSONObj> reportForCurrentOp( - MongoProcessInterface::CurrentOpConnectionsMode connMode, - MongoProcessInterface::CurrentOpSessionsMode sessionMode) noexcept override { - return boost::none; - } - - void setInitialChunksAndZones(std::vector<ChunkType> initialChunks, - std::vector<TagsType> newZones); - - std::shared_ptr<ReshardingCoordinatorObserver> getObserver(); - - private: - struct ChunksAndZones { - std::vector<ChunkType> initialChunks; - std::vector<TagsType> newZones; - }; - - /** - * Does the following writes: - * 1. Inserts coordinator state document into config.reshardingOperations - * 2. Adds reshardingFields to the config.collections entry for the original collection - * 3. Inserts an entry into config.collections for the temporary collection - * 4. Inserts entries into config.chunks for ranges based on the new shard key - * 5. Upserts entries into config.tags for any zones associated with the new shard key - * - * Transitions to 'kInitialized'. - */ - ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the - * temporary collection. Transitions to 'kPreparingToDonate'. - */ - ExecutorFuture<void> _awaitAllRecipientsCreatedCollection( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Waits on _reshardingCoordinatorObserver to notify that all donors have picked a - * minFetchTimestamp and are ready to donate. Transitions to 'kCloning'. - */ - ExecutorFuture<void> _awaitAllDonorsReadyToDonate( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Waits on _reshardingCoordinatorObserver to notify that all recipients have finished - * cloning. Transitions to 'kMirroring'. - */ - ExecutorFuture<void> _awaitAllRecipientsFinishedCloning( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered - * strict-consistency. - */ - SharedSemiFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Does the following writes: - * 1. Updates the config.collections entry for the new sharded collection - * 2. Updates config.chunks entries for the new sharded collection - * 3. Updates config.tags for the new sharded collection - * - * Transitions to 'kCommitted'. - */ - Future<void> _commit(const ReshardingCoordinatorDocument& updatedDoc); - - /** - * Waits on _reshardingCoordinatorObserver to notify that all recipients have renamed the - * temporary collection to the original collection namespace. Transitions to 'kDropping'. - */ - ExecutorFuture<void> _awaitAllRecipientsRenamedCollection( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Waits on _reshardingCoordinatorObserver to notify that all donors have dropped the - * original collection. Transitions to 'kDone'. - */ - ExecutorFuture<void> _awaitAllDonorsDroppedOriginalCollection( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor); - - /** - * Updates the entry for this resharding operation in config.reshardingOperations and the - * catalog entries for the original and temporary namespaces in config.collections. - */ - void _runUpdates(CoordinatorStateEnum nextState, - ReshardingCoordinatorDocument updatedStateDoc, - boost::optional<Timestamp> fetchTimestamp = boost::none); - - /** - * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient - * shards. - */ - void _tellAllRecipientsToRefresh(); - - /** - * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards. - */ - void _tellAllDonorsToRefresh(); - - // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The - // value of this is the UUID that will be used as the collection UUID for the new sharded - // collection. The object looks like: {_id: 'reshardingUUID'} - const InstanceID _id; - - // Observes writes that indicate state changes for this resharding operation and notifies - // 'this' when all donors/recipients have entered some state so that 'this' can transition - // states. - std::shared_ptr<ReshardingCoordinatorObserver> _reshardingCoordinatorObserver; - - // The updated coordinator state document. - ReshardingCoordinatorDocument _coordinatorDoc; - - // Protects promises below. - mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex"); - - // Promise containing the initial chunks and new zones based on the new shard key. These are - // not a part of the state document, so must be set by configsvrReshardCollection after - // construction. - SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise; - - // Promise that is resolved when the chain of work kicked off by run() has completed. - SharedPromise<void> _completionPromise; + void setInitialChunksAndZones(std::vector<ChunkType> initialChunks, + std::vector<TagsType> newZones); + + std::shared_ptr<ReshardingCoordinatorObserver> getObserver(); + +private: + struct ChunksAndZones { + std::vector<ChunkType> initialChunks; + std::vector<TagsType> newZones; }; + + /** + * Does the following writes: + * 1. Inserts coordinator state document into config.reshardingOperations + * 2. Adds reshardingFields to the config.collections entry for the original collection + * 3. Inserts an entry into config.collections for the temporary collection + * 4. Inserts entries into config.chunks for ranges based on the new shard key + * 5. Upserts entries into config.tags for any zones associated with the new shard key + * + * Transitions to 'kInitialized'. + */ + ExecutorFuture<void> _init(const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have created the + * temporary collection. Transitions to 'kPreparingToDonate'. + */ + ExecutorFuture<void> _awaitAllRecipientsCreatedCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all donors have picked a + * minFetchTimestamp and are ready to donate. Transitions to 'kCloning'. + */ + ExecutorFuture<void> _awaitAllDonorsReadyToDonate( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have finished + * cloning. Transitions to 'kMirroring'. + */ + ExecutorFuture<void> _awaitAllRecipientsFinishedCloning( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have entered + * strict-consistency. + */ + SharedSemiFuture<ReshardingCoordinatorDocument> _awaitAllRecipientsInStrictConsistency( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Does the following writes: + * 1. Updates the config.collections entry for the new sharded collection + * 2. Updates config.chunks entries for the new sharded collection + * 3. Updates config.tags for the new sharded collection + * + * Transitions to 'kCommitted'. + */ + Future<void> _commit(const ReshardingCoordinatorDocument& updatedDoc); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all recipients have renamed the + * temporary collection to the original collection namespace. Transitions to 'kDropping'. + */ + ExecutorFuture<void> _awaitAllRecipientsRenamedCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Waits on _reshardingCoordinatorObserver to notify that all donors have dropped the + * original collection. Transitions to 'kDone'. + */ + ExecutorFuture<void> _awaitAllDonorsDroppedOriginalCollection( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor); + + /** + * Updates the entry for this resharding operation in config.reshardingOperations and the + * catalog entries for the original and temporary namespaces in config.collections. + */ + void _runUpdates(CoordinatorStateEnum nextState, + ReshardingCoordinatorDocument updatedStateDoc, + boost::optional<Timestamp> fetchTimestamp = boost::none); + + /** + * Sends 'flushRoutingTableCacheUpdates' for the temporary namespace to all recipient + * shards. + */ + void _tellAllRecipientsToRefresh(); + + /** + * Sends 'flushRoutingTableCacheUpdates' for the original namespace to all donor shards. + */ + void _tellAllDonorsToRefresh(); + + // The unique key for a given resharding operation. InstanceID is an alias for BSONObj. The + // value of this is the UUID that will be used as the collection UUID for the new sharded + // collection. The object looks like: {_id: 'reshardingUUID'} + const InstanceID _id; + + // Observes writes that indicate state changes for this resharding operation and notifies + // 'this' when all donors/recipients have entered some state so that 'this' can transition + // states. + std::shared_ptr<ReshardingCoordinatorObserver> _reshardingCoordinatorObserver; + + // The updated coordinator state document. + ReshardingCoordinatorDocument _coordinatorDoc; + + // Protects promises below. + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex"); + + // Promise containing the initial chunks and new zones based on the new shard key. These are + // not a part of the state document, so must be set by configsvrReshardCollection after + // construction. + SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise; + + // Promise that is resolved when the chain of work kicked off by run() has completed. + SharedPromise<void> _completionPromise; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 76590d50bb0..3a8acf65dcd 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -86,20 +86,21 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::cons return std::make_shared<DonorStateMachine>(std::move(initialState)); } -DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc) +ReshardingDonorService::DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc) : repl::PrimaryOnlyService::TypedInstance<DonorStateMachine>(), _donorDoc(ReshardingDonorDocument::parse(IDLParserErrorContext("ReshardingDonorDocument"), donorDoc)), _id(_donorDoc.getCommonReshardingMetadata().get_id()) {} -DonorStateMachine::~DonorStateMachine() { +ReshardingDonorService::DonorStateMachine::~DonorStateMachine() { stdx::lock_guard<Latch> lg(_mutex); invariant(_allRecipientsDoneApplying.getFuture().isReady()); invariant(_coordinatorHasCommitted.getFuture().isReady()); invariant(_completionPromise.getFuture().isReady()); } -void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { +void ReshardingDonorService::DonorStateMachine::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) .then([this] { _transitionState(DonorStateEnum::kPreparingToDonate); }) .then([this] { _onPreparingToDonateCalculateMinFetchTimestampThenBeginDonating(); }) @@ -136,7 +137,7 @@ void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> execut }); } -void DonorStateMachine::interrupt(Status status) { +void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { // Resolve any unresolved promises to avoid hanging. stdx::lock_guard<Latch> lg(_mutex); if (!_allRecipientsDoneApplying.getFuture().isReady()) { @@ -152,10 +153,11 @@ void DonorStateMachine::interrupt(Status status) { } } -void DonorStateMachine::onReshardingFieldsChanges( +void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( boost::optional<TypeCollectionReshardingFields> reshardingFields) {} -void DonorStateMachine::_onPreparingToDonateCalculateMinFetchTimestampThenBeginDonating() { +void ReshardingDonorService::DonorStateMachine:: + _onPreparingToDonateCalculateMinFetchTimestampThenBeginDonating() { if (_donorDoc.getState() > DonorStateEnum::kPreparingToDonate) { invariant(_donorDoc.getMinFetchTimestamp()); return; @@ -165,7 +167,8 @@ void DonorStateMachine::_onPreparingToDonateCalculateMinFetchTimestampThenBeginD _transitionState(DonorStateEnum::kDonating, minFetchTimestamp); } -ExecutorFuture<void> DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStartMirroring( +ExecutorFuture<void> +ReshardingDonorService::DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStartMirroring( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_donorDoc.getState() > DonorStateEnum::kDonating) { return ExecutorFuture<void>(**executor, Status::OK()); @@ -176,7 +179,8 @@ ExecutorFuture<void> DonorStateMachine::_awaitAllRecipientsDoneApplyingThenStart }); } -ExecutorFuture<void> DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping( +ExecutorFuture<void> +ReshardingDonorService::DonorStateMachine::_awaitCoordinatorHasCommittedThenTransitionToDropping( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_donorDoc.getState() > DonorStateEnum::kMirroring) { return ExecutorFuture<void>(**executor, Status::OK()); @@ -187,7 +191,7 @@ ExecutorFuture<void> DonorStateMachine::_awaitCoordinatorHasCommittedThenTransit }); } -void DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() { +void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() { if (_donorDoc.getState() > DonorStateEnum::kDropping) { return; } @@ -195,8 +199,8 @@ void DonorStateMachine::_dropOriginalCollectionThenDeleteLocalState() { _transitionState(DonorStateEnum::kDone); } -void DonorStateMachine::_transitionState(DonorStateEnum endState, - boost::optional<Timestamp> minFetchTimestamp) { +void ReshardingDonorService::DonorStateMachine::_transitionState( + DonorStateEnum endState, boost::optional<Timestamp> minFetchTimestamp) { ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(endState); if (minFetchTimestamp) { @@ -211,13 +215,14 @@ void DonorStateMachine::_transitionState(DonorStateEnum endState, _updateDonorDocument(std::move(replacementDoc)); } -void DonorStateMachine::_transitionStateToError(const Status& status) { +void ReshardingDonorService::DonorStateMachine::_transitionStateToError(const Status& status) { ReshardingDonorDocument replacementDoc(_donorDoc); replacementDoc.setState(DonorStateEnum::kError); _updateDonorDocument(std::move(replacementDoc)); } -void DonorStateMachine::_updateDonorDocument(ReshardingDonorDocument&& replacementDoc) { +void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( + ReshardingDonorDocument&& replacementDoc) { auto opCtx = cc().makeOperationContext(); PersistentTaskStore<ReshardingDonorDocument> store( NamespaceString::kDonorReshardingOperationsNamespace); diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 09578ef2611..f5df94ae523 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -42,6 +42,8 @@ public: : PrimaryOnlyService(serviceContext) {} ~ReshardingDonorService() = default; + class DonorStateMachine; + StringData getServiceName() const override { return kReshardingDonorServiceName; } @@ -63,7 +65,8 @@ public: * Represents the current state of a resharding donor operation on this shard. This class drives * state transitions and updates to underlying on-disk metadata. */ -class DonorStateMachine final : public repl::PrimaryOnlyService::TypedInstance<DonorStateMachine> { +class ReshardingDonorService::DonorStateMachine final + : public repl::PrimaryOnlyService::TypedInstance<DonorStateMachine> { public: explicit DonorStateMachine(const BSONObj& donorDoc); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index b80775b6c46..73256ff0fab 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -42,13 +42,14 @@ std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingRecipientService:: return std::make_shared<RecipientStateMachine>(std::move(initialState)); } -RecipientStateMachine::RecipientStateMachine(const BSONObj& recipientDoc) +ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( + const BSONObj& recipientDoc) : repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine>(), _recipientDoc(ReshardingRecipientDocument::parse( IDLParserErrorContext("ReshardingRecipientDocument"), recipientDoc)), _id(_recipientDoc.getCommonReshardingMetadata().get_id()) {} -RecipientStateMachine::~RecipientStateMachine() { +ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); invariant(_allDonorsPreparedToDonate.getFuture().isReady()); invariant(_allDonorsMirroring.getFuture().isReady()); @@ -56,7 +57,8 @@ RecipientStateMachine::~RecipientStateMachine() { invariant(_completionPromise.getFuture().isReady()); } -void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { +void ReshardingRecipientService::RecipientStateMachine::run( + std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) .then([this] { _createTemporaryReshardingCollectionThenTransitionToInitialized(); }) .then([this, executor] { @@ -98,7 +100,7 @@ void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> ex }); } -void RecipientStateMachine::interrupt(Status status) { +void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) { // Resolve any unresolved promises to avoid hanging. stdx::lock_guard<Latch> lg(_mutex); if (!_allDonorsPreparedToDonate.getFuture().isReady()) { @@ -118,11 +120,11 @@ void RecipientStateMachine::interrupt(Status status) { } } -void onReshardingFieldsChanges(boost::optional<TypeCollectionReshardingFields> reshardingFields) {} +void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges( + boost::optional<TypeCollectionReshardingFields> reshardingFields) {} -void onDonorReportsMirroring(const ShardId& donor) {} - -void RecipientStateMachine::_createTemporaryReshardingCollectionThenTransitionToInitialized() { +void ReshardingRecipientService::RecipientStateMachine:: + _createTemporaryReshardingCollectionThenTransitionToInitialized() { if (_recipientDoc.getState() > RecipientStateEnum::kInitializing) { return; } @@ -130,8 +132,9 @@ void RecipientStateMachine::_createTemporaryReshardingCollectionThenTransitionTo _transitionState(RecipientStateEnum::kInitialized); } -ExecutorFuture<void> RecipientStateMachine::_awaitAllDonorsPreparedToDonateThenTransitionToCloning( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { +ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: + _awaitAllDonorsPreparedToDonateThenTransitionToCloning( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_recipientDoc.getState() > RecipientStateEnum::kInitialized) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -143,7 +146,7 @@ ExecutorFuture<void> RecipientStateMachine::_awaitAllDonorsPreparedToDonateThenT }); } -void RecipientStateMachine::_cloneThenTransitionToApplying() { +void ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplying() { if (_recipientDoc.getState() > RecipientStateEnum::kCloning) { return; } @@ -151,7 +154,7 @@ void RecipientStateMachine::_cloneThenTransitionToApplying() { _transitionState(RecipientStateEnum::kApplying); } -void RecipientStateMachine::_applyThenTransitionToSteadyState() { +void ReshardingRecipientService::RecipientStateMachine::_applyThenTransitionToSteadyState() { if (_recipientDoc.getState() > RecipientStateEnum::kApplying) { return; } @@ -159,9 +162,9 @@ void RecipientStateMachine::_applyThenTransitionToSteadyState() { _transitionState(RecipientStateEnum::kSteadyState); } -ExecutorFuture<void> -RecipientStateMachine::_awaitAllDonorsMirroringThenTransitionToStrictConsistency( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { +ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: + _awaitAllDonorsMirroringThenTransitionToStrictConsistency( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_recipientDoc.getState() > RecipientStateEnum::kSteadyState) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -171,8 +174,9 @@ RecipientStateMachine::_awaitAllDonorsMirroringThenTransitionToStrictConsistency }); } -ExecutorFuture<void> RecipientStateMachine::_awaitCoordinatorHasCommittedThenTransitionToRenaming( - const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { +ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: + _awaitCoordinatorHasCommittedThenTransitionToRenaming( + const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { if (_recipientDoc.getState() > RecipientStateEnum::kStrictConsistency) { return ExecutorFuture<void>(**executor, Status::OK()); } @@ -182,7 +186,8 @@ ExecutorFuture<void> RecipientStateMachine::_awaitCoordinatorHasCommittedThenTra }); } -void RecipientStateMachine::_renameTemporaryReshardingCollectionThenDeleteLocalState() { +void ReshardingRecipientService::RecipientStateMachine:: + _renameTemporaryReshardingCollectionThenDeleteLocalState() { if (_recipientDoc.getState() > RecipientStateEnum::kRenaming) { return; } @@ -190,12 +195,13 @@ void RecipientStateMachine::_renameTemporaryReshardingCollectionThenDeleteLocalS _transitionState(RecipientStateEnum::kDone); } -void RecipientStateMachine::_fulfillAllDonorsPreparedToDonate(Timestamp fetchTimestamp) { +void ReshardingRecipientService::RecipientStateMachine::_fulfillAllDonorsPreparedToDonate( + Timestamp fetchTimestamp) { _allDonorsPreparedToDonate.emplaceValue(fetchTimestamp); } -void RecipientStateMachine::_transitionState(RecipientStateEnum endState, - boost::optional<Timestamp> fetchTimestamp) { +void ReshardingRecipientService::RecipientStateMachine::_transitionState( + RecipientStateEnum endState, boost::optional<Timestamp> fetchTimestamp) { ReshardingRecipientDocument replacementDoc(_recipientDoc); replacementDoc.setState(endState); if (fetchTimestamp) { @@ -211,13 +217,15 @@ void RecipientStateMachine::_transitionState(RecipientStateEnum endState, _updateRecipientDocument(std::move(replacementDoc)); } -void RecipientStateMachine::_transitionStateToError(const Status& status) { +void ReshardingRecipientService::RecipientStateMachine::_transitionStateToError( + const Status& status) { ReshardingRecipientDocument replacementDoc(_recipientDoc); replacementDoc.setState(RecipientStateEnum::kError); _updateRecipientDocument(std::move(replacementDoc)); } -void RecipientStateMachine::_updateRecipientDocument(ReshardingRecipientDocument&& replacementDoc) { +void ReshardingRecipientService::RecipientStateMachine::_updateRecipientDocument( + ReshardingRecipientDocument&& replacementDoc) { auto opCtx = cc().makeOperationContext(); PersistentTaskStore<ReshardingRecipientDocument> store( NamespaceString::kRecipientReshardingOperationsNamespace); diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 8435e15071b..ec38bbbdde8 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -42,6 +42,8 @@ public: : PrimaryOnlyService(serviceContext) {} ~ReshardingRecipientService() = default; + class RecipientStateMachine; + StringData getServiceName() const override { return kReshardingRecipientServiceName; } @@ -63,7 +65,7 @@ public: * Represents the current state of a resharding recipient operation on this shard. This class * drives state transitions and updates to underlying on-disk metadata. */ -class RecipientStateMachine final +class ReshardingRecipientService::RecipientStateMachine final : public repl::PrimaryOnlyService::TypedInstance<RecipientStateMachine> { public: explicit RecipientStateMachine(const BSONObj& recipientDoc); |