summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-12-07 20:30:40 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-12-16 19:48:04 +0000
commit85b63642b03e4b09a89317692d671945c304b459 (patch)
tree4bdd977c055c35e70c92350abe4f7e379ae3e793 /src/mongo
parent36e5f97f0bc9f51d2ba69d19b76ce6ffda06f864 (diff)
downloadmongo-85b63642b03e4b09a89317692d671945c304b459.tar.gz
SERVER-51130 Implement onReshardingFieldsChanges for donor and recipient state machines
SERVER-53173 Create jstest to verify that resharding participants have removed all disk metadata
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp16
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp15
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.cpp69
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_service.h3
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp59
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h5
6 files changed, 114 insertions, 53 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
index d22f559b15b..7aef9649de5 100644
--- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
+++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp
@@ -220,21 +220,19 @@ public:
instance->setInitialChunksAndZones(std::move(initialChunks), std::move(newZones));
- instance->getObserver()->awaitAllDonorsReadyToDonate().wait(opCtx);
-
- instance->getObserver()->awaitAllRecipientsFinishedCloning().wait(opCtx);
-
if (resharding::gReshardingTempInterruptBeforeOplogApplication) {
+ instance->getObserver()->awaitAllDonorsReadyToDonate().wait(opCtx);
+ instance->getObserver()->awaitAllRecipientsFinishedCloning().wait(opCtx);
+
// This promise is currently automatically filled by recipient shards after creating
// the temporary resharding collection.
instance->getObserver()->awaitAllRecipientsFinishedApplying().wait(opCtx);
+
+ instance->interrupt(
+ {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
} else {
- instance->getObserver()->awaitAllRecipientsFinishedApplying().wait(opCtx);
- instance->getObserver()->awaitAllRecipientsInStrictConsistency().wait(opCtx);
+ instance->getCompletionFuture().get(opCtx);
}
-
- instance->interrupt(
- {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
}
private:
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 22660d84dd2..98740683694 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -54,6 +54,8 @@
namespace mongo {
namespace {
+using namespace fmt::literals;
+
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState);
MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCommit);
@@ -614,7 +616,9 @@ void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions(
invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end());
// TODO SERVER-51800 Remove special casing for kError.
invariant(nextState == CoordinatorStateEnum::kError ||
- notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone);
+ notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone,
+ "failed to persist state transition with nextState {}"_format(
+ CoordinatorState_serializer(nextState)));
// Resharding metadata changes to be executed.
auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) {
@@ -711,10 +715,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
return;
})
.then([this, executor] { _tellAllParticipantsToRefresh(executor); })
- .then([this, executor] {
+ .then([this, self = shared_from_this(), executor] {
+ // The shared_ptr maintaining the ReshardingCoordinatorService Instance object gets
+ // deleted from the PrimaryOnlyService's map. Thus, shared_from_this() is necessary to
+ // keep 'this' pointer alive for the remaining callbacks.
return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor);
})
- .onError([this, executor](Status status) {
+ .onError([this, self = shared_from_this(), executor](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
@@ -736,7 +743,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
return status;
})
- .onCompletion([this](Status status) {
+ .onCompletion([this, self = shared_from_this()](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp
index cd07485cca4..15fa5aa4a2c 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp
@@ -109,6 +109,15 @@ Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) {
return generatedOpTime.getTimestamp();
}
+
+/**
+ * Fulfills the promise if it is not already fulfilled. Otherwise, does nothing.
+ */
+void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
+ if (!sp.getFuture().isReady()) {
+ sp.emplaceValue();
+ }
+}
} // namespace
std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance(
@@ -146,8 +155,13 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
.then([this, executor] {
return _awaitCoordinatorHasCommittedThenTransitionToDropping(executor);
})
- .then([this] { return _dropOriginalCollectionThenDeleteLocalState(); })
- .onError([this](Status status) {
+ .then([this, self = shared_from_this()] {
+ // After this line, the shared_ptr stored in the PrimaryOnlyService's map for
+ // the ReshardingDonorService Instance is removed. It is necessary to use
+ // shared_from_this() to extend the lifetime for the remaining callbacks.
+ return _dropOriginalCollectionThenDeleteLocalState();
+ })
+ .onError([this, self = shared_from_this()](Status status) {
LOGV2(4956400,
"Resharding operation donor state machine failed",
"namespace"_attr = _donorDoc.getNss().ns(),
@@ -155,10 +169,10 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run(
"error"_attr = status);
// TODO SERVER-50584 Report errors to the coordinator so that the resharding operation
// can be aborted.
- this->_transitionStateToError(status);
+ _transitionStateToError(status);
return status;
})
- .onCompletion([this](Status status) {
+ .onCompletion([this, self = shared_from_this()](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
@@ -196,7 +210,30 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) {
}
void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges(
- boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
+ const TypeCollectionReshardingFields& reshardingFields) {
+ auto coordinatorState = reshardingFields.getState();
+ if (coordinatorState == CoordinatorStateEnum::kError) {
+ // TODO SERVER-52838: Investigate if we want to have a special error code so the donor knows
+ // when it has received the error from the coordinator rather than needing to report an
+ // error to the coordinator.
+ interrupt({ErrorCodes::InternalError,
+ "ReshardingDonorService observed CoordinatorStateEnum::kError"});
+ return;
+ }
+
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (coordinatorState >= CoordinatorStateEnum::kApplying) {
+ ensureFulfilledPromise(lk, _allRecipientsDoneCloning);
+ }
+
+ if (coordinatorState >= CoordinatorStateEnum::kMirroring) {
+ ensureFulfilledPromise(lk, _allRecipientsDoneApplying);
+ }
+
+ if (coordinatorState >= CoordinatorStateEnum::kCommitted) {
+ ensureFulfilledPromise(lk, _coordinatorHasCommitted);
+ }
+}
void ReshardingDonorService::DonorStateMachine::
_onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() {
@@ -217,17 +254,6 @@ void ReshardingDonorService::DonorStateMachine::
auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc);
_transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingInitialData, minFetchTimestamp);
-
- // Unless a test is willing to leak the contents of the config.localReshardingOperations.donor
- // collection, without this interrupt(), an invariant would be hit from
- // _allRecipientsDoneCloning not being ready when this DonorStateMachine is being destructed.
- //
- // TODO SERVER-51130: Move this interrupt() to after _transitionState(kDonatingOplogEntries)
- // once the donor shards learn from the coordinator when all recipient shards have finished
- // cloning.
- if (resharding::gReshardingTempInterruptBeforeOplogApplication) {
- interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
- }
}
ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
@@ -239,6 +265,16 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::
return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() {
_transitionState(DonorStateEnum::kDonatingOplogEntries);
+
+ // Unless a test is willing to leak the contents of the
+ // config.localReshardingOperations.donor collection, without this interrupt(), an invariant
+ // would be hit from _allRecipientsDoneCloning not being ready when this DonorStateMachine
+ // is being destructed.
+ //
+ // TODO SERVER-53372: Remove this interrupt altogether.
+ if (resharding::gReshardingTempInterruptBeforeOplogApplication) {
+ interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
+ }
});
}
@@ -261,7 +297,6 @@ void ReshardingDonorService::DonorStateMachine::
}
_transitionState(DonorStateEnum::kMirroring);
- interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
}
ExecutorFuture<void>
diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h
index 32775d91f20..f131f8d1d1c 100644
--- a/src/mongo/db/s/resharding/resharding_donor_service.h
+++ b/src/mongo/db/s/resharding/resharding_donor_service.h
@@ -95,8 +95,7 @@ public:
return boost::none;
}
- void onReshardingFieldsChanges(
- boost::optional<TypeCollectionReshardingFields> reshardingFields);
+ void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields);
private:
// The following functions correspond to the actions to take at a particular donor state.
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 33686ba261b..9e7a4ffac0b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -87,6 +87,14 @@ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, const Name
return cm;
}
+/**
+ * Fulfills the promise if it is not already fulfilled. Otherwise, does nothing.
+ */
+void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
+ if (!sp.getFuture().isReady()) {
+ sp.emplaceValue();
+ }
+}
} // namespace
namespace resharding {
@@ -161,7 +169,6 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
- invariant(_allDonorsMirroring.getFuture().isReady());
invariant(_coordinatorHasCommitted.getFuture().isReady());
invariant(_completionPromise.getFuture().isReady());
}
@@ -180,8 +187,13 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
.then([this, executor] {
return _awaitCoordinatorHasCommittedThenTransitionToRenaming(executor);
})
- .then([this] { _renameTemporaryReshardingCollectionThenDeleteLocalState(); })
- .onError([this](Status status) {
+ .then([this, self = shared_from_this()] {
+ // After this line, the shared_ptr stored in the PrimaryOnlyService's map for
+ // the ReshardingRecipientService Instance is removed. It is necessary to use
+ // shared_from_this() to extend the lifetime for the remaining callbacks.
+ _renameTemporaryReshardingCollectionThenDeleteLocalState();
+ })
+ .onError([this, self = shared_from_this()](Status status) {
LOGV2(4956500,
"Resharding operation recipient state machine failed",
"namespace"_attr = _recipientDoc.getNss().ns(),
@@ -189,10 +201,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
"error"_attr = status);
// TODO SERVER-50584 Report errors to the coordinator so that the resharding operation
// can be aborted.
- this->_transitionStateToError(status);
+ _transitionStateToError(status);
return status;
})
- .onCompletion([this](Status status) {
+ .onCompletion([this, self = shared_from_this()](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
@@ -228,10 +240,6 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
threadPool->shutdown();
}
- if (!_allDonorsMirroring.getFuture().isReady()) {
- _allDonorsMirroring.setError(status);
- }
-
if (!_coordinatorHasCommitted.getFuture().isReady()) {
_coordinatorHasCommitted.setError(status);
}
@@ -242,7 +250,22 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
}
void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
- boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
+ const TypeCollectionReshardingFields& reshardingFields) {
+ auto coordinatorState = reshardingFields.getState();
+ if (coordinatorState == CoordinatorStateEnum::kError) {
+ // TODO SERVER-52838: Investigate if we want to have a special error code so the recipient
+ // knows when it has received the error from the coordinator rather than needing to report
+ // an error to the coordinator.
+ interrupt({ErrorCodes::InternalError,
+ "ReshardingDonorService observed CoordinatorStateEnum::kError"});
+ return;
+ }
+
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (coordinatorState >= CoordinatorStateEnum::kCommitted) {
+ ensureFulfilledPromise(lk, _coordinatorHasCommitted);
+ }
+}
void ReshardingRecipientService::RecipientStateMachine::
_transitionToCreatingTemporaryReshardingCollection() {
@@ -460,7 +483,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return whenAllSucceed(std::move(futuresToWaitOn)).thenRunOn(**executor).then([this] {
_transitionStateAndUpdateCoordinator(RecipientStateEnum::kStrictConsistency);
- interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
});
}
@@ -483,14 +505,17 @@ void ReshardingRecipientService::RecipientStateMachine::
return;
}
- auto opCtx = cc().makeOperationContext();
+ {
+ auto opCtx = cc().makeOperationContext();
- auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
- _recipientDoc.getExistingUUID());
+ auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
+ _recipientDoc.getExistingUUID());
- RenameCollectionOptions options;
- options.dropTarget = true;
- uassertStatusOK(renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options));
+ RenameCollectionOptions options;
+ options.dropTarget = true;
+ uassertStatusOK(
+ renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options));
+ }
_transitionStateAndUpdateCoordinator(RecipientStateEnum::kDone);
}
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index 9856fca8703..107ac10deac 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -116,8 +116,7 @@ public:
return boost::none;
}
- void onReshardingFieldsChanges(
- boost::optional<TypeCollectionReshardingFields> reshardingFields);
+ void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields);
private:
// The following functions correspond to the actions to take at a particular recipient state.
@@ -182,8 +181,6 @@ private:
// Each promise below corresponds to a state on the recipient state machine. They are listed in
// ascending order, such that the first promise below will be the first promise fulfilled.
- SharedPromise<void> _allDonorsMirroring;
-
SharedPromise<void> _coordinatorHasCommitted;
SharedPromise<void> _completionPromise;