diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2020-12-07 20:30:40 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-16 19:48:04 +0000 |
commit | 85b63642b03e4b09a89317692d671945c304b459 (patch) | |
tree | 4bdd977c055c35e70c92350abe4f7e379ae3e793 /src/mongo | |
parent | 36e5f97f0bc9f51d2ba69d19b76ce6ffda06f864 (diff) | |
download | mongo-85b63642b03e4b09a89317692d671945c304b459.tar.gz |
SERVER-51130 Implement onReshardingFieldsChanges for donor and recipient state machines
SERVER-53173 Create jstest to verify that resharding participants have removed all disk metadata
Diffstat (limited to 'src/mongo')
6 files changed, 114 insertions, 53 deletions
diff --git a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp index d22f559b15b..7aef9649de5 100644 --- a/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp +++ b/src/mongo/db/s/config/configsvr_reshard_collection_cmd.cpp @@ -220,21 +220,19 @@ public: instance->setInitialChunksAndZones(std::move(initialChunks), std::move(newZones)); - instance->getObserver()->awaitAllDonorsReadyToDonate().wait(opCtx); - - instance->getObserver()->awaitAllRecipientsFinishedCloning().wait(opCtx); - if (resharding::gReshardingTempInterruptBeforeOplogApplication) { + instance->getObserver()->awaitAllDonorsReadyToDonate().wait(opCtx); + instance->getObserver()->awaitAllRecipientsFinishedCloning().wait(opCtx); + // This promise is currently automatically filled by recipient shards after creating // the temporary resharding collection. instance->getObserver()->awaitAllRecipientsFinishedApplying().wait(opCtx); + + instance->interrupt( + {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } else { - instance->getObserver()->awaitAllRecipientsFinishedApplying().wait(opCtx); - instance->getObserver()->awaitAllRecipientsInStrictConsistency().wait(opCtx); + instance->getCompletionFuture().get(opCtx); } - - instance->interrupt( - {ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } private: diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 22660d84dd2..98740683694 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -54,6 +54,8 @@ namespace mongo { namespace { +using namespace fmt::literals; + MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorInSteadyState); MONGO_FAIL_POINT_DEFINE(reshardingPauseCoordinatorBeforeCommit); @@ -614,7 +616,9 @@ void persistStateTransitionAndCatalogUpdatesThenBumpShardVersions( invariant(notifyForStateTransition.find(nextState) != notifyForStateTransition.end()); // TODO SERVER-51800 Remove special casing for kError. invariant(nextState == CoordinatorStateEnum::kError || - notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone); + notifyForStateTransition[nextState] != ParticipantsToNotifyEnum::kNone, + "failed to persist state transition with nextState {}"_format( + CoordinatorState_serializer(nextState))); // Resharding metadata changes to be executed. auto changeMetadataFunc = [&](OperationContext* opCtx, TxnNumber txnNumber) { @@ -711,10 +715,13 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( return; }) .then([this, executor] { _tellAllParticipantsToRefresh(executor); }) - .then([this, executor] { + .then([this, self = shared_from_this(), executor] { + // The shared_ptr maintaining the ReshardingCoordinatorService Instance object gets + // deleted from the PrimaryOnlyService's map. Thus, shared_from_this() is necessary to + // keep 'this' pointer alive for the remaining callbacks. return _awaitAllParticipantShardsRenamedOrDroppedOriginalCollection(executor); }) - .onError([this, executor](Status status) { + .onError([this, self = shared_from_this(), executor](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. @@ -736,7 +743,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( return status; }) - .onCompletion([this](Status status) { + .onCompletion([this, self = shared_from_this()](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index cd07485cca4..15fa5aa4a2c 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -109,6 +109,15 @@ Timestamp generateMinFetchTimestamp(const ReshardingDonorDocument& donorDoc) { return generatedOpTime.getTimestamp(); } + +/** + * Fulfills the promise if it is not already. Otherwise, does nothing. + */ +void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) { + if (!sp.getFuture().isReady()) { + sp.emplaceValue(); + } +} } // namespace std::shared_ptr<repl::PrimaryOnlyService::Instance> ReshardingDonorService::constructInstance( @@ -146,8 +155,13 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( .then([this, executor] { return _awaitCoordinatorHasCommittedThenTransitionToDropping(executor); }) - .then([this] { return _dropOriginalCollectionThenDeleteLocalState(); }) - .onError([this](Status status) { + .then([this, self = shared_from_this()] { + // After this line, the shared_ptr stored in the PrimaryOnlyService's map for + // the ReshardingDonorService Instance is removed. It is necessary to use + // shared_from_this() to extend the lifetime for the remaining callbacks. + return _dropOriginalCollectionThenDeleteLocalState(); + }) + .onError([this, self = shared_from_this()](Status status) { LOGV2(4956400, "Resharding operation donor state machine failed", "namespace"_attr = _donorDoc.getNss().ns(), @@ -155,10 +169,10 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( "error"_attr = status); // TODO SERVER-50584 Report errors to the coordinator so that the resharding operation // can be aborted. - this->_transitionStateToError(status); + _transitionStateToError(status); return status; }) - .onCompletion([this](Status status) { + .onCompletion([this, self = shared_from_this()](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. @@ -196,7 +210,30 @@ void ReshardingDonorService::DonorStateMachine::interrupt(Status status) { } void ReshardingDonorService::DonorStateMachine::onReshardingFieldsChanges( - boost::optional<TypeCollectionReshardingFields> reshardingFields) {} + const TypeCollectionReshardingFields& reshardingFields) { + auto coordinatorState = reshardingFields.getState(); + if (coordinatorState == CoordinatorStateEnum::kError) { + // TODO SERVER-52838: Investigate if we want to have a special error code so the donor knows + // when it has recieved the error from the coordinator rather than needing to report an + // error to the coordinator. + interrupt({ErrorCodes::InternalError, + "ReshardingDonorService observed CoordinatorStateEnum::kError"}); + return; + } + + stdx::lock_guard<Latch> lk(_mutex); + if (coordinatorState >= CoordinatorStateEnum::kApplying) { + ensureFulfilledPromise(lk, _allRecipientsDoneCloning); + } + + if (coordinatorState >= CoordinatorStateEnum::kMirroring) { + ensureFulfilledPromise(lk, _allRecipientsDoneApplying); + } + + if (coordinatorState >= CoordinatorStateEnum::kCommitted) { + ensureFulfilledPromise(lk, _coordinatorHasCommitted); + } +} void ReshardingDonorService::DonorStateMachine:: _onPreparingToDonateCalculateTimestampThenTransitionToDonatingInitialData() { @@ -217,17 +254,6 @@ void ReshardingDonorService::DonorStateMachine:: auto minFetchTimestamp = generateMinFetchTimestamp(_donorDoc); _transitionStateAndUpdateCoordinator(DonorStateEnum::kDonatingInitialData, minFetchTimestamp); - - // Unless a test is willing to leak the contents of the config.localReshardingOperations.donor - // collection, without this interrupt(), an invariant would be hit from - // _allRecipientsDoneCloning not being ready when this DonorStateMachine is being destructed. - // - // TODO SERVER-51130: Move this interrupt() to after _transitionState(kDonatingOplogEntries) - // once the donor shards learn from the coordinator when all recipient shards have finished - // cloning. - if (resharding::gReshardingTempInterruptBeforeOplogApplication) { - interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); - } } ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: @@ -239,6 +265,16 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: return _allRecipientsDoneCloning.getFuture().thenRunOn(**executor).then([this]() { _transitionState(DonorStateEnum::kDonatingOplogEntries); + + // Unless a test is willing to leak the contents of the + // config.localReshardingOperations.donor collection, without this interrupt(), an invariant + // would be hit from _allRecipientsDoneCloning not being ready when this DonorStateMachine + // is being destructed. + // + // TODO SERVER-53372: Remove this interrupt altogether. + if (resharding::gReshardingTempInterruptBeforeOplogApplication) { + interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); + } }); } @@ -261,7 +297,6 @@ void ReshardingDonorService::DonorStateMachine:: } _transitionState(DonorStateEnum::kMirroring); - interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); } ExecutorFuture<void> diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 32775d91f20..f131f8d1d1c 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -95,8 +95,7 @@ public: return boost::none; } - void onReshardingFieldsChanges( - boost::optional<TypeCollectionReshardingFields> reshardingFields); + void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields); private: // The following functions correspond to the actions to take at a particular donor state. diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 33686ba261b..9e7a4ffac0b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -87,6 +87,14 @@ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, const Name return cm; } +/** + * Fulfills the promise if it is not already. Otherwise, does nothing. + */ +void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) { + if (!sp.getFuture().isReady()) { + sp.emplaceValue(); + } +} } // namespace namespace resharding { @@ -161,7 +169,6 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); - invariant(_allDonorsMirroring.getFuture().isReady()); invariant(_coordinatorHasCommitted.getFuture().isReady()); invariant(_completionPromise.getFuture().isReady()); } @@ -180,8 +187,13 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( .then([this, executor] { return _awaitCoordinatorHasCommittedThenTransitionToRenaming(executor); }) - .then([this] { _renameTemporaryReshardingCollectionThenDeleteLocalState(); }) - .onError([this](Status status) { + .then([this, self = shared_from_this()] { + // After this line, the shared_ptr stored in the PrimaryOnlyService's map for + // the ReshardingRecipientService Instance is removed. It is necessary to use + // shared_from_this() to extend the lifetime for the remaining callbacks. + _renameTemporaryReshardingCollectionThenDeleteLocalState(); + }) + .onError([this, self = shared_from_this()](Status status) { LOGV2(4956500, "Resharding operation recipient state machine failed", "namespace"_attr = _recipientDoc.getNss().ns(), @@ -189,10 +201,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( "error"_attr = status); // TODO SERVER-50584 Report errors to the coordinator so that the resharding operation // can be aborted. - this->_transitionStateToError(status); + _transitionStateToError(status); return status; }) - .onCompletion([this](Status status) { + .onCompletion([this, self = shared_from_this()](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. @@ -228,10 +240,6 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) threadPool->shutdown(); } - if (!_allDonorsMirroring.getFuture().isReady()) { - _allDonorsMirroring.setError(status); - } - if (!_coordinatorHasCommitted.getFuture().isReady()) { _coordinatorHasCommitted.setError(status); } @@ -242,7 +250,22 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) } void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges( - boost::optional<TypeCollectionReshardingFields> reshardingFields) {} + const TypeCollectionReshardingFields& reshardingFields) { + auto coordinatorState = reshardingFields.getState(); + if (coordinatorState == CoordinatorStateEnum::kError) { + // TODO SERVER-52838: Investigate if we want to have a special error code so the recipient + // knows when it has recieved the error from the coordinator rather than needing to report + // an error to the coordinator. + interrupt({ErrorCodes::InternalError, + "ReshardingDonorService observed CoordinatorStateEnum::kError"}); + return; + } + + stdx::lock_guard<Latch> lk(_mutex); + if (coordinatorState >= CoordinatorStateEnum::kCommitted) { + ensureFulfilledPromise(lk, _coordinatorHasCommitted); + } +} void ReshardingRecipientService::RecipientStateMachine:: _transitionToCreatingTemporaryReshardingCollection() { @@ -460,7 +483,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: return whenAllSucceed(std::move(futuresToWaitOn)).thenRunOn(**executor).then([this] { _transitionStateAndUpdateCoordinator(RecipientStateEnum::kStrictConsistency); - interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); }); } @@ -483,14 +505,17 @@ void ReshardingRecipientService::RecipientStateMachine:: return; } - auto opCtx = cc().makeOperationContext(); + { + auto opCtx = cc().makeOperationContext(); - auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), - _recipientDoc.getExistingUUID()); + auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), + _recipientDoc.getExistingUUID()); - RenameCollectionOptions options; - options.dropTarget = true; - uassertStatusOK(renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options)); + RenameCollectionOptions options; + options.dropTarget = true; + uassertStatusOK( + renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options)); + } _transitionStateAndUpdateCoordinator(RecipientStateEnum::kDone); } diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index 9856fca8703..107ac10deac 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -116,8 +116,7 @@ public: return boost::none; } - void onReshardingFieldsChanges( - boost::optional<TypeCollectionReshardingFields> reshardingFields); + void onReshardingFieldsChanges(const TypeCollectionReshardingFields& reshardingFields); private: // The following functions correspond to the actions to take at a particular recipient state. @@ -182,8 +181,6 @@ private: // Each promise below corresponds to a state on the recipient state machine. They are listed in // ascending order, such that the first promise below will be the first promise fulfilled. - SharedPromise<void> _allDonorsMirroring; - SharedPromise<void> _coordinatorHasCommitted; SharedPromise<void> _completionPromise; |