summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_recipient_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_recipient_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp59
1 files changed, 42 insertions, 17 deletions
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 33686ba261b..9e7a4ffac0b 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -87,6 +87,14 @@ ChunkManager getShardedCollectionRoutingInfo(OperationContext* opCtx, const Name
return cm;
}
+/**
+ * Fulfills the promise if it is not already. Otherwise, does nothing.
+ */
+void ensureFulfilledPromise(WithLock lk, SharedPromise<void>& sp) {
+ if (!sp.getFuture().isReady()) {
+ sp.emplaceValue();
+ }
+}
} // namespace
namespace resharding {
@@ -161,7 +169,6 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
- invariant(_allDonorsMirroring.getFuture().isReady());
invariant(_coordinatorHasCommitted.getFuture().isReady());
invariant(_completionPromise.getFuture().isReady());
}
@@ -180,8 +187,13 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
.then([this, executor] {
return _awaitCoordinatorHasCommittedThenTransitionToRenaming(executor);
})
- .then([this] { _renameTemporaryReshardingCollectionThenDeleteLocalState(); })
- .onError([this](Status status) {
+ .then([this, self = shared_from_this()] {
+ // After this line, the shared_ptr stored in the PrimaryOnlyService's map for
+ // the ReshardingRecipientService Instance is removed. It is necessary to use
+ // shared_from_this() to extend the lifetime for the remaining callbacks.
+ _renameTemporaryReshardingCollectionThenDeleteLocalState();
+ })
+ .onError([this, self = shared_from_this()](Status status) {
LOGV2(4956500,
"Resharding operation recipient state machine failed",
"namespace"_attr = _recipientDoc.getNss().ns(),
@@ -189,10 +201,10 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
"error"_attr = status);
// TODO SERVER-50584 Report errors to the coordinator so that the resharding operation
// can be aborted.
- this->_transitionStateToError(status);
+ _transitionStateToError(status);
return status;
})
- .onCompletion([this](Status status) {
+ .onCompletion([this, self = shared_from_this()](Status status) {
stdx::lock_guard<Latch> lg(_mutex);
if (_completionPromise.getFuture().isReady()) {
// interrupt() was called before we got here.
@@ -228,10 +240,6 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
threadPool->shutdown();
}
- if (!_allDonorsMirroring.getFuture().isReady()) {
- _allDonorsMirroring.setError(status);
- }
-
if (!_coordinatorHasCommitted.getFuture().isReady()) {
_coordinatorHasCommitted.setError(status);
}
@@ -242,7 +250,22 @@ void ReshardingRecipientService::RecipientStateMachine::interrupt(Status status)
}
void ReshardingRecipientService::RecipientStateMachine::onReshardingFieldsChanges(
- boost::optional<TypeCollectionReshardingFields> reshardingFields) {}
+ const TypeCollectionReshardingFields& reshardingFields) {
+ auto coordinatorState = reshardingFields.getState();
+ if (coordinatorState == CoordinatorStateEnum::kError) {
+ // TODO SERVER-52838: Investigate if we want to have a special error code so the recipient
+ // knows when it has recieved the error from the coordinator rather than needing to report
+ // an error to the coordinator.
+ interrupt({ErrorCodes::InternalError,
+ "ReshardingDonorService observed CoordinatorStateEnum::kError"});
+ return;
+ }
+
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (coordinatorState >= CoordinatorStateEnum::kCommitted) {
+ ensureFulfilledPromise(lk, _coordinatorHasCommitted);
+ }
+}
void ReshardingRecipientService::RecipientStateMachine::
_transitionToCreatingTemporaryReshardingCollection() {
@@ -460,7 +483,6 @@ ExecutorFuture<void> ReshardingRecipientService::RecipientStateMachine::
return whenAllSucceed(std::move(futuresToWaitOn)).thenRunOn(**executor).then([this] {
_transitionStateAndUpdateCoordinator(RecipientStateEnum::kStrictConsistency);
- interrupt({ErrorCodes::InternalError, "Artificial interruption to enable jsTests"});
});
}
@@ -483,14 +505,17 @@ void ReshardingRecipientService::RecipientStateMachine::
return;
}
- auto opCtx = cc().makeOperationContext();
+ {
+ auto opCtx = cc().makeOperationContext();
- auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
- _recipientDoc.getExistingUUID());
+ auto reshardingNss = constructTemporaryReshardingNss(_recipientDoc.getNss().db(),
+ _recipientDoc.getExistingUUID());
- RenameCollectionOptions options;
- options.dropTarget = true;
- uassertStatusOK(renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options));
+ RenameCollectionOptions options;
+ options.dropTarget = true;
+ uassertStatusOK(
+ renameCollection(opCtx.get(), reshardingNss, _recipientDoc.getNss(), options));
+ }
_transitionStateAndUpdateCoordinator(RecipientStateEnum::kDone);
}