summary | refs | log | tree | commit | diff
path: root/src/mongo/db
diff options
context:
space:
mode:
author: jannaerin <golden.janna@gmail.com> 2020-09-18 18:57:50 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2020-09-24 21:06:09 +0000
commit: 718b31f226b8af509a1f78c288aaec0157d6e9af (patch)
tree: ab337e01a4b54d96125a230adf06987c0d12e7eb /src/mongo/db
parent: 268cf87b208dd966c5e017d0219769163c1dacb0 (diff)
download: mongo-718b31f226b8af509a1f78c288aaec0157d6e9af.tar.gz
SERVER-50986 Implement PrimaryOnlyService::interrupt on resharding primary only services
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_observer.cpp 37
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_observer.h 5
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp 8
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.cpp 41
-rw-r--r-- src/mongo/db/s/resharding/resharding_coordinator_service.h 27
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.cpp 37
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_service.h 21
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.cpp 43
-rw-r--r-- src/mongo/db/s/resharding/resharding_recipient_service.h 21
9 files changed, 224 insertions, 16 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
index 8c985c59b06..3bbc9380c66 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.cpp
@@ -142,7 +142,15 @@ bool stateTransitionIncomplete(WithLock lk,
ReshardingCoordinatorObserver::ReshardingCoordinatorObserver() = default;
-ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() = default;
+ReshardingCoordinatorObserver::~ReshardingCoordinatorObserver() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_allRecipientsCreatedCollection.getFuture().isReady());
+ invariant(_allDonorsReportedMinFetchTimestamp.getFuture().isReady());
+ invariant(_allRecipientsFinishedCloning.getFuture().isReady());
+ invariant(_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady());
+ invariant(_allRecipientsRenamedCollection.getFuture().isReady());
+ invariant(_allDonorsDroppedOriginalCollection.getFuture().isReady());
+}
void ReshardingCoordinatorObserver::onReshardingParticipantTransition(
const ReshardingCoordinatorDocument& updatedStateDoc) {
@@ -220,4 +228,31 @@ ReshardingCoordinatorObserver::awaitAllRecipientsRenamedCollection() {
return _allRecipientsRenamedCollection.getFuture();
}
+void ReshardingCoordinatorObserver::interrupt(Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_allRecipientsCreatedCollection.getFuture().isReady()) {
+ _allRecipientsCreatedCollection.setError(status);
+ }
+
+ if (!_allDonorsReportedMinFetchTimestamp.getFuture().isReady()) {
+ _allDonorsReportedMinFetchTimestamp.setError(status);
+ }
+
+ if (!_allRecipientsFinishedCloning.getFuture().isReady()) {
+ _allRecipientsFinishedCloning.setError(status);
+ }
+
+ if (!_allRecipientsReportedStrictConsistencyTimestamp.getFuture().isReady()) {
+ _allRecipientsReportedStrictConsistencyTimestamp.setError(status);
+ }
+
+ if (!_allRecipientsRenamedCollection.getFuture().isReady()) {
+ _allRecipientsRenamedCollection.setError(status);
+ }
+
+ if (!_allDonorsDroppedOriginalCollection.getFuture().isReady()) {
+ _allDonorsDroppedOriginalCollection.setError(status);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer.h b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
index 435721a7fde..f721e04d8fc 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer.h
@@ -102,6 +102,11 @@ public:
*/
SharedSemiFuture<ReshardingCoordinatorDocument> awaitAllRecipientsRenamedCollection();
+ /**
+ * Sets errors on any promises that have not yet been fulfilled.
+ */
+ void interrupt(Status status);
+
private:
// Protects the state below
Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorObserver::_mutex");
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
index f9cc6c53d09..7b811ab0ef3 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_observer_test.cpp
@@ -113,6 +113,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionSucce
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards1, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
+
+ reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"});
}
TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOutOfOrder) {
@@ -146,6 +148,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onReshardingParticipantTransitionTwoOu
auto coordinatorDoc2 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards2, donorShards);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc2);
ASSERT_TRUE(fut.isReady());
+
+ reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"});
}
TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
@@ -167,6 +171,8 @@ TEST_F(ReshardingCoordinatorObserverTest, participantReportsError) {
// If any participant is in state kError, regardless of other participants' states, an error
// should be set.
ASSERT_EQ(resp.getStatus(), ErrorCodes::InternalError);
+
+ reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"});
}
TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) {
@@ -191,6 +197,8 @@ TEST_F(ReshardingCoordinatorObserverTest, onDonorsReportedMinFetchTimestamp) {
auto coordinatorDoc1 = makeCoordinatorDocWithRecipientsAndDonors(recipientShards, donorShards1);
reshardingObserver->onReshardingParticipantTransition(coordinatorDoc1);
ASSERT_TRUE(fut.isReady());
+
+ reshardingObserver->interrupt(Status{ErrorCodes::Interrupted, "interrupted"});
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 42be9c1a946..40a72cb75d7 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -51,6 +51,12 @@ ReshardingCoordinatorService::ReshardingCoordinator::ReshardingCoordinator(const
_reshardingCoordinatorObserver = std::make_shared<ReshardingCoordinatorObserver>();
}
+ReshardingCoordinatorService::ReshardingCoordinator::~ReshardingCoordinator() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_initialChunksAndZonesPromise.getFuture().isReady());
+ invariant(_completionPromise.getFuture().isReady());
+}
+
void ReshardingCoordinatorService::ReshardingCoordinator::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
@@ -80,6 +86,12 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
.then([this] { _tellAllRecipientsToRefresh(); })
.then([this] { _tellAllDonorsToRefresh(); })
.onError([this](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_completionPromise.getFuture().isReady()) {
+ // interrupt() was called before we got here.
+ return status;
+ }
+
_runUpdates(CoordinatorStateEnum::kError, _stateDoc);
LOGV2(4956902,
@@ -94,11 +106,38 @@ void ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
})
- .getAsync([](Status) {});
+ .getAsync([this](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_completionPromise.getFuture().isReady()) {
+ // interrupt() was called before we got here.
+ return;
+ }
+
+ if (status.isOK()) {
+ _completionPromise.emplaceValue();
+ } else {
+ _completionPromise.setError(status);
+ }
+ });
+}
+
+void ReshardingCoordinatorService::ReshardingCoordinator::interrupt(Status status) {
+ // Resolve any unresolved promises to avoid hanging.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_initialChunksAndZonesPromise.getFuture().isReady()) {
+ _initialChunksAndZonesPromise.setError(status);
+ }
+
+ _reshardingCoordinatorObserver->interrupt(status);
+
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(status);
+ }
}
void ReshardingCoordinatorService::ReshardingCoordinator::setInitialChunksAndZones(
std::vector<ChunkType> initialChunks, std::vector<TagsType> newZones) {
+ stdx::lock_guard<Latch> lg(_mutex);
if (_stateDoc.getState() > CoordinatorStateEnum::kInitializing ||
_initialChunksAndZonesPromise.getFuture().isReady()) {
return;
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h
index eae5b690692..95b8d19998a 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.h
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h
@@ -72,10 +72,19 @@ public:
: public PrimaryOnlyService::TypedInstance<ReshardingCoordinator> {
public:
explicit ReshardingCoordinator(const BSONObj& state);
+ ~ReshardingCoordinator();
void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
- void interrupt(Status status) override{};
+ void interrupt(Status status) override;
+
+ /**
+ * Returns a Future that will be resolved when all work associated with this Instance has
+ * completed running.
+ */
+ SharedSemiFuture<void> getCompletionFuture() const {
+ return _completionPromise.getFuture();
+ }
/**
* TODO(SERVER-50976) Report ReshardingCoordinators in currentOp().
@@ -196,11 +205,6 @@ public:
// collection. The object looks like: {_id: 'reshardingUUID'}
const InstanceID _id;
- // Promise containing the initial chunks and new zones based on the new shard key. These are
- // not a part of the state document, so must be set by configsvrReshardCollection after
- // construction.
- SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise;
-
// Observes writes that indicate state changes for this resharding operation and notifies
// 'this' when all donors/recipients have entered some state so that 'this' can transition
// states.
@@ -208,6 +212,17 @@ public:
// The updated coordinator state document.
ReshardingCoordinatorDocument _stateDoc;
+
+ // Protects promises below.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ReshardingCoordinatorService::_mutex");
+
+ // Promise containing the initial chunks and new zones based on the new shard key. These are
+ // not a part of the state document, so must be set by configsvrReshardCollection after
+ // construction.
+ SharedPromise<ChunksAndZones> _initialChunksAndZonesPromise;
+
+ // Promise that is resolved when the chain of work kicked off by run() has completed.
+ SharedPromise<void> _completionPromise;
};
};
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index 7f82c599e96..76590d50bb0 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -92,6 +92,13 @@ DonorStateMachine::DonorStateMachine(const BSONObj& donorDoc)
donorDoc)),
_id(_donorDoc.getCommonReshardingMetadata().get_id()) {}
+DonorStateMachine::~DonorStateMachine() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_allRecipientsDoneApplying.getFuture().isReady());
+ invariant(_coordinatorHasCommitted.getFuture().isReady());
+ invariant(_completionPromise.getFuture().isReady());
+}
+
void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
.then([this] { _transitionState(DonorStateEnum::kPreparingToDonate); })
@@ -114,7 +121,35 @@ void DonorStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> execut
this->_transitionStateToError(status);
return status;
})
- .getAsync([](Status) {});
+ .getAsync([this](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_completionPromise.getFuture().isReady()) {
+ // interrupt() was called before we got here.
+ return;
+ }
+
+ if (status.isOK()) {
+ _completionPromise.emplaceValue();
+ } else {
+ _completionPromise.setError(status);
+ }
+ });
+}
+
+void DonorStateMachine::interrupt(Status status) {
+ // Resolve any unresolved promises to avoid hanging.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_allRecipientsDoneApplying.getFuture().isReady()) {
+ _allRecipientsDoneApplying.setError(status);
+ }
+
+ if (!_coordinatorHasCommitted.getFuture().isReady()) {
+ _coordinatorHasCommitted.setError(status);
+ }
+
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(status);
+ }
}
void DonorStateMachine::onReshardingFieldsChanges(
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 4e44ba5a110..09578ef2611 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -67,9 +67,19 @@ class DonorStateMachine final : public repl::PrimaryOnlyService::TypedInstance<D
public:
explicit DonorStateMachine(const BSONObj& donorDoc);
+ ~DonorStateMachine();
+
void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
- void interrupt(Status status) override{};
+ void interrupt(Status status) override;
+
+ /**
+ * Returns a Future that will be resolved when all work associated with this Instance has
+ * completed running.
+ */
+ SharedSemiFuture<void> getCompletionFuture() const {
+ return _completionPromise.getFuture();
+ }
/**
* TODO(SERVER-50978) Report ReshardingDonorService Instances in currentOp().
@@ -109,14 +119,19 @@ private:
// config.localReshardingOperations.donor.
ReshardingDonorDocument _donorDoc;
+ // The id both for the resharding operation and for the primary-only-service instance.
+ const UUID _id;
+
+ // Protects the promises below
+ Mutex _mutex = MONGO_MAKE_LATCH("ReshardingDonor::_mutex");
+
// Each promise below corresponds to a state on the donor state machine. They are listed in
// ascending order, such that the first promise below will be the first promise fulfilled.
SharedPromise<void> _allRecipientsDoneApplying;
SharedPromise<void> _coordinatorHasCommitted;
- // The id both for the resharding operation and for the primary-only-service instance.
- const UUID _id;
+ SharedPromise<void> _completionPromise;
};
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index dfe9272c183..b80775b6c46 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -48,6 +48,14 @@ RecipientStateMachine::RecipientStateMachine(const BSONObj& recipientDoc)
IDLParserErrorContext("ReshardingRecipientDocument"), recipientDoc)),
_id(_recipientDoc.getCommonReshardingMetadata().get_id()) {}
+RecipientStateMachine::~RecipientStateMachine() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ invariant(_allDonorsPreparedToDonate.getFuture().isReady());
+ invariant(_allDonorsMirroring.getFuture().isReady());
+ invariant(_coordinatorHasCommitted.getFuture().isReady());
+ invariant(_completionPromise.getFuture().isReady());
+}
+
void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept {
ExecutorFuture<void>(**executor)
.then([this] { _createTemporaryReshardingCollectionThenTransitionToInitialized(); })
@@ -74,7 +82,40 @@ void RecipientStateMachine::run(std::shared_ptr<executor::ScopedTaskExecutor> ex
this->_transitionStateToError(status);
return status;
})
- .getAsync([](Status) {});
+ .getAsync([this](Status status) {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_completionPromise.getFuture().isReady()) {
+ // interrupt() was called before we got here.
+ return;
+ }
+
+ if (status.isOK()) {
+ _completionPromise.emplaceValue();
+ } else {
+ // Set the error on the completion promise
+ _completionPromise.setError(status);
+ }
+ });
+}
+
+void RecipientStateMachine::interrupt(Status status) {
+ // Resolve any unresolved promises to avoid hanging.
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (!_allDonorsPreparedToDonate.getFuture().isReady()) {
+ _allDonorsPreparedToDonate.setError(status);
+ }
+
+ if (!_allDonorsMirroring.getFuture().isReady()) {
+ _allDonorsMirroring.setError(status);
+ }
+
+ if (!_coordinatorHasCommitted.getFuture().isReady()) {
+ _coordinatorHasCommitted.setError(status);
+ }
+
+ if (!_completionPromise.getFuture().isReady()) {
+ _completionPromise.setError(status);
+ }
}
void onReshardingFieldsChanges(boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index b3259719183..8435e15071b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -68,9 +68,19 @@ class RecipientStateMachine final
public:
explicit RecipientStateMachine(const BSONObj& recipientDoc);
+ ~RecipientStateMachine();
+
void run(std::shared_ptr<executor::ScopedTaskExecutor> executor) noexcept override;
- void interrupt(Status status) override{};
+ void interrupt(Status status) override;
+
+ /**
+ * Returns a Future that will be resolved when all work associated with this Instance has
+ * completed running.
+ */
+ SharedSemiFuture<void> getCompletionFuture() const {
+ return _completionPromise.getFuture();
+ }
/**
* TODO(SERVER-51021) Report ReshardingRecipientService Instances in currentOp().
@@ -119,6 +129,12 @@ private:
// config.localReshardingOperations.recipient.
ReshardingRecipientDocument _recipientDoc;
+ // The id both for the resharding operation and for the primary-only-service instance.
+ const UUID _id;
+
+ // Protects the promises below
+ Mutex _mutex = MONGO_MAKE_LATCH("ReshardingRecipient::_mutex");
+
// Each promise below corresponds to a state on the recipient state machine. They are listed in
// ascending order, such that the first promise below will be the first promise fulfilled.
SharedPromise<Timestamp> _allDonorsPreparedToDonate;
@@ -127,8 +143,7 @@ private:
SharedPromise<void> _coordinatorHasCommitted;
- // The id both for the resharding operation and for the primary-only-service instance.
- const UUID _id;
+ SharedPromise<void> _completionPromise;
};
} // namespace mongo