diff options
Diffstat (limited to 'src/mongo/db/s/resharding/resharding_coordinator_service.cpp')
-rw-r--r-- | src/mongo/db/s/resharding/resharding_coordinator_service.cpp | 30 |
1 files changed, 15 insertions, 15 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 7c30bf61faf..e3f9562fed0 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -973,13 +973,13 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) .onCompletion([this](auto passthroughFuture) { - _factory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor); + _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor); return passthroughFuture; }) .onError([this, self = shared_from_this(), executor]( Status status) -> StatusWith<ReshardingCoordinatorDocument> { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get()); } @@ -1017,7 +1017,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishRe }) .onError([this, self = shared_from_this(), executor](Status status) { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get()); } @@ -1035,7 +1035,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( const CancellationToken& stepdownToken) noexcept { _ctHolder = std::make_unique<CoordinatorCancellationTokenHolder>(stepdownToken); _markKilledExecutor->startup(); - _factory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor); + _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor); return _runUntilReadyToPersistDecision(executor) .then([this, self = shared_from_this(), executor]( @@ -1051,7 +1051,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( markCompleted(status); } - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getStepdownToken()); @@ -1147,7 +1147,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan return; } - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kInitializing); @@ -1164,7 +1164,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: return; } - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc; auto shardsAndChunks = @@ -1254,7 +1254,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeCloning.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); } @@ -1299,7 +1299,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished .thenRunOn(**executor) .then([this, executor](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); } @@ -1364,7 +1364,7 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDecisionPersisted); - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); @@ -1406,7 +1406,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD _ctHolder->getStepdownToken()) .thenRunOn(**executor) .then([this, executor](const auto& coordinatorDocsChangedOnDisk) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); resharding::removeCoordinatorDocAndReshardingFields(opCtx.get(), coordinatorDocsChangedOnDisk[1]); }); @@ -1426,7 +1426,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize)); emplaceAbortReasonIfExists(updatedCoordinatorDoc, abortReason); - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(), updatedCoordinatorDoc); @@ -1436,7 +1436,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto recipientIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); NamespaceString nssToRefresh; @@ -1461,7 +1461,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto donorIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); auto refreshCmd = createFlushReshardingStateChangeCommand(_coordinatorDoc.getSourceNss()); @@ -1474,7 +1474,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefres void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh( const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); auto recipientShardIds = |