diff options
Diffstat (limited to 'src/mongo/db/s')
9 files changed, 224 insertions, 16 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp index 8c985c59b06..3bbc9380c66 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp @@ -142,7 +142,15 @@ bool stateTransitionIncomplete(WithLock lk, ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default; -ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() = default; +ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() { + stdx::lock_guard<Latch> lg(_mutex); + invariant(_allRecipientsCreatedCollection.getFuture().isReady()); + invariant(_allDonorsReportedMinFetchTimestamp.getFuture().isReady()); + invariant(_allRecipientsFinishedCloning.getFuture().isReady()); + invariant(_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()); + invariant(_allRecipientsRenamedCollection.getFuture().isReady()); + invariant(_allDonorsDroppedOriginalCollection.getFuture().isReady()); +} void ReshardingCoordinatorObserver::onReshardingParticipantTransition( const ReshardingCoordinatorDocument& updatedStateDoc) { @@ -220,4 +228,31 @@ ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() { return _allRecipientsRenamedCollection.getFuture(); } +void ReshardingCoordinatorObserver::interrupt(Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (!_allRecipientsCreatedCollection.getFuture().isReady()) { + _allRecipientsCreatedCollection.setError(status); + } + + if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) { + _allDonorsReportedMinFetchTimestamp.setError(status); + } + + if (!_allRecipientsFinishedCloning.getFuture().isReady()) { + _allRecipientsFinishedCloning.setError(status); + } + + if (!_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()) { + _allRecipientsReportedStrictConsistencyTimestamp.setError(status); + } + + if 
(!_allRecipientsRenamedCollection.getFuture().isReady()) { + _allRecipientsRenamedCollection.setError(status); + } + + if (!_allDonorsDroppedOriginalCollection.getFuture().isReady()) { + _allDonorsDroppedOriginalCollection.setError(status); + } +} + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h index 435721a7fde..f721e04d8fc 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h @@ -102,6 +102,11 @@ public: */ SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsRenamedCollection(); + /** + * Sets errors on any promises that have not yet been fulfilled. + */ + void interrupt(Status status); + private: // Protects the state below Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorObserver::_mutex"); diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp index f9cc6c53d09..7b811ab0ef3 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp @@ -113,6 +113,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionSucce auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_TRUE(fut.isReady()); + + reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"}); } TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOutOfOrder) { @@ -146,6 +148,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOu auto coordinatorDoc2 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards2, donorShards); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc2); 
ASSERT_TRUE(fut.isReady()); + + reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"}); } TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { @@ -167,6 +171,8 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) { // If any participant is in state kError, regardless of other participants' states, an error // should be set. ASSERT_EQ(resp.getStatus(), ErrorCodes::InternalError); + + reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"}); } TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) { @@ -191,6 +197,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) { auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1); reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1); ASSERT_TRUE(fut.isReady()); + + reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"}); } } // namespace } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 42be9c1a946..40a72cb75d7 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -51,6 +51,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const _reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>(); } +ReshardingCoordinatorService::ReshardingCoordinator::~ReshardingCoordinator() { + stdx::lock_guard<Latch> lg(_mutex); + invariant(_initialChunksAndZonesPromise.getFuture().isReady()); + invariant(_completionPromise.getFuture().isReady()); +} + void ReshardingCoordinatorService::ReshardingCoordinator::run( std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) @@ -80,6 +86,12 @@ void 
ReshardingCoordinatorService::ReshardingCoordinator::run( .then([this] { _tellAllRecipientsToRefresh(); }) .then([this] { _tellAllDonorsToRefresh(); }) .onError([this](Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (_completionPromise.getFuture().isReady()) { + // interrupt() was called before we got here. + return status; + } + _runUpdates(CoordinatorStateEnum::kError, _stateDoc); LOGV2(4956902, @@ -94,11 +106,38 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run( return status; }) - .getAsync([](Status) {}); + .getAsync([this](Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (_completionPromise.getFuture().isReady()) { + // interrupt() was called before we got here. + return; + } + + if (status.isOK()) { + _completionPromise.emplaceValue(); + } else { + _completionPromise.setError(status); + } + }); +} + +void ReshardingCoordinatorService::ReshardingCoordinator::interrupt(Status status) { + // Resolve any unresolved promises to avoid hanging. + stdx::lock_guard<Latch> lg(_mutex); + if (!_initialChunksAndZonesPromise.getFuture().isReady()) { + _initialChunksAndZonesPromise.setError(status); + } + + _reshardingCoordinatorObserver->interrupt(status); + + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(status); + } } void ReshardingCoordinatorService::ReshardingCoordinator::setInitialChunksAndZones( std::vector<ChunkType> initialChunks, std::vector<TagsType> newZones) { + stdx::lock_guard<Latch> lg(_mutex); if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing || _initialChunksAndZonesPromise.getFuture().isReady()) { return; diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index eae5b690692..95b8d19998a 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -72,10 +72,19 @@ public: : public 
PrimaryOnlyService::TypedInstance<ReshardingCoordinator> { public: explicit ReshardingCoordinator(const BSONObj& state); + ~ReshardingCoordinator(); void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; - void interrupt(Status status) override{}; + void interrupt(Status status) override; + + /** + * Returns a Future that will be resolved when all work associated with this Instance has + * completed running. + */ + SharedSemiFuture<void> getCompletionFuture() const { + return _completionPromise.getFuture(); + } /** * TODO(SERVER-50976) Report ReshardingCoordinators in currentOp(). @@ -196,11 +205,6 @@ public: // collection. The object looks like: {_id: 'reshardingUUID'} const InstanceID _id; - // Promise containing the initial chunks and new zones based on the new shard key. These are - // not a part of the state document, so must be set by configsvrReshardCollection after - // construction. - SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise; - // Observes writes that indicate state changes for this resharding operation and notifies // 'this' when all donors/recipients have entered some state so that 'this' can transition // states. @@ -208,6 +212,17 @@ public: // The updated coordinator state document. ReshardingCoordinatorDocument _stateDoc; + + // Protects promises below. + mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex"); + + // Promise containing the initial chunks and new zones based on the new shard key. These are + // not a part of the state document, so must be set by configsvrReshardCollection after + // construction. + SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise; + + // Promise that is resolved when the chain of work kicked off by run() has completed. 
+ SharedPromise<void> _completionPromise; }; }; diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 7f82c599e96..76590d50bb0 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -92,6 +92,13 @@ DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc) donorDoc)), _id(_donorDoc.getCommonReshardingMetadata().get_id()) {} +DonorStateMachine::~DonorStateMachine() { + stdx::lock_guard<Latch> lg(_mutex); + invariant(_allRecipientsDoneApplying.getFuture().isReady()); + invariant(_coordinatorHasCommitted.getFuture().isReady()); + invariant(_completionPromise.getFuture().isReady()); +} + void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) .then([this] { _transitionState(DonorStateEnum::kPreparingToDonate); }) @@ -114,7 +121,35 @@ void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> execut this->_transitionStateToError(status); return status; }) - .getAsync([](Status) {}); + .getAsync([this](Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (_completionPromise.getFuture().isReady()) { + // interrupt() was called before we got here. + return; + } + + if (status.isOK()) { + _completionPromise.emplaceValue(); + } else { + _completionPromise.setError(status); + } + }); +} + +void DonorStateMachine::interrupt(Status status) { + // Resolve any unresolved promises to avoid hanging. 
+ stdx::lock_guard<Latch> lg(_mutex); + if (!_allRecipientsDoneApplying.getFuture().isReady()) { + _allRecipientsDoneApplying.setError(status); + } + + if (!_coordinatorHasCommitted.getFuture().isReady()) { + _coordinatorHasCommitted.setError(status); + } + + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(status); + } } void DonorStateMachine::onReshardingFieldsChanges( diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 4e44ba5a110..09578ef2611 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -67,9 +67,19 @@ class DonorStateMachine final : public repl::PrimaryOnlyService::TypedInstance<D public: explicit DonorStateMachine(const BSONObj& donorDoc); + ~DonorStateMachine(); + void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; - void interrupt(Status status) override{}; + void interrupt(Status status) override; + + /** + * Returns a Future that will be resolved when all work associated with this Instance has + * completed running. + */ + SharedSemiFuture<void> getCompletionFuture() const { + return _completionPromise.getFuture(); + } /** * TODO(SERVER-50978) Report ReshardingDonorService Instances in currentOp(). @@ -109,14 +119,19 @@ private: // config.localReshardingOperations.donor. ReshardingDonorDocument _donorDoc; + // The id both for the resharding operation and for the primary-only-service instance. + const UUID _id; + + // Protects the promises below + Mutex _mutex = MONGO_MAKE_LATCH("ReshardingDonor::_mutex"); + // Each promise below corresponds to a state on the donor state machine. They are listed in // ascending order, such that the first promise below will be the first promise fulfilled. 
SharedPromise<void> _allRecipientsDoneApplying; SharedPromise<void> _coordinatorHasCommitted; - // The id both for the resharding operation and for the primary-only-service instance. - const UUID _id; + SharedPromise<void> _completionPromise; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index dfe9272c183..b80775b6c46 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -48,6 +48,14 @@ RecipientStateMachine::RecipientStateMachine(const BSONObj& recipientDoc) IDLParserErrorContext("ReshardingRecipientDocument"), recipientDoc)), _id(_recipientDoc.getCommonReshardingMetadata().get_id()) {} +RecipientStateMachine::~RecipientStateMachine() { + stdx::lock_guard<Latch> lg(_mutex); + invariant(_allDonorsPreparedToDonate.getFuture().isReady()); + invariant(_allDonorsMirroring.getFuture().isReady()); + invariant(_coordinatorHasCommitted.getFuture().isReady()); + invariant(_completionPromise.getFuture().isReady()); +} + void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept { ExecutorFuture<void>(**executor) .then([this] { _createTemporaryReshardingCollectionThenTransitionToInitialized(); }) @@ -74,7 +82,40 @@ void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> ex this->_transitionStateToError(status); return status; }) - .getAsync([](Status) {}); + .getAsync([this](Status status) { + stdx::lock_guard<Latch> lg(_mutex); + if (_completionPromise.getFuture().isReady()) { + // interrupt() was called before we got here. + return; + } + + if (status.isOK()) { + _completionPromise.emplaceValue(); + } else { + // Set the error on the completion promise. + _completionPromise.setError(status); + } + }); +} + +void RecipientStateMachine::interrupt(Status status) { + // Resolve any unresolved promises to avoid hanging. 
+ stdx::lock_guard<Latch> lg(_mutex); + if (!_allDonorsPreparedToDonate.getFuture().isReady()) { + _allDonorsPreparedToDonate.setError(status); + } + + if (!_allDonorsMirroring.getFuture().isReady()) { + _allDonorsMirroring.setError(status); + } + + if (!_coordinatorHasCommitted.getFuture().isReady()) { + _coordinatorHasCommitted.setError(status); + } + + if (!_completionPromise.getFuture().isReady()) { + _completionPromise.setError(status); + } } void onReshardingFieldsChanges(boost::optional<TypeCollectionReshardingFields> reshardingFields) {} diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index b3259719183..8435e15071b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -68,9 +68,19 @@ class RecipientStateMachine final public: explicit RecipientStateMachine(const BSONObj& recipientDoc); + ~RecipientStateMachine(); + void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override; - void interrupt(Status status) override{}; + void interrupt(Status status) override; + + /** + * Returns a Future that will be resolved when all work associated with this Instance has + * completed running. + */ + SharedSemiFuture<void> getCompletionFuture() const { + return _completionPromise.getFuture(); + } /** * TODO(SERVER-51021) Report ReshardingRecipientService Instances in currentOp(). @@ -119,6 +129,12 @@ private: // config.localReshardingOperations.recipient. ReshardingRecipientDocument _recipientDoc; + // The id both for the resharding operation and for the primary-only-service instance. + const UUID _id; + + // Protects the promises below + Mutex _mutex = MONGO_MAKE_LATCH("ReshardingRecipient::_mutex"); + // Each promise below corresponds to a state on the recipient state machine. They are listed in // ascending order, such that the first promise below will be the first promise fulfilled. 
SharedPromise<Timestamp> _allDonorsPreparedToDonate; @@ -127,8 +143,7 @@ private: SharedPromise<void> _coordinatorHasCommitted; - // The id both for the resharding operation and for the primary-only-service instance. - const UUID _id; + SharedPromise<void> _completionPromise; }; } // namespace mongo |