diff options
Diffstat (limited to 'src/mongo/db')
4 files changed, 57 insertions, 42 deletions
diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp index 7c30bf61faf..e3f9562fed0 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.cpp @@ -973,13 +973,13 @@ ReshardingCoordinatorService::ReshardingCoordinator::_runUntilReadyToPersistDeci .then([this, executor] { _tellAllDonorsToRefresh(executor); }) .then([this, executor] { return _awaitAllRecipientsInStrictConsistency(executor); }) .onCompletion([this](auto passthroughFuture) { - _factory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor); + _cancelableOpCtxFactory.emplace(_ctHolder->getStepdownToken(), _markKilledExecutor); return passthroughFuture; }) .onError([this, self = shared_from_this(), executor]( Status status) -> StatusWith<ReshardingCoordinatorDocument> { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get()); } @@ -1017,7 +1017,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisionAndFinishRe }) .onError([this, self = shared_from_this(), executor](Status status) { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeStartingErrorFlow.pauseWhileSet(opCtx.get()); } @@ -1035,7 +1035,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( const CancellationToken& stepdownToken) noexcept { _ctHolder = std::make_unique<CoordinatorCancellationTokenHolder>(stepdownToken); _markKilledExecutor->startup(); - _factory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor); + _cancelableOpCtxFactory.emplace(_ctHolder->getAbortToken(), _markKilledExecutor); return _runUntilReadyToPersistDecision(executor) .then([this, self = shared_from_this(), executor]( @@ -1051,7 +1051,7 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run( markCompleted(status); } - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getStepdownToken()); @@ -1147,7 +1147,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_insertCoordDocAndChan return; } - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kInitializing); @@ -1164,7 +1164,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: return; } - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); ReshardingCoordinatorDocument updatedCoordinatorDoc = _coordinatorDoc; auto shardsAndChunks = @@ -1254,7 +1254,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllDonorsReadyToDonat .thenRunOn(**executor) .then([this](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeCloning.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); } @@ -1299,7 +1299,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllRecipientsFinished .thenRunOn(**executor) .then([this, executor](ReshardingCoordinatorDocument coordinatorDocChangedOnDisk) { { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorInSteadyState.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); } @@ -1364,7 +1364,7 @@ Future<void> ReshardingCoordinatorService::ReshardingCoordinator::_persistDecisi ReshardingCoordinatorDocument updatedCoordinatorDoc = coordinatorDoc; updatedCoordinatorDoc.setState(CoordinatorStateEnum::kDecisionPersisted); - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); reshardingPauseCoordinatorBeforeDecisionPersisted.pauseWhileSetAndNotCanceled( opCtx.get(), _ctHolder->getAbortToken()); @@ -1406,7 +1406,7 @@ ReshardingCoordinatorService::ReshardingCoordinator::_awaitAllParticipantShardsD _ctHolder->getStepdownToken()) .thenRunOn(**executor) .then([this, executor](const auto& coordinatorDocsChangedOnDisk) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); resharding::removeCoordinatorDocAndReshardingFields(opCtx.get(), coordinatorDocsChangedOnDisk[1]); }); @@ -1426,7 +1426,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: emplaceApproxBytesToCopyIfExists(updatedCoordinatorDoc, std::move(approxCopySize)); emplaceAbortReasonIfExists(updatedCoordinatorDoc, abortReason); - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); resharding::writeStateTransitionAndCatalogUpdatesThenBumpShardVersions(opCtx.get(), updatedCoordinatorDoc); @@ -1436,7 +1436,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator:: void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto recipientIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getRecipientShards()); NamespaceString nssToRefresh; @@ -1461,7 +1461,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllRecipientsToRe void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefresh( const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto donorIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); auto refreshCmd = createFlushReshardingStateChangeCommand(_coordinatorDoc.getSourceNss()); @@ -1474,7 +1474,7 @@ void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllDonorsToRefres void ReshardingCoordinatorService::ReshardingCoordinator::_tellAllParticipantsToRefresh( const NamespaceString& nss, const std::shared_ptr<executor::ScopedTaskExecutor>& executor) { - auto opCtx = _factory->makeOperationContext(&cc()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto donorShardIds = extractShardIdsFromParticipantEntries(_coordinatorDoc.getDonorShards()); auto recipientShardIds = diff --git a/src/mongo/db/s/resharding/resharding_coordinator_service.h b/src/mongo/db/s/resharding/resharding_coordinator_service.h index fe0cfb6ec73..98448181c3b 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_service.h +++ b/src/mongo/db/s/resharding/resharding_coordinator_service.h @@ -375,7 +375,7 @@ private: // CancelableOperationContext must have a thread that is always available to it to mark its // opCtx as killed when the cancelToken has been cancelled. const std::shared_ptr<ThreadPool> _markKilledExecutor; - boost::optional<CancelableOperationContextFactory> _factory; + boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory; /** * Must be locked while the `_canEnterCritical` or `_completionPromise` diff --git a/src/mongo/db/s/resharding/resharding_donor_service.cpp b/src/mongo/db/s/resharding/resharding_donor_service.cpp index 835c7061631..a1a14730c96 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_service.cpp @@ -65,26 +65,21 @@ namespace { const WriteConcernOptions kNoWaitWriteConcern{1, WriteConcernOptions::SyncMode::UNSET, Seconds(0)}; -Timestamp generateMinFetchTimestamp(const NamespaceString& sourceNss) { - auto opCtx = cc().makeOperationContext(); - +Timestamp generateMinFetchTimestamp(OperationContext* opCtx, const NamespaceString& sourceNss) { // Do a no-op write and use the OpTime as the minFetchTimestamp writeConflictRetry( - opCtx.get(), - "resharding donor minFetchTimestamp", - NamespaceString::kRsOplogNamespace.ns(), - [&] { - AutoGetDb db(opCtx.get(), sourceNss.db(), MODE_IX); - Lock::CollectionLock collLock(opCtx.get(), sourceNss, MODE_S); + opCtx, "resharding donor minFetchTimestamp", NamespaceString::kRsOplogNamespace.ns(), [&] { + AutoGetDb db(opCtx, sourceNss.db(), MODE_IX); + Lock::CollectionLock collLock(opCtx, sourceNss, MODE_S); - AutoGetOplog oplogWrite(opCtx.get(), OplogAccessMode::kWrite); + AutoGetOplog oplogWrite(opCtx, OplogAccessMode::kWrite); const std::string msg = str::stream() << "All future oplog entries on the namespace " << sourceNss.ns() << " must include a 'destinedRecipient' field"; - WriteUnitOfWork wuow(opCtx.get()); + WriteUnitOfWork wuow(opCtx); opCtx->getClient()->getServiceContext()->getOpObserver()->onInternalOpMessage( - opCtx.get(), + opCtx, NamespaceString::kForceOplogBatchBoundaryNamespace, boost::none, BSON("msg" << msg), @@ -204,7 +199,14 @@ ReshardingDonorService::DonorStateMachine::DonorStateMachine( _metadata{donorDoc.getCommonReshardingMetadata()}, _recipientShardIds{donorDoc.getRecipientShards()}, _donorCtx{donorDoc.getMutableState()}, - _externalState{std::move(externalState)} { + _externalState{std::move(externalState)}, + _markKilledExecutor(std::make_shared<ThreadPool>([] { + ThreadPool::Options options; + options.poolName = "ReshardingDonorCancelableOpCtxPool"; + options.minThreads = 1; + options.maxThreads = 1; + return options; + }())) { invariant(_externalState); } @@ -269,7 +271,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_notifyCoordinat return withAutomaticRetry(**executor, abortToken, [this, executor] { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); return _updateCoordinator(opCtx.get(), executor); }) .then([this, abortToken] { @@ -296,10 +298,10 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_finishReshardin _transitionState(DonorStateEnum::kDone); } - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); return _updateCoordinator(opCtx.get(), executor).then([this] { { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); removeDonorDocFailpoint.pauseWhileSet(opCtx.get()); } _removeDonorDocument(); @@ -311,12 +313,15 @@ SemiFuture<void> ReshardingDonorService::DonorStateMachine::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& stepdownToken) noexcept { auto abortToken = _initAbortSource(stepdownToken); + _markKilledExecutor->startup(); + _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); return _runUntilBlockingWritesOrErrored(executor, abortToken) .then([this, executor, abortToken] { return _notifyCoordinatorAndAwaitDecision(executor, abortToken); }) - .onCompletion([executor, stepdownToken, abortToken](Status status) { + .onCompletion([this, executor, stepdownToken, abortToken](Status status) { + _cancelableOpCtxFactory.emplace(stepdownToken, _markKilledExecutor); if (stepdownToken.isCanceled()) { // Propagate any errors from the donor stepping down. return ExecutorFuture<bool>(**executor, status); @@ -416,7 +421,7 @@ void ReshardingDonorService::DonorStateMachine:: int64_t documentsToClone = 0; { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto rawOpCtx = opCtx.get(); AutoGetCollection coll(rawOpCtx, _metadata.getSourceNss(), MODE_IS); @@ -436,12 +441,15 @@ void ReshardingDonorService::DonorStateMachine:: // the {atClusterTime: <fetchTimestamp>} read on the config.cache.chunks namespace would fail // with a SnapshotUnavailable error response. { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); _externalState->refreshCatalogCache(opCtx.get(), _metadata.getTempReshardingNss()); _externalState->waitForCollectionFlush(opCtx.get(), _metadata.getTempReshardingNss()); } - Timestamp minFetchTimestamp = generateMinFetchTimestamp(_metadata.getSourceNss()); + Timestamp minFetchTimestamp = [this] { + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + return generateMinFetchTimestamp(opCtx.get(), _metadata.getSourceNss()); + }(); LOGV2_DEBUG(5390702, 2, @@ -463,7 +471,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine:: return ExecutorFuture<void>(**executor, Status::OK()); } - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); return _updateCoordinator(opCtx.get(), executor) .then([this, abortToken] { return future_util::withCancellation(_allRecipientsDoneCloning.getFuture(), abortToken); @@ -497,7 +505,7 @@ void ReshardingDonorService::DonorStateMachine:: } { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto rawOpCtx = opCtx.get(); auto generateOplogEntry = [&](ShardId destinedRecipient) { @@ -575,7 +583,7 @@ void ReshardingDonorService::DonorStateMachine::_dropOriginalCollectionThenTrans } { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); resharding::data_copy::ensureCollectionDropped( opCtx.get(), _metadata.getSourceNss(), _metadata.getSourceUUID()); } @@ -698,7 +706,7 @@ ExecutorFuture<void> ReshardingDonorService::DonorStateMachine::_updateCoordinat .waitUntilMajority(clientOpTime, CancellationToken::uncancelable()) .thenRunOn(**executor) .then([this] { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto shardId = _externalState->myShardId(opCtx->getServiceContext()); BSONObjBuilder updateBuilder; @@ -727,7 +735,7 @@ void ReshardingDonorService::DonorStateMachine::insertStateDocument( void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( DonorShardContext&& newDonorCtx) { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); PersistentTaskStore<ReshardingDonorDocument> store( NamespaceString::kDonorReshardingOperationsNamespace); store.update( @@ -741,7 +749,7 @@ void ReshardingDonorService::DonorStateMachine::_updateDonorDocument( } void ReshardingDonorService::DonorStateMachine::_removeDonorDocument() { - auto opCtx = cc().makeOperationContext(); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); const auto& nss = NamespaceString::kDonorReshardingOperationsNamespace; writeConflictRetry(opCtx.get(), "DonorStateMachine::_removeDonorDocument", nss.toString(), [&] { diff --git a/src/mongo/db/s/resharding/resharding_donor_service.h b/src/mongo/db/s/resharding/resharding_donor_service.h index 2972dc39d05..11addfc69bc 100644 --- a/src/mongo/db/s/resharding/resharding_donor_service.h +++ b/src/mongo/db/s/resharding/resharding_donor_service.h @@ -29,6 +29,7 @@ #pragma once +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/repl/primary_only_service.h" #include "mongo/db/s/resharding/donor_document_gen.h" #include "mongo/db/s/resharding/resharding_critical_section.h" @@ -195,6 +196,12 @@ private: const std::unique_ptr<DonorStateMachineExternalState> _externalState; + // ThreadPool used by CancelableOperationContext. + // CancelableOperationContext must have a thread that is always available to it to mark its + // opCtx as killed when the cancelToken has been cancelled. + const std::shared_ptr<ThreadPool> _markKilledExecutor; + boost::optional<CancelableOperationContextFactory> _cancelableOpCtxFactory; + // Protects the state below Mutex _mutex = MONGO_MAKE_LATCH("DonorStateMachine::_mutex"); |