summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2021-03-23 21:09:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-03-25 16:42:16 +0000
commitd5ec2876050089857832308a5f72ea23dbb49715 (patch)
treec709a9726627e58e2ce5ac2624b3def1a40670e0
parentb79aa28799f52362d88eb7ca32948d2098daa741 (diff)
downloadmongo-d5ec2876050089857832308a5f72ea23dbb49715.tar.gz
SERVER-55306 Integrate CancelableOperationContext into ReshardingCollectionCloner
-rw-r--r--src/mongo/db/cancelable_operation_context.cpp2
-rw-r--r--src/mongo/db/cancelable_operation_context.h21
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner.cpp80
-rw-r--r--src/mongo/db/s/resharding/resharding_collection_cloner.h7
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.cpp52
-rw-r--r--src/mongo/db/s/resharding/resharding_recipient_service.h5
-rw-r--r--src/mongo/db/s/resharding_test_commands.cpp19
7 files changed, 103 insertions, 83 deletions
diff --git a/src/mongo/db/cancelable_operation_context.cpp b/src/mongo/db/cancelable_operation_context.cpp
index c605d892d92..fdf7235c05d 100644
--- a/src/mongo/db/cancelable_operation_context.cpp
+++ b/src/mongo/db/cancelable_operation_context.cpp
@@ -31,10 +31,8 @@
#include "mongo/db/cancelable_operation_context.h"
-#include "mongo/db/client.h"
#include "mongo/db/operation_context.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/util/cancellation.h"
namespace mongo {
diff --git a/src/mongo/db/cancelable_operation_context.h b/src/mongo/db/cancelable_operation_context.h
index ee4225e2b5b..c3bc2931261 100644
--- a/src/mongo/db/cancelable_operation_context.h
+++ b/src/mongo/db/cancelable_operation_context.h
@@ -31,14 +31,15 @@
#include <memory>
+#include "mongo/db/client.h"
#include "mongo/db/service_context.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/util/cancellation.h"
#include "mongo/util/future.h"
#include "mongo/util/out_of_line_executor.h"
namespace mongo {
-class CancellationToken;
class OperationContext;
/**
@@ -92,4 +93,22 @@ private:
const SemiFuture<void> _markKilledFinished;
};
+/**
+ * A factory to create CancelableOperationContext objects that use the same CancelationToken and
+ * executor.
+ */
+class CancelableOperationContextFactory {
+public:
+ CancelableOperationContextFactory(CancellationToken cancelToken, ExecutorPtr executor)
+ : _cancelToken{std::move(cancelToken)}, _executor{std::move(executor)} {}
+
+ CancelableOperationContext makeOperationContext(Client* client) const {
+ return CancelableOperationContext{client->makeOperationContext(), _cancelToken, _executor};
+ }
+
+private:
+ const CancellationToken _cancelToken;
+ const ExecutorPtr _executor;
+};
+
} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 6ebd1d479df..38937a2ce9e 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -36,6 +36,7 @@
#include <utility>
#include "mongo/bson/json.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/client.h"
@@ -256,6 +257,13 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_restartP
return resharding::data_copy::findHighestInsertedId(opCtx, *outputColl);
}();
+ // The BlockingResultsMerger underlying by the $mergeCursors stage records how long the
+ // recipient spent waiting for documents from the donor shards. It doing so requires the CurOp
+ // to be marked as having started.
+ auto* curOp = CurOp::get(opCtx);
+ curOp->ensureStarted();
+ ON_BLOCK_EXIT([curOp] { curOp->done(); });
+
auto pipeline = _targetAggregationRequest(
opCtx, *makePipeline(opCtx, MongoProcessInterface::create(opCtx), idToResumeFrom));
@@ -301,49 +309,9 @@ bool ReshardingCollectionCloner::doOneBatch(OperationContext* opCtx, Pipeline& p
return true;
}
-/**
- * Invokes the 'callable' function with a fresh OperationContext.
- *
- * The OperationContext is configured so the RstlKillOpThread would always interrupt the operation
- * on step-up or stepdown, regardless of whether the operation has acquired any locks. This
- * interruption is best-effort to stop doing wasteful work on stepdown as quickly as possible. It
- * isn't required for the ReshardingCollectionCloner's correctness. In particular, it is possible
- * for an OperationContext to be constructed after stepdown has finished, for the
- * ReshardingCollectionCloner to run a getMore on the aggregation against the donor shards, and for
- * the ReshardingCollectionCloner to only discover afterwards the recipient had already stepped down
- * from a NotPrimary error when inserting a batch of documents locally.
- *
- * Note that the recipient's primary-only service is responsible for managing the
- * ReshardingCollectionCloner and would shut down the ReshardingCollectionCloner's task executor
- * following the recipient stepping down.
- *
- * Also note that the ReshardingCollectionCloner is only created after step-up as part of the
- * recipient's primary-only service and therefore would never be interrupted by step-up.
- */
-template <typename Callable>
-auto ReshardingCollectionCloner::_withTemporaryOperationContext(Callable&& callable) {
- auto& client = cc();
- {
- stdx::lock_guard<Client> lk(client);
- invariant(client.canKillSystemOperationInStepdown(lk));
- }
-
- auto opCtx = client.makeOperationContext();
- opCtx->setAlwaysInterruptAtStepDownOrUp();
-
- // The BlockingResultsMerger underlying by the $mergeCursors stage records how long the
- // recipient spent waiting for documents from the donor shards. It doing so requires the CurOp
- // to be marked as having started.
- auto* curOp = CurOp::get(opCtx.get());
- curOp->ensureStarted();
- {
- ON_BLOCK_EXIT([curOp] { curOp->done(); });
- return callable(opCtx.get());
- }
-}
-
SemiFuture<void> ReshardingCollectionCloner::run(std::shared_ptr<executor::TaskExecutor> executor,
- CancellationToken cancelToken) {
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory) {
struct ChainContext {
std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
bool moreToCome = true;
@@ -351,25 +319,25 @@ SemiFuture<void> ReshardingCollectionCloner::run(std::shared_ptr<executor::TaskE
auto chainCtx = std::make_shared<ChainContext>();
- return AsyncTry([this, chainCtx] {
+ return AsyncTry([this, chainCtx, factory] {
if (!chainCtx->pipeline) {
- chainCtx->pipeline = _withTemporaryOperationContext(
- [&](auto* opCtx) { return _restartPipeline(opCtx); });
+ auto opCtx = factory.makeOperationContext(&cc());
+ chainCtx->pipeline = _restartPipeline(opCtx.get());
}
- chainCtx->moreToCome = _withTemporaryOperationContext(
- [&](auto* opCtx) { return doOneBatch(opCtx, *chainCtx->pipeline); });
+ auto opCtx = factory.makeOperationContext(&cc());
+ chainCtx->moreToCome = doOneBatch(opCtx.get(), *chainCtx->pipeline);
})
- .until([this, chainCtx, cancelToken](Status status) {
+ .until([this, chainCtx, cancelToken, factory](Status status) {
if (status.isOK() && chainCtx->moreToCome) {
return false;
}
if (chainCtx->pipeline) {
- _withTemporaryOperationContext([&](auto* opCtx) {
- chainCtx->pipeline->dispose(opCtx);
- chainCtx->pipeline.reset();
- });
+ auto opCtx = factory.makeOperationContext(&cc());
+
+ chainCtx->pipeline->dispose(opCtx.get());
+ chainCtx->pipeline.reset();
}
if (status.isA<ErrorCategory::CancellationError>() ||
@@ -411,11 +379,11 @@ SemiFuture<void> ReshardingCollectionCloner::run(std::shared_ptr<executor::TaskE
.on(executor, cancelToken)
.onCompletion([this, chainCtx](Status status) {
if (chainCtx->pipeline) {
+ auto opCtx = cc().makeOperationContext();
+
// Guarantee the pipeline is always cleaned up - even upon cancellation.
- _withTemporaryOperationContext([&](auto* opCtx) {
- chainCtx->pipeline->dispose(opCtx);
- chainCtx->pipeline.reset();
- });
+ chainCtx->pipeline->dispose(opCtx.get());
+ chainCtx->pipeline.reset();
}
// Propagate the result of the AsyncTry.
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index 5f31adaddb3..eb346343fe5 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -49,6 +49,7 @@ class TaskExecutor;
} // namespace executor
+class CancelableOperationContextFactory;
class OperationContext;
class MongoProcessInterface;
class ReshardingMetrics;
@@ -93,7 +94,8 @@ public:
* (b) the cancellation token was canceled due to a stepdown or abort.
*/
SemiFuture<void> run(std::shared_ptr<executor::TaskExecutor> executor,
- CancellationToken cancelToken);
+ CancellationToken cancelToken,
+ CancelableOperationContextFactory factory);
/**
* Fetches and inserts a single batch of documents.
@@ -109,9 +111,6 @@ private:
std::unique_ptr<Pipeline, PipelineDeleter> _restartPipeline(OperationContext* opCtx);
- template <typename Callable>
- auto _withTemporaryOperationContext(Callable&& callable);
-
const std::unique_ptr<Env> _env;
const ShardKeyPattern _newShardKeyPattern;
const NamespaceString _sourceNss;
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.cpp b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
index 504930825ac..29d02d18c2f 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.cpp
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.cpp
@@ -31,6 +31,7 @@
#include "mongo/db/s/resharding/resharding_recipient_service.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/catalog/rename_collection.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/dbdirectclient.h"
@@ -228,7 +229,14 @@ ReshardingRecipientService::RecipientStateMachine::RecipientStateMachine(
_donorShardIds{recipientDoc.getDonorShards()},
_minimumOperationDuration{Milliseconds{recipientDoc.getMinimumOperationDurationMillis()}},
_recipientCtx{recipientDoc.getMutableState()},
- _fetchTimestamp{recipientDoc.getFetchTimestamp()} {}
+ _fetchTimestamp{recipientDoc.getFetchTimestamp()},
+ _markKilledExecutor(std::make_shared<ThreadPool>([] {
+ ThreadPool::Options options;
+ options.poolName = "RecipientStateMachineCancelableOpCtxPool";
+ options.minThreads = 1;
+ options.maxThreads = 1;
+ return options;
+ }())) {}
ReshardingRecipientService::RecipientStateMachine::~RecipientStateMachine() {
stdx::lock_guard<Latch> lg(_mutex);
@@ -241,6 +249,7 @@ SemiFuture<void> ReshardingRecipientService::RecipientStateMachine::run(
std::shared_ptr<executor::ScopedTaskExecutor> executor,
const CancellationToken& stepdownToken) noexcept {
auto abortToken = _initAbortSource(stepdownToken);
+ _markKilledExecutor->startup();
return ExecutorFuture<void>(**executor)
.then([this, executor] {
@@ -517,24 +526,29 @@ ReshardingRecipientService::RecipientStateMachine::_cloneThenTransitionToApplyin
}));
}
- return whenAllSucceed(_collectionCloner->run(**executor, abortToken).thenRunOn(**executor),
- (*executor)
- ->sleepFor(_minimumOperationDuration, abortToken)
- .then([this, executor, abortToken] {
- if (_txnCloners.empty()) {
- return SemiFuture<void>::makeReady();
- }
-
- auto serviceContext = Client::getCurrent()->getServiceContext();
-
- std::vector<ExecutorFuture<void>> txnClonerFutures;
- for (auto&& txnCloner : _txnCloners) {
- txnClonerFutures.push_back(
- txnCloner->run(serviceContext, **executor, abortToken));
- }
-
- return whenAllSucceed(std::move(txnClonerFutures));
- }))
+ return whenAllSucceed(
+ _collectionCloner
+ ->run(**executor,
+ abortToken,
+ CancelableOperationContextFactory(abortToken, _markKilledExecutor))
+ .thenRunOn(**executor),
+ (*executor)
+ ->sleepFor(_minimumOperationDuration, abortToken)
+ .then([this, executor, abortToken] {
+ if (_txnCloners.empty()) {
+ return SemiFuture<void>::makeReady();
+ }
+
+ auto serviceContext = Client::getCurrent()->getServiceContext();
+
+ std::vector<ExecutorFuture<void>> txnClonerFutures;
+ for (auto&& txnCloner : _txnCloners) {
+ txnClonerFutures.push_back(
+ txnCloner->run(serviceContext, **executor, abortToken));
+ }
+
+ return whenAllSucceed(std::move(txnClonerFutures));
+ }))
.thenRunOn(**executor)
.then([this] {
// ReshardingTxnCloners must complete before the recipient transitions to kApplying to
diff --git a/src/mongo/db/s/resharding/resharding_recipient_service.h b/src/mongo/db/s/resharding/resharding_recipient_service.h
index e2e52382a71..f1c8501a4e8 100644
--- a/src/mongo/db/s/resharding/resharding_recipient_service.h
+++ b/src/mongo/db/s/resharding/resharding_recipient_service.h
@@ -212,6 +212,11 @@ private:
RecipientShardContext _recipientCtx;
boost::optional<Timestamp> _fetchTimestamp;
+ // ThreadPool used by CancelableOperationContext.
+ // CancelableOperationContext must have a thread that is always available to it to mark its
+ // opCtx as killed when the cancelToken has been cancelled.
+ const std::shared_ptr<ThreadPool> _markKilledExecutor;
+
std::unique_ptr<ReshardingCollectionCloner> _collectionCloner;
std::vector<std::unique_ptr<ReshardingTxnCloner>> _txnCloners;
diff --git a/src/mongo/db/s/resharding_test_commands.cpp b/src/mongo/db/s/resharding_test_commands.cpp
index d9b2fcd15af..6f7dfa11544 100644
--- a/src/mongo/db/s/resharding_test_commands.cpp
+++ b/src/mongo/db/s/resharding_test_commands.cpp
@@ -34,6 +34,7 @@
#include <memory>
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cancelable_operation_context.h"
#include "mongo/db/commands.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
@@ -99,7 +100,23 @@ public:
request().getAtClusterTime(),
request().getOutputNs());
- cloner.run(executor, opCtx->getCancellationToken()).get(opCtx);
+ std::shared_ptr<ThreadPool> cancelableOperationContextPool = [] {
+ ThreadPool::Options options;
+ options.poolName = "TestReshardingCollectionClonerCancelableOpCtxPool";
+ options.minThreads = 1;
+ options.maxThreads = 1;
+
+ auto threadPool = std::make_shared<ThreadPool>(std::move(options));
+ threadPool->startup();
+ return threadPool;
+ }();
+
+ cloner
+ .run(executor,
+ opCtx->getCancellationToken(),
+ CancelableOperationContextFactory(opCtx->getCancellationToken(),
+ cancelableOperationContextPool))
+ .get(opCtx);
}
private: