From 3c9bdbf7a96eae083f7057694719f10c8c0b00cb Mon Sep 17 00:00:00 2001 From: jannaerin Date: Tue, 6 Apr 2021 19:34:21 +0000 Subject: SERVER-55323 Integrate CancelableOperationContext into ReshardingDonorOplogIterator --- .../resharding/resharding_donor_oplog_iterator.cpp | 31 ++++-------- .../s/resharding/resharding_donor_oplog_iterator.h | 9 +++- .../resharding_donor_oplog_iterator_test.cpp | 58 +++++++++++++++------- .../db/s/resharding/resharding_oplog_applier.cpp | 4 +- .../s/resharding/resharding_oplog_applier_test.cpp | 6 ++- 5 files changed, 62 insertions(+), 46 deletions(-) diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp index d218e76e66c..a6002fc86a8 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp @@ -137,20 +137,6 @@ std::unique_ptr ReshardingDonorOplogIterator::makePip return Pipeline::create(std::move(stages), std::move(expCtx)); } -template -auto ReshardingDonorOplogIterator::_withTemporaryOperationContext(Callable&& callable) { - auto& client = cc(); - { - stdx::lock_guard lk(client); - invariant(client.canKillSystemOperationInStepdown(lk)); - } - - auto opCtx = client.makeOperationContext(); - opCtx->setAlwaysInterruptAtStepDownOrUp(); - - return callable(opCtx.get()); -} - std::vector ReshardingDonorOplogIterator::_fillBatch(Pipeline& pipeline) { std::vector batch; @@ -208,17 +194,20 @@ std::vector ReshardingDonorOplogIterator::_fillBatch(Pipeline& } ExecutorFuture> ReshardingDonorOplogIterator::getNextBatch( - std::shared_ptr executor, CancellationToken cancelToken) { + std::shared_ptr executor, + CancellationToken cancelToken, + CancelableOperationContextFactory factory) { if (_hasSeenFinalOplogEntry) { invariant(!_pipeline); return ExecutorFuture(std::move(executor), std::vector{}); } - auto batch = _withTemporaryOperationContext([&](auto* opCtx) { + auto batch = [&] { + auto opCtx = factory.makeOperationContext(&cc()); if (_pipeline) { - _pipeline->reattachToOperationContext(opCtx); + _pipeline->reattachToOperationContext(opCtx.get()); } else { - auto pipeline = makePipeline(opCtx, MongoProcessInterface::create(opCtx)); + auto pipeline = makePipeline(opCtx.get(), MongoProcessInterface::create(opCtx.get())); _pipeline = pipeline->getContext() ->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead( pipeline.release()); @@ -243,7 +232,7 @@ ExecutorFuture> ReshardingDonorOplogIterator::getN } return batch; - }); + }(); if (batch.empty() && !_hasSeenFinalOplogEntry) { return ExecutorFuture(executor) @@ -251,8 +240,8 @@ ExecutorFuture> ReshardingDonorOplogIterator::getN return future_util::withCancellation(_insertNotifier->awaitInsert(_resumeToken), cancelToken); }) - .then([this, cancelToken, executor] { - return getNextBatch(std::move(executor), cancelToken); + .then([this, cancelToken, executor, factory] { + return getNextBatch(std::move(executor), cancelToken, factory); }); } diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h index c152e6d04c7..adcdb508a18 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h @@ -31,6 +31,7 @@ #include +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/repl/oplog_entry.h" @@ -69,7 +70,9 @@ public: * final oplog entry hasn't been returned yet. */ virtual ExecutorFuture> getNextBatch( - std::shared_ptr executor, CancellationToken cancelToken) = 0; + std::shared_ptr executor, + CancellationToken cancelToken, + CancelableOperationContextFactory factory) = 0; }; /** @@ -94,7 +97,9 @@ public: OperationContext* opCtx, std::shared_ptr mongoProcessInterface); ExecutorFuture> getNextBatch( - std::shared_ptr executor, CancellationToken cancelToken) override; + std::shared_ptr executor, + CancellationToken cancelToken, + CancelableOperationContextFactory factory) override; static constexpr auto kActualOpFieldName = "actualOp"_sd; static constexpr auto kPreImageOpFieldName = "preImageOp"_sd; diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp index 441a683904d..68dfffad3e6 100644 --- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp @@ -172,15 +172,30 @@ public: return executor; } + CancelableOperationContextFactory makeCancelableOpCtx() { + auto cancelableOpCtxExecutor = std::make_shared([] { + ThreadPool::Options options; + options.poolName = "TestReshardOplogFetcherCancelableOpCtxPool"; + options.minThreads = 1; + options.maxThreads = 1; + return options; + }()); + + return CancelableOperationContextFactory(operationContext()->getCancellationToken(), + cancelableOpCtxExecutor); + } + auto getNextBatch(ReshardingDonorOplogIterator* iter, - std::shared_ptr executor) { + std::shared_ptr executor, + CancelableOperationContextFactory factory) { // There isn't a guarantee that the reference count to `executor` has been decremented after // .get() returns. We schedule a trivial task on the task executor to ensure the callback's // destructor has run. Otherwise `executor` could end up outliving the ServiceContext and // triggering an invariant due to the task executor's thread having a Client still. return ExecutorFuture(executor) - .then([iter, executor] { - return iter->getNextBatch(std::move(executor), CancellationToken::uncancelable()); + .then([iter, executor, factory] { + return iter->getNextBatch( + std::move(executor), CancellationToken::uncancelable(), factory); }) .then([](auto x) { return x; }) .get(); @@ -217,21 +232,22 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) { ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); auto executor = makeTaskExecutorForIterator(); + auto factory = makeCancelableOpCtx(); auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor); + auto next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0])); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); } @@ -249,14 +265,15 @@ TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) { ReshardingDonorOplogId resumeToken(Timestamp(2, 4), Timestamp(2, 4)); ReshardingDonorOplogIterator iter(oplogNss(), resumeToken, &onInsertAlwaysReady); auto executor = makeTaskExecutorForIterator(); + auto factory = makeCancelableOpCtx(); auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor); + auto next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0])); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); } @@ -306,21 +323,22 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) { ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &insertNotifier); auto executor = makeTaskExecutorForIterator(); + auto factory = makeCancelableOpCtx(); auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor); + auto next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0])); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0])); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); ASSERT_EQ(insertNotifier.numCalls, 2U); @@ -340,22 +358,23 @@ TEST_F(ReshardingDonorOplogIterTest, FillsInPreImageOplogEntry) { ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); auto executor = makeTaskExecutorForIterator(); + auto factory = makeCancelableOpCtx(); auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor); + auto next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(preImageOp), getId(next[0])); ASSERT_BSONOBJ_BINARY_EQ(preImageDoc, next[0].getObject()); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(deleteOp), getId(next[0])); ASSERT_TRUE(bool(next[0].getPreImageOp())); ASSERT_BSONOBJ_BINARY_EQ(getId(preImageOp), getId(*next[0].getPreImageOp())); ASSERT_BSONOBJ_BINARY_EQ(preImageDoc, next[0].getPreImageOp()->getObject()); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); } @@ -373,22 +392,23 @@ TEST_F(ReshardingDonorOplogIterTest, FillsInPostImageOplogEntry) { ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady); auto executor = makeTaskExecutorForIterator(); + auto factory = makeCancelableOpCtx(); auto altClient = makeKillableClient(); AlternativeClientRegion acr(altClient); - auto next = getNextBatch(&iter, executor); + auto next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(postImageOp), getId(next[0])); ASSERT_BSONOBJ_BINARY_EQ(postImageDoc, next[0].getObject()); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_EQ(next.size(), 1U); ASSERT_BSONOBJ_EQ(getId(updateOp), getId(next[0])); ASSERT_TRUE(bool(next[0].getPostImageOp())); ASSERT_BSONOBJ_BINARY_EQ(getId(postImageOp), getId(*next[0].getPostImageOp())); ASSERT_BSONOBJ_BINARY_EQ(postImageDoc, next[0].getPostImageOp()->getObject()); - next = getNextBatch(&iter, executor); + next = getNextBatch(&iter, executor, factory); ASSERT_TRUE(next.empty()); } diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp index 3815fdde3ea..f3c8201d99d 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp @@ -139,11 +139,11 @@ ExecutorFuture ReshardingOplogApplier::_scheduleNextBatch( CancellationToken cancelToken, CancelableOperationContextFactory factory) { return ExecutorFuture(executor) - .then([this, executor, cancelToken] { + .then([this, executor, cancelToken, factory] { auto batchClient = makeKillableClient(_service(), kClientName); AlternativeClientRegion acr(batchClient); - return _oplogIter->getNextBatch(executor, cancelToken); + return _oplogIter->getNextBatch(executor, cancelToken, factory); }) .then([this, executor, cancelToken, factory](OplogBatch batch) { LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp index 70859f8fc31..786ab9e96de 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp @@ -77,10 +77,12 @@ public: } ExecutorFuture> getNextBatch( - std::shared_ptr executor, CancellationToken cancelToken) override { + std::shared_ptr executor, + CancellationToken cancelToken, + CancelableOperationContextFactory factory) override { // This operation context is unused by the function but confirms that the Client calling // getNextBatch() doesn't already have an operation context. - auto opCtx = cc().makeOperationContext(); + auto opCtx = factory.makeOperationContext(&cc()); return ExecutorFuture(std::move(executor)).then([this] { std::vector ret; -- cgit v1.2.1