diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2020-11-10 12:40:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-11-10 12:59:03 +0000 |
commit | 4e4ebb8fef276c28011aebecf8a0ed12e69a9f9b (patch) | |
tree | 19db3f989babad664d32c5275858f1cf5f3aec30 | |
parent | 9e8989bfd5dd36bf835247130d1d1229806802bf (diff) | |
download | mongo-4e4ebb8fef276c28011aebecf8a0ed12e69a9f9b.tar.gz |
SERVER-52690 Run ReshardingCollectionCloner on a TaskExecutor.
Restructures the ReshardingCollectionCloner class to run on a separate
thread and returned a mongo::Future readied when the cloning procedure
has completed successfully or failed with an error.
Also enables the ReshardingCollectionCloner to insert documents in
batches of size > 1 in a single storage transaction. The
reshardingCollectionClonerBatchSizeInBytes server parameter can be used
to control the size of the insert batch.
-rw-r--r-- | src/mongo/db/commands/resharding_test_commands.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_collection_cloner.cpp | 218 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_collection_cloner.h | 43 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_server_parameters.idl | 48 |
5 files changed, 265 insertions, 64 deletions
diff --git a/src/mongo/db/commands/resharding_test_commands.cpp b/src/mongo/db/commands/resharding_test_commands.cpp index 427f9babcc1..fe1343879aa 100644 --- a/src/mongo/db/commands/resharding_test_commands.cpp +++ b/src/mongo/db/commands/resharding_test_commands.cpp @@ -36,6 +36,7 @@ #include "mongo/db/commands/resharding_test_commands_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/s/resharding/resharding_collection_cloner.h" +#include "mongo/s/grid.h" namespace mongo { namespace { @@ -50,13 +51,17 @@ public: using InvocationBase::InvocationBase; void typedRun(OperationContext* opCtx) { - ReshardingCollectionCloner cloner( - ShardKeyPattern(request().getShardKey()), ns(), request().getUuid()); - - cloner.runPipeline(opCtx, - request().getShardId(), - request().getAtClusterTime(), - request().getOutputNs()); + ReshardingCollectionCloner cloner(ShardKeyPattern(request().getShardKey()), + ns(), + request().getUuid(), + request().getShardId(), + request().getAtClusterTime(), + request().getOutputNs()); + + cloner + .run(opCtx->getServiceContext(), + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) + .get(); } private: diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index e0d3a8dcfe7..d259e971d6f 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -80,6 +80,7 @@ env.Library( 'resharding/resharding_oplog_applier.cpp', 'resharding/resharding_oplog_fetcher.cpp', 'resharding/resharding_recipient_service.cpp', + 'resharding/resharding_server_parameters.idl', 'scoped_operation_completion_sharding_actions.cpp', 'session_catalog_migration_destination.cpp', 'session_catalog_migration_source.cpp', diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index b393380c237..699df230b40 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -33,26 +33,40 @@ #include "mongo/db/s/resharding/resharding_collection_cloner.h" +#include <utility> + #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog_raii.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop.h" #include "mongo/db/pipeline/aggregation_request.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/service_context.h" #include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/executor/task_executor.h" #include "mongo/logv2/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/str.h" namespace mongo { ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardKeyPattern, NamespaceString sourceNss, - CollectionUUID sourceUUID) + CollectionUUID sourceUUID, + ShardId recipientShard, + Timestamp atClusterTime, + NamespaceString outputNss) : _newShardKeyPattern(std::move(newShardKeyPattern)), _sourceNss(std::move(sourceNss)), - _sourceUUID(std::move(sourceUUID)) {} + _sourceUUID(std::move(sourceUUID)), + _recipientShard(std::move(recipientShard)), + _atClusterTime(atClusterTime), + _outputNss(std::move(outputNss)) {} -std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::buildCursor( +std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_makePipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const ShardId& recipientShard, Timestamp atClusterTime, @@ -69,61 +83,167 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::buildCurs << repl::readConcernLevels::kSnapshotName << repl::ReadConcernArgs::kAtClusterTimeFieldName << atClusterTime)); + // TODO SERVER-52692: Set read preference to nearest. + // request.setUnwrappedReadPref(); return sharded_agg_helpers::targetShardsAndAddMergeCursors(std::move(expCtx), std::move(request)); } -void ReshardingCollectionCloner::runPipeline(OperationContext* opCtx, - const ShardId& recipientShard, - Timestamp atClusterTime, - const NamespaceString& outputNss) { - // Assume that the input collection isn't a view. The collectionUUID parameter to the aggregate - // would enforce this anyway. - StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; - resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}}; - - // Assume that the config.cache.chunks collection isn't a view either. - auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); - auto tempCacheChunksNss = - NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns()); - resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}}; - - auto expCtx = make_intrusive<ExpressionContext>(opCtx, - boost::none, /* explain */ - false, /* fromMongos */ - false, /* needsMerge */ - false, /* allowDiskUse */ - false, /* bypassDocumentValidation */ - false, /* isMapReduceCommand */ - _sourceNss, - boost::none, /* runtimeConstants */ - nullptr, /* collator */ - MongoProcessInterface::create(opCtx), - std::move(resolvedNamespaces), - _sourceUUID); - - auto pipeline = buildCursor(expCtx, recipientShard, atClusterTime, outputNss); - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - - auto doc = pipeline->getNext(); - while (doc) { +std::vector<InsertStatement> ReshardingCollectionCloner::_fillBatch(Pipeline& pipeline) { + std::vector<InsertStatement> batch; + + int numBytes = 0; + do { + auto doc = pipeline.getNext(); + if (!doc) { + break; + } + auto obj = doc->toBson(); + batch.emplace_back(obj.getOwned()); + numBytes += obj.objsize(); + } while (numBytes < resharding::gReshardingCollectionClonerBatchSizeInBytes); - // TODO: Do some amount of batching for inserts. - writeConflictRetry(opCtx, "reshardingCollectionClonerInsertDocument", outputNss.ns(), [&] { - AutoGetCollection outputColl(opCtx, outputNss, MODE_IX); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << "Resharding collection cloner's output collection '" - << outputNss << "' did not already exist", - outputColl); - WriteUnitOfWork wuow(opCtx); - uassertStatusOK(outputColl->insertDocument(opCtx, InsertStatement{obj}, nullptr)); - wuow.commit(); - }); + return batch; +} + +void ReshardingCollectionCloner::_insertBatch(OperationContext* opCtx, + std::vector<InsertStatement>& batch) { + writeConflictRetry(opCtx, "ReshardingCollectionCloner::_insertBatch", _outputNss.ns(), [&] { + AutoGetCollection outputColl(opCtx, _outputNss, MODE_IX); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Resharding collection cloner's output collection '" << _outputNss + << "' did not already exist", + outputColl); + WriteUnitOfWork wuow(opCtx); + + // Populate 'slots' with new optimes for each insert. + // This also notifies the storage engine of each new timestamp. + auto oplogSlots = repl::getNextOpTimes(opCtx, batch.size()); + for (auto [insert, slot] = std::make_pair(batch.begin(), oplogSlots.begin()); + slot != oplogSlots.end(); + ++insert, ++slot) { + invariant(insert != batch.end()); + insert->oplogSlot = *slot; + } + + uassertStatusOK(outputColl->insertDocuments(opCtx, batch.begin(), batch.end(), nullptr)); + wuow.commit(); + }); +} + +/** + * Invokes the 'callable' function with a fresh OperationContext. + * + * The OperationContext is configured so the RstlKillOpThread would always interrupt the operation + * on step-up or stepdown, regardless of whether the operation has acquired any locks. This + * interruption is best-effort to stop doing wasteful work on stepdown as quickly as possible. It + * isn't required for the ReshardingCollectionCloner's correctness. In particular, it is possible + * for an OperationContext to be constructed after stepdown has finished, for the + * ReshardingCollectionCloner to run a getMore on the aggregation against the donor shards, and for + * the ReshardingCollectionCloner to only discover afterwards the recipient had already stepped down + * from a NotPrimary error when inserting a batch of documents locally. + * + * Note that the recipient's primary-only service is responsible for managing the + * ReshardingCollectionCloner and would shut down the ReshardingCollectionCloner's task executor + * following the recipient stepping down. + * + * Also note that the ReshardingCollectionCloner is only created after step-up as part of the + * recipient's primary-only service and therefore would never be interrupted by step-up. + */ +template <typename Callable> +auto ReshardingCollectionCloner::_withTemporaryOperationContext(ServiceContext* serviceContext, + Callable&& callable) { + ThreadClient tc(kClientName, serviceContext); + { + stdx::lock_guard<Client> lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + + auto opCtx = tc->makeOperationContext(); + opCtx->setAlwaysInterruptAtStepDownOrUp(); + + // The BlockingResultsMerger underlying by the $mergeCursors stage records how long the + // recipient spent waiting for documents from the donor shards. It doing so requires the CurOp + // to be marked as having started. + auto* curOp = CurOp::get(opCtx.get()); + curOp->ensureStarted(); + { + ON_BLOCK_EXIT([curOp] { curOp->done(); }); + return callable(opCtx.get()); + } +} - doc = pipeline->getNext(); +ExecutorFuture<void> ReshardingCollectionCloner::_insertBatchesUntilPipelineExhausted( + ServiceContext* serviceContext, + std::shared_ptr<executor::TaskExecutor> executor, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline) { + bool moreToCome = _withTemporaryOperationContext(serviceContext, [&](auto* opCtx) { + pipeline->reattachToOperationContext(opCtx); + auto batch = _fillBatch(*pipeline); + pipeline->detachFromOperationContext(); + + if (batch.empty()) { + return false; + } + + _insertBatch(opCtx, batch); + return true; + }); + + if (!moreToCome) { + return ExecutorFuture(std::move(executor)); } + + return ExecutorFuture(executor, std::move(pipeline)) + .then([this, serviceContext, executor](auto pipeline) { + return _insertBatchesUntilPipelineExhausted( + serviceContext, std::move(executor), std::move(pipeline)); + }); +} + +ExecutorFuture<void> ReshardingCollectionCloner::run( + ServiceContext* serviceContext, std::shared_ptr<executor::TaskExecutor> executor) { + return ExecutorFuture(executor) + .then([this, serviceContext] { + return _withTemporaryOperationContext(serviceContext, [&](auto* opCtx) { + // Assume that the input collection isn't a view. The collectionUUID parameter to + // the aggregate would enforce this anyway. + StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; + resolvedNamespaces[_sourceNss.coll()] = {_sourceNss, std::vector<BSONObj>{}}; + + // Assume that the config.cache.chunks collection isn't a view either. + auto tempNss = constructTemporaryReshardingNss(_sourceNss.db(), _sourceUUID); + auto tempCacheChunksNss = + NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns()); + resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, + std::vector<BSONObj>{}}; + + auto expCtx = + make_intrusive<ExpressionContext>(opCtx, + boost::none, /* explain */ + false, /* fromMongos */ + false, /* needsMerge */ + false, /* allowDiskUse */ + false, /* bypassDocumentValidation */ + false, /* isMapReduceCommand */ + _sourceNss, + boost::none, /* runtimeConstants */ + nullptr, /* collator */ + MongoProcessInterface::create(opCtx), + std::move(resolvedNamespaces), + _sourceUUID); + + auto pipeline = _makePipeline(expCtx, _recipientShard, _atClusterTime, _outputNss); + pipeline->detachFromOperationContext(); + return pipeline; + }); + }) + .then([this, serviceContext, executor](auto pipeline) { + return _insertBatchesUntilPipelineExhausted( + serviceContext, std::move(executor), std::move(pipeline)); + }); } } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index 092c2e0e3c5..8b9b467a718 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -27,41 +27,68 @@ * it in the license file. */ - #pragma once +#include <memory> + +#include "mongo/base/string_data.h" #include "mongo/bson/timestamp.h" #include "mongo/db/catalog/collection_catalog.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" +#include "mongo/db/repl/oplog.h" #include "mongo/s/shard_key_pattern.h" +#include "mongo/util/future.h" namespace mongo { -class OperationContext; +namespace executor { + +class TaskExecutor; + +} // namespace executor + +class ServiceContext; class ReshardingCollectionCloner { public: ReshardingCollectionCloner(ShardKeyPattern newShardKeyPattern, NamespaceString sourceNss, - CollectionUUID sourceUUID); + CollectionUUID sourceUUID, + ShardId recipientShard, + Timestamp atClusterTime, + NamespaceString outputNss); - void runPipeline(OperationContext* opCtx, - const ShardId& recipientShard, - Timestamp atClusterTime, - const NamespaceString& outputNss); + ExecutorFuture<void> run(ServiceContext* serviceContext, + std::shared_ptr<executor::TaskExecutor>); private: - std::unique_ptr<Pipeline, PipelineDeleter> buildCursor( + static constexpr StringData kClientName = "ReshardingCollectionCloner"_sd; + + std::unique_ptr<Pipeline, PipelineDeleter> _makePipeline( const boost::intrusive_ptr<ExpressionContext>& expCtx, const ShardId& recipientShard, Timestamp atClusterTime, const NamespaceString& outputNss); + std::vector<InsertStatement> _fillBatch(Pipeline& pipeline); + void _insertBatch(OperationContext* opCtx, std::vector<InsertStatement>& batch); + + template <typename Callable> + auto _withTemporaryOperationContext(ServiceContext* serviceContext, Callable&& callable); + + ExecutorFuture<void> _insertBatchesUntilPipelineExhausted( + ServiceContext* serviceContext, + std::shared_ptr<executor::TaskExecutor> executor, + std::unique_ptr<Pipeline, PipelineDeleter> pipeline); + const ShardKeyPattern _newShardKeyPattern; const NamespaceString _sourceNss; const CollectionUUID _sourceUUID; + const ShardId _recipientShard; + const Timestamp _atClusterTime; + const NamespaceString _outputNss; }; } // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_server_parameters.idl b/src/mongo/db/s/resharding/resharding_server_parameters.idl new file mode 100644 index 00000000000..0699a2272c5 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_server_parameters.idl @@ -0,0 +1,48 @@ +# Copyright (C) 2020-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# <http://www.mongodb.com/licensing/server-side-public-license>. +# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# Server parameters for resharding. + +global: + cpp_namespace: "mongo::resharding" + +imports: + - "mongo/idl/basic_types.idl" + +server_parameters: + reshardingCollectionClonerBatchSizeInBytes: + description: >- + Limit for the number of bytes of data inserted per storage transaction (WriteUnitOfWork) + by ReshardingCollectionCloner. + set_at: startup + cpp_vartype: int + cpp_varname: gReshardingCollectionClonerBatchSizeInBytes + default: + expr: 100 * 1024 + validator: + gte: 1 |