diff options
author | jannaerin <golden.janna@gmail.com> | 2021-03-23 21:09:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-03-25 16:42:16 +0000 |
commit | d5ec2876050089857832308a5f72ea23dbb49715 (patch) | |
tree | c709a9726627e58e2ce5ac2624b3def1a40670e0 | |
parent | b79aa28799f52362d88eb7ca32948d2098daa741 (diff) | |
download | mongo-d5ec2876050089857832308a5f72ea23dbb49715.tar.gz |
SERVER-55306 Integrate CancelableOperationContext into ReshardingCollectionCloner
7 files changed, 103 insertions, 83 deletions
diff --git a/src/mongo/db/cancelable_operation_context.cpp b/src/mongo/db/cancelable_operation_context.cpp index c605d892d92..fdf7235c05d 100644 --- a/src/mongo/db/cancelable_operation_context.cpp +++ b/src/mongo/db/cancelable_operation_context.cpp @@ -31,10 +31,8 @@ #include "mongo/db/cancelable_operation_context.h" -#include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/stdx/mutex.h" -#include "mongo/util/cancellation.h" namespace mongo { diff --git a/src/mongo/db/cancelable_operation_context.h b/src/mongo/db/cancelable_operation_context.h index ee4225e2b5b..c3bc2931261 100644 --- a/src/mongo/db/cancelable_operation_context.h +++ b/src/mongo/db/cancelable_operation_context.h @@ -31,14 +31,15 @@ #include <memory> +#include "mongo/db/client.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" +#include "mongo/util/cancellation.h" #include "mongo/util/future.h" #include "mongo/util/out_of_line_executor.h" namespace mongo { -class CancellationToken; class OperationContext; /** @@ -92,4 +93,22 @@ private: const SemiFuture<void> _markKilledFinished; }; +/** + * A factory to create CancelableOperationContext objects that use the same CancellationToken and + * executor. 
+ */ +class CancelableOperationContextFactory { +public: + CancelableOperationContextFactory(CancellationToken cancelToken, ExecutorPtr executor) + : _cancelToken{std::move(cancelToken)}, _executor{std::move(executor)} {} + + CancelableOperationContext makeOperationContext(Client* client) const { + return CancelableOperationContext{client->makeOperationContext(), _cancelToken, _executor}; + } + +private: + const CancellationToken _cancelToken; + const ExecutorPtr _executor; +}; + } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 6ebd1d479df..38937a2ce9e 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -36,6 +36,7 @@ #include <utility> #include "mongo/bson/json.h" +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/client.h" @@ -256,6 +257,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_restartP return resharding::data_copy::findHighestInsertedId(opCtx, *outputColl); }(); + // The BlockingResultsMerger underlying the $mergeCursors stage records how long the + // recipient spent waiting for documents from the donor shards. Doing so requires the CurOp + // to be marked as having started. + auto* curOp = CurOp::get(opCtx); + curOp->ensureStarted(); + ON_BLOCK_EXIT([curOp] { curOp->done(); }); + auto pipeline = _targetAggregationRequest( opCtx, *makePipeline(opCtx, MongoProcessInterface::create(opCtx), idToResumeFrom)); @@ -301,49 +309,9 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p return true; } -/** - * Invokes the 'callable' function with a fresh OperationContext. 
- * - * The OperationContext is configured so the RstlKillOpThread would always interrupt the operation - * on step-up or stepdown, regardless of whether the operation has acquired any locks. This - * interruption is best-effort to stop doing wasteful work on stepdown as quickly as possible. It - * isn't required for the ReshardingCollectionCloner's correctness. In particular, it is possible - * for an OperationContext to be constructed after stepdown has finished, for the - * ReshardingCollectionCloner to run a getMore on the aggregation against the donor shards, and for - * the ReshardingCollectionCloner to only discover afterwards the recipient had already stepped down - * from a NotPrimary error when inserting a batch of documents locally. - * - * Note that the recipient's primary-only service is responsible for managing the - * ReshardingCollectionCloner and would shut down the ReshardingCollectionCloner's task executor - * following the recipient stepping down. - * - * Also note that the ReshardingCollectionCloner is only created after step-up as part of the - * recipient's primary-only service and therefore would never be interrupted by step-up. - */ -template <typename Callable> -auto ReshardingCollectionCloner::_withTemporaryOperationContext(Callable&& callable) { - auto& client = cc(); - { - stdx::lock_guard<Client> lk(client); - invariant(client.canKillSystemOperationInStepdown(lk)); - } - - auto opCtx = client.makeOperationContext(); - opCtx->setAlwaysInterruptAtStepDownOrUp(); - - // The BlockingResultsMerger underlying by the $mergeCursors stage records how long the - // recipient spent waiting for documents from the donor shards. It doing so requires the CurOp - // to be marked as having started. 
- auto* curOp = CurOp::get(opCtx.get()); - curOp->ensureStarted(); - { - ON_BLOCK_EXIT([curOp] { curOp->done(); }); - return callable(opCtx.get()); - } -} - SemiFuture<void> ReshardingCollectionCloner::run(std::shared_ptr<executor::TaskExecutor> executor, - CancellationToken cancelToken) { + CancellationToken cancelToken, + CancelableOperationContextFactory factory) { struct ChainContext { std::unique_ptr<Pipeline, PipelineDeleter> pipeline; bool moreToCome = true; @@ -351,25 +319,25 @@ SemiFuture<void> ReshardingCollectionCloner::run(std::shared_ptr<executor::TaskE auto chainCtx = std::make_shared<ChainContext>(); - return AsyncTry([this, chainCtx] { + return AsyncTry([this, chainCtx, factory] { if (!chainCtx->pipeline) { - chainCtx->pipeline = _withTemporaryOperationContext( - [&](auto* opCtx) { return _restartPipeline(opCtx); }); + auto opCtx = factory.makeOperationContext(&cc()); + chainCtx->pipeline = _restartPipeline(opCtx.get()); } - chainCtx->moreToCome = _withTemporaryOperationContext( - [&](auto* opCtx) { return doOneBatch(opCtx, *chainCtx->pipeline); }); + auto opCtx = factory.makeOperationContext(&cc()); + chainCtx->moreToCome = doOneBatch(opCtx.get(), *chainCtx->pipeline); }) - .until([this, chainCtx, cancelToken](Status status) { + .until([this, chainCtx, cancelToken, factory](Status status) { if (status.isOK() && chainCtx->moreToCome) { return false; } if (chainCtx->pipeline) { - _withTemporaryOperationContext([&](auto* opCtx) { - chainCtx->pipeline->dispose(opCtx); - chainCtx->pipeline.reset(); - }); + auto opCtx = factory.makeOperationContext(&cc()); + + chainCtx->pipeline->dispose(opCtx.get()); + chainCtx->pipeline.reset(); } if (status.isA<ErrorCategory::CancellationError>() || @@ -411,11 +379,11 @@ SemiFuture<void> ReshardingCollectionCloner::run(std::shared_ptr<executor::TaskE .on(executor, cancelToken) .onCompletion([this, chainCtx](Status status) { if (chainCtx->pipeline) { + auto opCtx = cc().makeOperationContext(); + // Guarantee the 
pipeline is always cleaned up - even upon cancellation. - _withTemporaryOperationContext([&](auto* opCtx) { - chainCtx->pipeline->dispose(opCtx); - chainCtx->pipeline.reset(); - }); + chainCtx->pipeline->dispose(opCtx.get()); + chainCtx->pipeline.reset(); } // Propagate the result of the AsyncTry. diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index 5f31adaddb3..eb346343fe5 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -49,6 +49,7 @@ class TaskExecutor; } // namespace executor +class CancelableOperationContextFactory; class OperationContext; class MongoProcessInterface; class ReshardingMetrics; @@ -93,7 +94,8 @@ public: * (b) the cancellation token was canceled due to a stepdown or abort. */ SemiFuture<void> run(std::shared_ptr<executor::TaskExecutor> executor, - CancellationToken cancelToken); + CancellationToken cancelToken, + CancelableOperationContextFactory factory); /** * Fetches and inserts a single batch of documents. 
@@ -109,9 +111,6 @@ private: std::unique_ptr<Pipeline, PipelineDeleter> _restartPipeline(OperationContext* opCtx); - template <typename Callable> - auto _withTemporaryOperationContext(Callable&& callable); - const std::unique_ptr<Env> _env; const ShardKeyPattern _newShardKeyPattern; const NamespaceString _sourceNss; diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp index 504930825ac..29d02d18c2f 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp +++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp @@ -31,6 +31,7 @@ #include "mongo/db/s/resharding/resharding_recipient_service.h" +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/catalog/rename_collection.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/dbdirectclient.h" @@ -228,7 +229,14 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine( _donorShardIds{recipientDoc.getDonorShards()}, _minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}}, _recipientCtx{recipientDoc.getMutableState()}, - _fetchTimestamp{recipientDoc.getFetchTimestamp()} {} + _fetchTimestamp{recipientDoc.getFetchTimestamp()}, + _markKilledExecutor(std::make_shared<ThreadPool>([] { + ThreadPool::Options options; + options.poolName = "RecipientStateMachineCancelableOpCtxPool"; + options.minThreads = 1; + options.maxThreads = 1; + return options; + }())) {} ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() { stdx::lock_guard<Latch> lg(_mutex); @@ -241,6 +249,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run( std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& stepdownToken) noexcept { auto abortToken = _initAbortSource(stepdownToken); + _markKilledExecutor->startup(); return ExecutorFuture<void>(**executor) .then([this, executor] { @@ -517,24 +526,29 @@ 
ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin })); } - return whenAllSucceed(_collectionCloner->run(**executor, abortToken).thenRunOn(**executor), - (*executor) - ->sleepFor(_minimumOperationDuration, abortToken) - .then([this, executor, abortToken] { - if (_txnCloners.empty()) { - return SemiFuture<void>::makeReady(); - } - - auto serviceContext = Client::getCurrent()->getServiceContext(); - - std::vector<ExecutorFuture<void>> txnClonerFutures; - for (auto&& txnCloner : _txnCloners) { - txnClonerFutures.push_back( - txnCloner->run(serviceContext, **executor, abortToken)); - } - - return whenAllSucceed(std::move(txnClonerFutures)); - })) + return whenAllSucceed( + _collectionCloner + ->run(**executor, + abortToken, + CancelableOperationContextFactory(abortToken, _markKilledExecutor)) + .thenRunOn(**executor), + (*executor) + ->sleepFor(_minimumOperationDuration, abortToken) + .then([this, executor, abortToken] { + if (_txnCloners.empty()) { + return SemiFuture<void>::makeReady(); + } + + auto serviceContext = Client::getCurrent()->getServiceContext(); + + std::vector<ExecutorFuture<void>> txnClonerFutures; + for (auto&& txnCloner : _txnCloners) { + txnClonerFutures.push_back( + txnCloner->run(serviceContext, **executor, abortToken)); + } + + return whenAllSucceed(std::move(txnClonerFutures)); + })) .thenRunOn(**executor) .then([this] { // ReshardingTxnCloners must complete before the recipient transitions to kApplying to diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h index e2e52382a71..f1c8501a4e8 100644 --- a/src/mongo/db/s/resharding/resharding_recipient_service.h +++ b/src/mongo/db/s/resharding/resharding_recipient_service.h @@ -212,6 +212,11 @@ private: RecipientShardContext _recipientCtx; boost::optional<Timestamp> _fetchTimestamp; + // ThreadPool used by CancelableOperationContext. 
+ // CancelableOperationContext must have a thread that is always available to it to mark its + // opCtx as killed when the cancelToken has been cancelled. + const std::shared_ptr<ThreadPool> _markKilledExecutor; + std::unique_ptr<ReshardingCollectionCloner> _collectionCloner; std::vector<std::unique_ptr<ReshardingTxnCloner>> _txnCloners; diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp index d9b2fcd15af..6f7dfa11544 100644 --- a/src/mongo/db/s/resharding_test_commands.cpp +++ b/src/mongo/db/s/resharding_test_commands.cpp @@ -34,6 +34,7 @@ #include <memory> #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/commands.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/resharding/resharding_collection_cloner.h" @@ -99,7 +100,23 @@ public: request().getAtClusterTime(), request().getOutputNs()); - cloner.run(executor, opCtx->getCancellationToken()).get(opCtx); + std::shared_ptr<ThreadPool> cancelableOperationContextPool = [] { + ThreadPool::Options options; + options.poolName = "TestReshardingCollectionClonerCancelableOpCtxPool"; + options.minThreads = 1; + options.maxThreads = 1; + + auto threadPool = std::make_shared<ThreadPool>(std::move(options)); + threadPool->startup(); + return threadPool; + }(); + + cloner + .run(executor, + opCtx->getCancellationToken(), + CancelableOperationContextFactory(opCtx->getCancellationToken(), + cancelableOperationContextPool)) + .get(opCtx); } private: |