summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2021-04-06 19:34:21 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-04-09 19:14:12 +0000
commit3c9bdbf7a96eae083f7057694719f10c8c0b00cb (patch)
treeea76abd802fd5df5b305416b0087c7959d7e1e5a
parentda7c4d8cdc446b50826455fbd20c08b55b31e897 (diff)
downloadmongo-3c9bdbf7a96eae083f7057694719f10c8c0b00cb.tar.gz
SERVER-55323 Integrate CancelableOperationContext into ReshardingDonorOplogIterator
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp31
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h9
-rw-r--r--src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp58
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier.cpp4
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp6
5 files changed, 62 insertions, 46 deletions
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
index d218e76e66c..a6002fc86a8 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
@@ -137,20 +137,6 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingDonorOplogIterator::makePip
return Pipeline::create(std::move(stages), std::move(expCtx));
}
-template <typename Callable>
-auto ReshardingDonorOplogIterator::_withTemporaryOperationContext(Callable&& callable) {
- auto& client = cc();
- {
- stdx::lock_guard<Client> lk(client);
- invariant(client.canKillSystemOperationInStepdown(lk));
- }
-
- auto opCtx = client.makeOperationContext();
- opCtx->setAlwaysInterruptAtStepDownOrUp();
-
- return callable(opCtx.get());
-}
-
std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline& pipeline) {
std::vector<repl::OplogEntry> batch;
@@ -208,17 +194,20 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline&
}
ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken) {
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) {
if (_hasSeenFinalOplogEntry) {
invariant(!_pipeline);
return ExecutorFuture(std::move(executor), std::vector<repl::OplogEntry>{});
}
- auto batch = _withTemporaryOperationContext([&](auto* opCtx) {
+ auto batch = [&] {
+ auto opCtx = factory.makeOperationContext(&cc());
if (_pipeline) {
- _pipeline->reattachToOperationContext(opCtx);
+ _pipeline->reattachToOperationContext(opCtx.get());
} else {
- auto pipeline = makePipeline(opCtx, MongoProcessInterface::create(opCtx));
+ auto pipeline = makePipeline(opCtx.get(), MongoProcessInterface::create(opCtx.get()));
_pipeline = pipeline->getContext()
->mongoProcessInterface->attachCursorSourceToPipelineForLocalRead(
pipeline.release());
@@ -243,7 +232,7 @@ ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getN
}
return batch;
- });
+ }();
if (batch.empty() && !_hasSeenFinalOplogEntry) {
return ExecutorFuture(executor)
@@ -251,8 +240,8 @@ ExecutorFuture<std::vector<repl::OplogEntry>> ReshardingDonorOplogIterator::getN
return future_util::withCancellation(_insertNotifier->awaitInsert(_resumeToken),
cancelToken);
})
- .then([this, cancelToken, executor] {
- return getNextBatch(std::move(executor), cancelToken);
+ .then([this, cancelToken, executor, factory] {
+ return getNextBatch(std::move(executor), cancelToken, factory);
});
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
index c152e6d04c7..adcdb508a18 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.h
@@ -31,6 +31,7 @@
#include <vector>
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -69,7 +70,9 @@ public:
* final oplog entry hasn't been returned yet.
*/
virtual ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken) = 0;
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) = 0;
};
/**
@@ -94,7 +97,9 @@ public:
OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface);
ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken) override;
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) override;
static constexpr auto kActualOpFieldName = "actualOp"_sd;
static constexpr auto kPreImageOpFieldName = "preImageOp"_sd;
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index 441a683904d..68dfffad3e6 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -172,15 +172,30 @@ public:
return executor;
}
+ CancelableOperationContextFactory makeCancelableOpCtx() {
+ auto cancelableOpCtxExecutor = std::make_shared<ThreadPool>([] {
+ ThreadPool::Options options;
+ options.poolName = "TestReshardOplogFetcherCancelableOpCtxPool";
+ options.minThreads = 1;
+ options.maxThreads = 1;
+ return options;
+ }());
+
+ return CancelableOperationContextFactory(operationContext()->getCancellationToken(),
+ cancelableOpCtxExecutor);
+ }
+
auto getNextBatch(ReshardingDonorOplogIterator* iter,
- std::shared_ptr<executor::TaskExecutor> executor) {
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancelableOperationContextFactory factory) {
// There isn't a guarantee that the reference count to `executor` has been decremented after
// .get() returns. We schedule a trivial task on the task executor to ensure the callback's
// destructor has run. Otherwise `executor` could end up outliving the ServiceContext and
// triggering an invariant due to the task executor's thread having a Client still.
return ExecutorFuture(executor)
- .then([iter, executor] {
- return iter->getNextBatch(std::move(executor), CancellationToken::uncancelable());
+ .then([iter, executor, factory] {
+ return iter->getNextBatch(
+ std::move(executor), CancellationToken::uncancelable(), factory);
})
.then([](auto x) { return x; })
.get();
@@ -217,21 +232,22 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) {
ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);
auto executor = makeTaskExecutorForIterator();
+ auto factory = makeCancelableOpCtx();
auto altClient = makeKillableClient();
AlternativeClientRegion acr(altClient);
- auto next = getNextBatch(&iter, executor);
+ auto next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0]));
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0]));
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
}
@@ -249,14 +265,15 @@ TEST_F(ReshardingDonorOplogIterTest, ResumeFromMiddle) {
ReshardingDonorOplogId resumeToken(Timestamp(2, 4), Timestamp(2, 4));
ReshardingDonorOplogIterator iter(oplogNss(), resumeToken, &onInsertAlwaysReady);
auto executor = makeTaskExecutorForIterator();
+ auto factory = makeCancelableOpCtx();
auto altClient = makeKillableClient();
AlternativeClientRegion acr(altClient);
- auto next = getNextBatch(&iter, executor);
+ auto next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0]));
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
}
@@ -306,21 +323,22 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) {
ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &insertNotifier);
auto executor = makeTaskExecutorForIterator();
+ auto factory = makeCancelableOpCtx();
auto altClient = makeKillableClient();
AlternativeClientRegion acr(altClient);
- auto next = getNextBatch(&iter, executor);
+ auto next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0]));
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(oplog2), getId(next[0]));
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
ASSERT_EQ(insertNotifier.numCalls, 2U);
@@ -340,22 +358,23 @@ TEST_F(ReshardingDonorOplogIterTest, FillsInPreImageOplogEntry) {
ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);
auto executor = makeTaskExecutorForIterator();
+ auto factory = makeCancelableOpCtx();
auto altClient = makeKillableClient();
AlternativeClientRegion acr(altClient);
- auto next = getNextBatch(&iter, executor);
+ auto next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(preImageOp), getId(next[0]));
ASSERT_BSONOBJ_BINARY_EQ(preImageDoc, next[0].getObject());
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(deleteOp), getId(next[0]));
ASSERT_TRUE(bool(next[0].getPreImageOp()));
ASSERT_BSONOBJ_BINARY_EQ(getId(preImageOp), getId(*next[0].getPreImageOp()));
ASSERT_BSONOBJ_BINARY_EQ(preImageDoc, next[0].getPreImageOp()->getObject());
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
}
@@ -373,22 +392,23 @@ TEST_F(ReshardingDonorOplogIterTest, FillsInPostImageOplogEntry) {
ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);
auto executor = makeTaskExecutorForIterator();
+ auto factory = makeCancelableOpCtx();
auto altClient = makeKillableClient();
AlternativeClientRegion acr(altClient);
- auto next = getNextBatch(&iter, executor);
+ auto next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(postImageOp), getId(next[0]));
ASSERT_BSONOBJ_BINARY_EQ(postImageDoc, next[0].getObject());
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_EQ(next.size(), 1U);
ASSERT_BSONOBJ_EQ(getId(updateOp), getId(next[0]));
ASSERT_TRUE(bool(next[0].getPostImageOp()));
ASSERT_BSONOBJ_BINARY_EQ(getId(postImageOp), getId(*next[0].getPostImageOp()));
ASSERT_BSONOBJ_BINARY_EQ(postImageDoc, next[0].getPostImageOp()->getObject());
- next = getNextBatch(&iter, executor);
+ next = getNextBatch(&iter, executor, factory);
ASSERT_TRUE(next.empty());
}
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
index 3815fdde3ea..f3c8201d99d 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier.cpp
@@ -139,11 +139,11 @@ ExecutorFuture<void> ReshardingOplogApplier::_scheduleNextBatch(
CancellationToken cancelToken,
CancelableOperationContextFactory factory) {
return ExecutorFuture(executor)
- .then([this, executor, cancelToken] {
+ .then([this, executor, cancelToken, factory] {
auto batchClient = makeKillableClient(_service(), kClientName);
AlternativeClientRegion acr(batchClient);
- return _oplogIter->getNextBatch(executor, cancelToken);
+ return _oplogIter->getNextBatch(executor, cancelToken, factory);
})
.then([this, executor, cancelToken, factory](OplogBatch batch) {
LOGV2_DEBUG(5391002, 3, "Starting batch", "batchSize"_attr = batch.size());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
index 70859f8fc31..786ab9e96de 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_applier_test.cpp
@@ -77,10 +77,12 @@ public:
}
ExecutorFuture<std::vector<repl::OplogEntry>> getNextBatch(
- std::shared_ptr<executor::TaskExecutor> executor, CancellationToken cancelToken) override {
+ std::shared_ptr<executor::TaskExecutor> executor,
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) override {
// This operation context is unused by the function but confirms that the Client calling
// getNextBatch() doesn't already have an operation context.
- auto opCtx = cc().makeOperationContext();
+ auto opCtx = factory.makeOperationContext(&cc());
return ExecutorFuture(std::move(executor)).then([this] {
std::vector<repl::OplogEntry> ret;