diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_recipient_service.cpp | 59 |
1 files changed, 42 insertions, 17 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 33686ba261b..9e7a4ffac0b 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -87,6 +87,14 @@ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, const Name return cm; } +/** + * Fulfills the promise if it is not already. Otherwise, does nothing. + */ +void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) { + if (!sp.getFuture().isReady()) { + sp.emplaceValue(); + } +} } // namespace namespace resharding { @@ -161,7 +169,6 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); - invariant(_allDonorsMirroring.getFuture().isReady()); invariant(_coordinatorHasCommitted.getFuture().isReady()); invariant(_completionPromise.getFuture().isReady()); } @@ -180,8 +187,13 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( .then([this, executor] { return _awaitCoordinatorHasCommittedThenTransitionToRenaming(executor); }) - .then([this] { _renameTemporaryReshardingCollectionThenDeleteLocalState(); }) - .onError([this](Status status) { + .then([this, self = shared_from_this()] { + // After this line, the shared_ptr stored in the PrimaryOnlyService's map for + // the ReshardingRecipientService Instance is removed. It is necessary to use + // shared_from_this() to extend the lifetime for the remaining callbacks. + _renameTemporaryReshardingCollectionThenDeleteLocalState(); + }) + .onError([this, self = shared_from_this()](Status status) { LOGV2(4956500, "Resharding operation recipient state machine failed", "namespace"_attr = _recipientDoc.getNss().ns(), @@ -189,10 +201,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( "error"_attr = status); // TODO SERVER-50584 Report errors to the coordinator so that the resharding operation // can be aborted. - this->_transitionStateToError(status); + _transitionStateToError(status); return status; }) - .onCompletion([this](Status status) { + .onCompletion([this, self = shared_from_this()](Status status) { stdx::lock_guard<Latch> lg(_mutex); if (_completionPromise.getFuture().isReady()) { // interrupt() was called before we got here. @@ -228,10 +240,6 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) threadPool->shutdown(); } - if (!_allDonorsMirroring.getFuture().isReady()) { - _allDonorsMirroring.setError(status); - } - if (!_coordinatorHasCommitted.getFuture().isReady()) { _coordinatorHasCommitted.setError(status); } @@ -242,7 +250,22 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status) } void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges( - boost::optional<TypeCollectionReshardingFields> reshardingFields) {} + const TypeCollectionReshardingFields& reshardingFields) { + auto coordinatorState = reshardingFields.getState(); + if (coordinatorState == CoordinatorStateEnum::kError) { + // TODO SERVER-52838: Investigate if we want to have a special error code so the recipient + // knows when it has recieved the error from the coordinator rather than needing to report + // an error to the coordinator. + interrupt({ErrorCodes::InternalError, + "ReshardingDonorService observed CoordinatorStateEnum::kError"}); + return; + } + + stdx::lock_guard<Latch> lk(_mutex); + if (coordinatorState >= CoordinatorStateEnum::kCommitted) { + ensureFulfilledPromise(lk, _coordinatorHasCommitted); + } +} void ReshardingRecipientService::RecipientStateMachine:: _transitionToCreatingTemporaryReshardingCollection() { @@ -460,7 +483,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine:: return whenAllSucceed(std::move(futuresToWaitOn)).thenRunOn(**executor).then([this] { _transitionStateAndUpdateCoordinator(RecipientStateEnum::kStrictConsistency); - interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"}); }); } @@ -483,14 +505,17 @@ void ReshardingRecipientService::RecipientStateMachine:: return; } - auto opCtx = cc().makeOperationContext(); + { + auto opCtx = cc().makeOperationContext(); - auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), - _recipientDoc.getExistingUUID()); + auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(), + _recipientDoc.getExistingUUID()); - RenameCollectionOptions options; - options.dropTarget = true; - uassertStatusOK(renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options)); + RenameCollectionOptions options; + options.dropTarget = true; + uassertStatusOK( + renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options)); + } _transitionStateAndUpdateCoordinator(RecipientStateEnum::kDone); } |