summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_coordinator_service.cpp')
-rw-r--r--src/mongo/db/s/resharding/resharding_coordinator_service.cpp30
1 files changed, 15 insertions, 15 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
index 7c30bf61faf..e3f9562fed0 100644
--- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp
@@ -973,13 +973,13 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci
.then([this, executor] { _tellAllDonorsToRefresh(executor); })
.then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); })
.onCompletion([this](auto passthroughFuture) {
- _factory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor);
+ _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor);
return passthroughFuture;
})
.onError([this, self = shared_from_this(), executor](
Status status) -> StatusWith<ReshardingCoordinatorDocument> {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
}
@@ -1017,7 +1017,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishRe
})
.onError([this, self = shared_from_this(), executor](Status status) {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get());
}
@@ -1035,7 +1035,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
const CancellationToken& stepdownToken) noexcept {
_ctHolder = std::make_unique<CoordinatorCancellationTokenHolder>(stepdownToken);
_markKilledExecutor->startup();
- _factory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
+ _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor);
return _runUntilReadyToPersistDecision(executor)
.then([this, self = shared_from_this(), executor](
@@ -1051,7 +1051,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
markCompleted(status);
}
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getStepdownToken());
@@ -1147,7 +1147,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan
return;
}
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kInitializing);
@@ -1164,7 +1164,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
return;
}
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc;
auto shardsAndChunks =
@@ -1254,7 +1254,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat
.thenRunOn(**executor)
.then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeCloning.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
}
@@ -1299,7 +1299,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished
.thenRunOn(**executor)
.then([this, executor](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) {
{
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
}
@@ -1364,7 +1364,7 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi
ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc;
updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDecisionPersisted);
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getAbortToken());
@@ -1406,7 +1406,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD
_ctHolder->getStepdownToken())
.thenRunOn(**executor)
.then([this, executor](const auto& coordinatorDocsChangedOnDisk) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::removeCoordinatorDocAndReshardingFields(opCtx.get(),
coordinatorDocsChangedOnDisk[1]);
});
@@ -1426,7 +1426,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize));
emplaceAbortReasonIfExists(updatedCoordinatorDoc, abortReason);
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(),
updatedCoordinatorDoc);
@@ -1436,7 +1436,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto recipientIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards());
NamespaceString nssToRefresh;
@@ -1461,7 +1461,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh(
const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto donorIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
auto refreshCmd = createFlushReshardingStateChangeCommand(_coordinatorDoc.getSourceNss());
@@ -1474,7 +1474,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefres
void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh(
const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) {
- auto opCtx = _factory->makeOperationContext(&cc());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards());
auto recipientShardIds =