author     Max Hirschhorn <max.hirschhorn@mongodb.com>  2020-11-10 12:40:47 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2020-11-10 12:59:03 +0000
commit     4e4ebb8fef276c28011aebecf8a0ed12e69a9f9b (patch)
tree       19db3f989babad664d32c5275858f1cf5f3aec30
parent     9e8989bfd5dd36bf835247130d1d1229806802bf (diff)
download   mongo-4e4ebb8fef276c28011aebecf8a0ed12e69a9f9b.tar.gz
SERVER-52690 Run ReshardingCollectionCloner on a TaskExecutor.
Restructures the ReshardingCollectionCloner class to run on a separate thread and to return a mongo::Future that is readied when the cloning procedure either completes successfully or fails with an error. Also enables the ReshardingCollectionCloner to insert documents in batches of size > 1 within a single storage transaction. The reshardingCollectionClonerBatchSizeInBytes server parameter controls the size of the insert batch.
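The new entry point is exercised by the updated test command in the diff below. A minimal sketch of the calling convention (the cloner's constructor arguments here are placeholders standing in for real request values):

    // Construct the cloner with everything it needs up front; the arguments
    // that runPipeline() used to take per call now live on the object itself.
    ReshardingCollectionCloner cloner(ShardKeyPattern(shardKey),
                                      sourceNss,
                                      sourceUUID,
                                      recipientShard,
                                      atClusterTime,
                                      outputNss);

    // run() schedules the cloning work on the given task executor and returns
    // an ExecutorFuture<void> readied once cloning succeeds or fails.
    auto future = cloner.run(opCtx->getServiceContext(),
                             Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());

    future.get();  // Block only where acceptable, e.g. in a test command.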
-rw-r--r--  src/mongo/db/commands/resharding_test_commands.cpp  19
-rw-r--r--  src/mongo/db/s/SConscript  1
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp  218
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.h  43
-rw-r--r--  src/mongo/db/s/resharding/resharding_server_parameters.idl  48
5 files changed, 265 insertions, 64 deletions
diff --git a/src/mongo/db/commands/resharding_test_commands.cpp b/src/mongo/db/commands/resharding_test_commands.cpp
index 427f9babcc1..fe1343879aa 100644
--- a/src/mongo/db/commands/resharding_test_commands.cpp
+++ b/src/mongo/db/commands/resharding_test_commands.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands/resharding_test_commands_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
+#include "mongo/s/grid.h"
namespace mongo {
namespace {
@@ -50,13 +51,17 @@ public:
using InvocationBase::InvocationBase;
void typedRun(OperationContext* opCtx) {
- ReshardingCollectionCloner cloner(
- ShardKeyPattern(request().getShardKey()), ns(), request().getUuid());
-
- cloner.runPipeline(opCtx,
- request().getShardId(),
- request().getAtClusterTime(),
- request().getOutputNs());
+ ReshardingCollectionCloner cloner(ShardKeyPattern(request().getShardKey()),
+ ns(),
+ request().getUuid(),
+ request().getShardId(),
+ request().getAtClusterTime(),
+ request().getOutputNs());
+
+ cloner
+ .run(opCtx->getServiceContext(),
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
+ .get();
}
private:
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index e0d3a8dcfe7..d259e971d6f 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -80,6 +80,7 @@ env.Library(
'resharding/resharding_oplog_applier.cpp',
'resharding/resharding_oplog_fetcher.cpp',
'resharding/resharding_recipient_service.cpp',
+ 'resharding/resharding_server_parameters.idl',
'scoped_operation_completion_sharding_actions.cpp',
'session_catalog_migration_destination.cpp',
'session_catalog_migration_source.cpp',
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index b393380c237..699df230b40 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -33,26 +33,40 @@
#include "mongo/db/s/resharding/resharding_collection_cloner.h"
+#include <utility>
+
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog_raii.h"
+#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop.h"
#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/logv2/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/str.h"
namespace mongo {
ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardKeyPattern,
NamespaceString sourceNss,
- CollectionUUID sourceUUID)
+ CollectionUUID sourceUUID,
+ ShardId recipientShard,
+ Timestamp atClusterTime,
+ NamespaceString outputNss)
: _newShardKeyPattern(std::move(newShardKeyPattern)),
_sourceNss(std::move(sourceNss)),
- _sourceUUID(std::move(sourceUUID)) {}
+ _sourceUUID(std::move(sourceUUID)),
+ _recipientShard(std::move(recipientShard)),
+ _atClusterTime(atClusterTime),
+ _outputNss(std::move(outputNss)) {}
-std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::buildCursor(
+std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_makePipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ShardId& recipientShard,
Timestamp atClusterTime,
@@ -69,61 +83,167 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::buildCurs
<< repl::readConcernLevels::kSnapshotName
<< repl::ReadConcernArgs::kAtClusterTimeFieldName
<< atClusterTime));
+ // TODO SERVER-52692: Set read preference to nearest.
+ // request.setUnwrappedReadPref();
return sharded_agg_helpers::targetShardsAndAddMergeCursors(std::move(expCtx),
std::move(request));
}
-void ReshardingCollectionCloner::runPipeline(OperationContext* opCtx,
- const ShardId& recipientShard,
- Timestamp atClusterTime,
- const NamespaceString& outputNss) {
- // Assume that the input collection isn't a view. The collectionUUID parameter to the aggregate
- // would enforce this anyway.
- StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
- resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}};
-
- // Assume that the config.cache.chunks collection isn't a view either.
- auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
- auto tempCacheChunksNss =
- NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns());
- resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}};
-
- auto expCtx = make_intrusive<ExpressionContext>(opCtx,
- boost::none, /* explain */
- false, /* fromMongos */
- false, /* needsMerge */
- false, /* allowDiskUse */
- false, /* bypassDocumentValidation */
- false, /* isMapReduceCommand */
- _sourceNss,
- boost::none, /* runtimeConstants */
- nullptr, /* collator */
- MongoProcessInterface::create(opCtx),
- std::move(resolvedNamespaces),
- _sourceUUID);
-
- auto pipeline = buildCursor(expCtx, recipientShard, atClusterTime, outputNss);
- invariant(!opCtx->lockState()->inAWriteUnitOfWork());
-
- auto doc = pipeline->getNext();
- while (doc) {
+std::vector<InsertStatement> ReshardingCollectionCloner::_fillBatch(Pipeline& pipeline) {
+ std::vector<InsertStatement> batch;
+
+ int numBytes = 0;
+ do {
+ auto doc = pipeline.getNext();
+ if (!doc) {
+ break;
+ }
+
auto obj = doc->toBson();
+ batch.emplace_back(obj.getOwned());
+ numBytes += obj.objsize();
+ } while (numBytes < resharding::gReshardingCollectionClonerBatchSizeInBytes);
- // TODO: Do some amount of batching for inserts.
- writeConflictRetry(opCtx, "reshardingCollectionClonerInsertDocument", outputNss.ns(), [&] {
- AutoGetCollection outputColl(opCtx, outputNss, MODE_IX);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << "Resharding collection cloner's output collection '"
- << outputNss << "' did not already exist",
- outputColl);
- WriteUnitOfWork wuow(opCtx);
- uassertStatusOK(outputColl->insertDocument(opCtx, InsertStatement{obj}, nullptr));
- wuow.commit();
- });
+ return batch;
+}
+
+void ReshardingCollectionCloner::_insertBatch(OperationContext* opCtx,
+ std::vector<InsertStatement>& batch) {
+ writeConflictRetry(opCtx, "ReshardingCollectionCloner::_insertBatch", _outputNss.ns(), [&] {
+ AutoGetCollection outputColl(opCtx, _outputNss, MODE_IX);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Resharding collection cloner's output collection '" << _outputNss
+ << "' did not already exist",
+ outputColl);
+ WriteUnitOfWork wuow(opCtx);
+
+ // Populate 'slots' with new optimes for each insert.
+ // This also notifies the storage engine of each new timestamp.
+ auto oplogSlots = repl::getNextOpTimes(opCtx, batch.size());
+ for (auto [insert, slot] = std::make_pair(batch.begin(), oplogSlots.begin());
+ slot != oplogSlots.end();
+ ++insert, ++slot) {
+ invariant(insert != batch.end());
+ insert->oplogSlot = *slot;
+ }
+
+ uassertStatusOK(outputColl->insertDocuments(opCtx, batch.begin(), batch.end(), nullptr));
+ wuow.commit();
+ });
+}
+
+/**
+ * Invokes the 'callable' function with a fresh OperationContext.
+ *
+ * The OperationContext is configured so the RstlKillOpThread would always interrupt the operation
+ * on step-up or stepdown, regardless of whether the operation has acquired any locks. This
+ * interruption is best-effort to stop doing wasteful work on stepdown as quickly as possible. It
+ * isn't required for the ReshardingCollectionCloner's correctness. In particular, it is possible
+ * for an OperationContext to be constructed after stepdown has finished, for the
+ * ReshardingCollectionCloner to run a getMore on the aggregation against the donor shards, and for
+ * the ReshardingCollectionCloner to only discover afterwards, via a NotPrimary error when
+ * inserting a batch of documents locally, that the recipient had already stepped down.
+ *
+ * Note that the recipient's primary-only service is responsible for managing the
+ * ReshardingCollectionCloner and would shut down the ReshardingCollectionCloner's task executor
+ * following the recipient stepping down.
+ *
+ * Also note that the ReshardingCollectionCloner is only created after step-up as part of the
+ * recipient's primary-only service and therefore would never be interrupted by step-up.
+ */
+template <typename Callable>
+auto ReshardingCollectionCloner::_withTemporaryOperationContext(ServiceContext* serviceContext,
+ Callable&& callable) {
+ ThreadClient tc(kClientName, serviceContext);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+
+ auto opCtx = tc->makeOperationContext();
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+ // The BlockingResultsMerger underlying the $mergeCursors stage records how long the
+ // recipient spent waiting for documents from the donor shards. Doing so requires the CurOp
+ // to be marked as having started.
+ auto* curOp = CurOp::get(opCtx.get());
+ curOp->ensureStarted();
+ {
+ ON_BLOCK_EXIT([curOp] { curOp->done(); });
+ return callable(opCtx.get());
+ }
+}
- doc = pipeline->getNext();
+ExecutorFuture<void> ReshardingCollectionCloner::_insertBatchesUntilPipelineExhausted(
+ ServiceContext* serviceContext,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline) {
+ bool moreToCome = _withTemporaryOperationContext(serviceContext, [&](auto* opCtx) {
+ pipeline->reattachToOperationContext(opCtx);
+ auto batch = _fillBatch(*pipeline);
+ pipeline->detachFromOperationContext();
+
+ if (batch.empty()) {
+ return false;
+ }
+
+ _insertBatch(opCtx, batch);
+ return true;
+ });
+
+ if (!moreToCome) {
+ return ExecutorFuture(std::move(executor));
}
+
+ return ExecutorFuture(executor, std::move(pipeline))
+ .then([this, serviceContext, executor](auto pipeline) {
+ return _insertBatchesUntilPipelineExhausted(
+ serviceContext, std::move(executor), std::move(pipeline));
+ });
+}
+
+ExecutorFuture<void> ReshardingCollectionCloner::run(
+ ServiceContext* serviceContext, std::shared_ptr<executor::TaskExecutor> executor) {
+ return ExecutorFuture(executor)
+ .then([this, serviceContext] {
+ return _withTemporaryOperationContext(serviceContext, [&](auto* opCtx) {
+ // Assume that the input collection isn't a view. The collectionUUID parameter to
+ // the aggregate would enforce this anyway.
+ StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
+ resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}};
+
+ // Assume that the config.cache.chunks collection isn't a view either.
+ auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID);
+ auto tempCacheChunksNss =
+ NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns());
+ resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss,
+ std::vector<BSONObj>{}};
+
+ auto expCtx =
+ make_intrusive<ExpressionContext>(opCtx,
+ boost::none, /* explain */
+ false, /* fromMongos */
+ false, /* needsMerge */
+ false, /* allowDiskUse */
+ false, /* bypassDocumentValidation */
+ false, /* isMapReduceCommand */
+ _sourceNss,
+ boost::none, /* runtimeConstants */
+ nullptr, /* collator */
+ MongoProcessInterface::create(opCtx),
+ std::move(resolvedNamespaces),
+ _sourceUUID);
+
+ auto pipeline = _makePipeline(expCtx, _recipientShard, _atClusterTime, _outputNss);
+ pipeline->detachFromOperationContext();
+ return pipeline;
+ });
+ })
+ .then([this, serviceContext, executor](auto pipeline) {
+ return _insertBatchesUntilPipelineExhausted(
+ serviceContext, std::move(executor), std::move(pipeline));
+ });
}
} // namespace mongo
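The loop in _insertBatchesUntilPipelineExhausted() above avoids growing the C++ call stack by rescheduling itself through the executor rather than recursing directly: each invocation drains one batch on a temporary OperationContext, then chains the next iteration with .then(). A stripped-down sketch of the same pattern (Source and processOneBatch are illustrative, not part of this patch):

    ExecutorFuture<void> runUntilExhausted(std::shared_ptr<executor::TaskExecutor> executor,
                                           std::shared_ptr<Source> source) {
        if (!source->processOneBatch()) {
            // Nothing left to do: return an already-ready future on the executor.
            return ExecutorFuture(std::move(executor));
        }

        // Hop back onto the executor and repeat. Each iteration is a fresh task,
        // so the "recursion" costs an executor hop instead of a stack frame.
        return ExecutorFuture(executor).then(
            [executor, source] { return runUntilExhausted(executor, source); });
    }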
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index 092c2e0e3c5..8b9b467a718 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -27,41 +27,68 @@
* it in the license file.
*/
-
#pragma once
+#include <memory>
+
+#include "mongo/base/string_data.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_catalog.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/util/future.h"
namespace mongo {
-class OperationContext;
+namespace executor {
+
+class TaskExecutor;
+
+} // namespace executor
+
+class ServiceContext;
class ReshardingCollectionCloner {
public:
ReshardingCollectionCloner(ShardKeyPattern newShardKeyPattern,
NamespaceString sourceNss,
- CollectionUUID sourceUUID);
+ CollectionUUID sourceUUID,
+ ShardId recipientShard,
+ Timestamp atClusterTime,
+ NamespaceString outputNss);
- void runPipeline(OperationContext* opCtx,
- const ShardId& recipientShard,
- Timestamp atClusterTime,
- const NamespaceString& outputNss);
+ ExecutorFuture<void> run(ServiceContext* serviceContext,
+ std::shared_ptr<executor::TaskExecutor>);
private:
- std::unique_ptr<Pipeline, PipelineDeleter> buildCursor(
+ static constexpr StringData kClientName = "ReshardingCollectionCloner"_sd;
+
+ std::unique_ptr<Pipeline, PipelineDeleter> _makePipeline(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
const ShardId& recipientShard,
Timestamp atClusterTime,
const NamespaceString& outputNss);
+ std::vector<InsertStatement> _fillBatch(Pipeline& pipeline);
+ void _insertBatch(OperationContext* opCtx, std::vector<InsertStatement>& batch);
+
+ template <typename Callable>
+ auto _withTemporaryOperationContext(ServiceContext* serviceContext, Callable&& callable);
+
+ ExecutorFuture<void> _insertBatchesUntilPipelineExhausted(
+ ServiceContext* serviceContext,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ std::unique_ptr<Pipeline, PipelineDeleter> pipeline);
+
const ShardKeyPattern _newShardKeyPattern;
const NamespaceString _sourceNss;
const CollectionUUID _sourceUUID;
+ const ShardId _recipientShard;
+ const Timestamp _atClusterTime;
+ const NamespaceString _outputNss;
};
} // namespace mongo
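Because run() now returns an ExecutorFuture<void>, non-test callers such as the recipient's primary-only service can compose it asynchronously instead of blocking on .get(). A hedged sketch of such composition (both callbacks are illustrative):

    cloner.run(serviceContext, executor)
        .onError([](Status status) {
            // Translate or log the failure; cloning errors surface here.
            return status;
        })
        .getAsync([](Status status) {
            // Runs once cloning has either succeeded or failed.
        });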
diff --git a/src/mongo/db/s/resharding/resharding_server_parameters.idl b/src/mongo/db/s/resharding/resharding_server_parameters.idl
new file mode 100644
index 00000000000..0699a2272c5
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_server_parameters.idl
@@ -0,0 +1,48 @@
+# Copyright (C) 2020-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# Server parameters for resharding.
+
+global:
+ cpp_namespace: "mongo::resharding"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+server_parameters:
+ reshardingCollectionClonerBatchSizeInBytes:
+ description: >-
+ Limit for the number of bytes of data inserted per storage transaction (WriteUnitOfWork)
+ by ReshardingCollectionCloner.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: gReshardingCollectionClonerBatchSizeInBytes
+ default:
+ expr: 100 * 1024
+ validator:
+ gte: 1
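The IDL compiler turns this definition into a C++ variable in the mongo::resharding namespace (per the cpp_namespace and cpp_varname fields above), which is what _fillBatch() in the .cpp diff consults when sizing a batch. A condensed sketch of that check; batchIsFull is an illustrative helper, not part of the patch:

    #include "mongo/db/s/resharding/resharding_server_parameters_gen.h"

    bool batchIsFull(int numBytes) {
        // set_at: startup means the value is fixed at process start, e.g. via
        //   mongod --setParameter reshardingCollectionClonerBatchSizeInBytes=131072
        return numBytes >= resharding::gReshardingCollectionClonerBatchSizeInBytes;
    }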