diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2021-01-08 14:49:27 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-01-08 15:32:55 +0000 |
commit | 0416c69269754a28d98c6288b09738d98c8bdfa2 (patch) | |
tree | d1be9212a0eb6fca35231407d6299a2538096ef2 /src | |
parent | 6221e2ee0ff5527f13d25a28a5de6a245bd0e641 (diff) | |
download | mongo-0416c69269754a28d98c6288b09738d98c8bdfa2.tar.gz |
SERVER-49293 Make resharding collection cloning resume from highest _id.
Diffstat (limited to 'src')
5 files changed, 159 insertions, 23 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp index 039288dc823..8a0fbc10998 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp +++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp @@ -600,7 +600,8 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) { std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx, - stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest) { + stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest, + boost::optional<BSONObj> shardCursorsSortSpec) { auto&& [aggRequest, pipeline] = [&] { return stdx::visit( visit_helper::Overloaded{ @@ -642,10 +643,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( } std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline; - boost::optional<BSONObj> shardCursorsSortSpec = boost::none; if (shardDispatchResults.splitPipeline) { mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline); - shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; + if (shardDispatchResults.splitPipeline->shardCursorsSortSpec) { + uassert(4929304, "Split pipeline provides its own sort already", !shardCursorsSortSpec); + shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec; + } } else { // We have not split the pipeline, and will execute entirely on the remote shards. Set up an // empty local pipeline which we will attach the merge cursors stage to. diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h index b2be0661aab..2f8831f8bfa 100644 --- a/src/mongo/db/pipeline/sharded_agg_helpers.h +++ b/src/mongo/db/pipeline/sharded_agg_helpers.h @@ -204,7 +204,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne */ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors( const boost::intrusive_ptr<ExpressionContext>& expCtx, - stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest); + stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest, + boost::optional<BSONObj> shardCursorsSortSpec = boost::none); /** * For a sharded or unsharded collection, establishes a remote cursor on only the specified shard, diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp index 5819b96c22f..67e4f844de7 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp @@ -41,6 +41,7 @@ #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/exec/document_value/document.h" #include "mongo/db/exec/document_value/value.h" #include "mongo/db/pipeline/aggregation_request_helper.h" @@ -48,6 +49,7 @@ #include "mongo/db/pipeline/document_source_match.h" #include "mongo/db/pipeline/document_source_replace_root.h" #include "mongo/db/pipeline/sharded_agg_helpers.h" +#include "mongo/db/query/query_request.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/s/resharding_util.h" #include "mongo/db/service_context.h" @@ -60,6 +62,20 @@ #include "mongo/util/str.h" namespace mongo { +namespace { + +bool collectionHasSimpleCollation(OperationContext* opCtx, const NamespaceString& nss) { + auto catalogCache = Grid::get(opCtx)->catalogCache(); + auto sourceChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss)); + + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Expected collection " << nss << " to be sharded", + sourceChunkMgr.isSharded()); + + return !sourceChunkMgr.getDefaultCollator(); +} + +} // namespace ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardKeyPattern, NamespaceString sourceNss, @@ -75,7 +91,9 @@ ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardK _outputNss(std::move(outputNss)) {} std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipeline( - OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface) { + OperationContext* opCtx, + std::shared_ptr<MongoProcessInterface> mongoProcessInterface, + Value resumeId) { using Doc = Document; using Arr = std::vector<Value>; using V = Value; @@ -91,6 +109,19 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns()); resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}}; + // sharded_agg_helpers::targetShardsAndAddMergeCursors() ignores the collation set on the + // AggregationRequest (or lack thereof) and instead only considers the collator set on the + // ExpressionContext. Setting nullptr as the collator on the ExpressionContext means that the + // aggregation pipeline is always using the "simple" collation, even when the collection default + // collation for _sourceNss is non-simple. The chunk ranges in the $lookup stage must be + // compared using the simple collation because collections are always sharded using the simple + // collation. However, resuming by _id is only efficient (i.e. non-blocking seek/sort) when the + // aggregation pipeline would be using the collection's default collation. We cannot do both so + // we choose to disallow automatic resuming for collections with non-simple default collations. + uassert(4929303, + "Cannot resume cloning when sharded collection has non-simple default collation", + resumeId.missing() || collectionHasSimpleCollation(opCtx, _sourceNss)); + auto expCtx = make_intrusive<ExpressionContext>(opCtx, boost::none, /* explain */ false, /* fromMongos */ @@ -107,6 +138,14 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel Pipeline::SourceContainer stages; + if (!resumeId.missing()) { + stages.emplace_back(DocumentSourceMatch::create( + Doc{{"$expr", + Doc{{"$gte", Arr{V{"$_id"_sd}, V{Doc{{"$literal", std::move(resumeId)}}}}}}}} + .toBson(), + expCtx)); + } + stages.emplace_back(DocumentSourceReplaceRoot::createFromBson( fromjson("{$replaceWith: {original: '$$ROOT'}}").firstElement(), expCtx)); @@ -154,16 +193,62 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel stages.emplace_back( DocumentSourceMatch::create(fromjson("{intersectingChunk: {$ne: []}}"), expCtx)); - stages.emplace_back(DocumentSourceReplaceRoot::createFromBson( - fromjson("{$replaceWith: '$original'}").firstElement(), expCtx)); - return Pipeline::create(std::move(stages), expCtx); + + // We use $arrayToObject to synthesize the $sortKeys needed by the AsyncResultsMerger to merge + // the results from all of the donor shards by {_id: 1}. This expression wouldn't be correct if + // the aggregation pipeline was using a non-"simple" collation. + stages.emplace_back( + DocumentSourceReplaceRoot::createFromBson(fromjson("{$replaceWith: {$mergeObjects: [\ + '$original',\ + {$arrayToObject: {$concatArrays: [[{\ + k: {$literal: '$sortKey'},\ + v: ['$original._id']\ + }]]}}\ + ]}}") + .firstElement(), + expCtx)); + + return Pipeline::create(std::move(stages), std::move(expCtx)); +} + +Value ReshardingCollectionCloner::_findHighestInsertedId(OperationContext* opCtx) { + AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS); + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << "Resharding collection cloner's output collection '" << _outputNss + << "' did not already exist", + outputColl); + + auto qr = std::make_unique<QueryRequest>(_outputNss); + qr->setLimit(1); + qr->setSort(BSON("_id" << -1)); + + auto recordId = Helpers::findOne(opCtx, *outputColl, std::move(qr), true /* requireIndex */); + if (!recordId.isNormal()) { + return Value{}; + } + + auto doc = outputColl->docFor(opCtx, recordId).value(); + auto value = Value{doc["_id"]}; + uassert(4929300, + "Missing _id field for document in temporary resharding collection", + !value.missing()); + + return value; } std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest( OperationContext* opCtx, const Pipeline& pipeline) { AggregateCommand request(_sourceNss, pipeline.serializeToBson()); request.setCollectionUUID(_sourceUUID); - request.setHint(BSON("_id" << 1)); + + auto hint = collectionHasSimpleCollation(opCtx, _sourceNss) + ? boost::optional<BSONObj>{BSON("_id" << 1)} + : boost::none; + + if (hint) { + request.setHint(*hint); + } + request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName << repl::readConcernLevels::kSnapshotName << repl::ReadConcernArgs::kAtClusterTimeFieldName @@ -175,8 +260,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAg _sourceNss, "targeting donor shards for resharding collection cloning"_sd, [&] { + // We use the hint as an implied sort for $mergeCursors because + // the aggregation pipeline synthesizes the necessary $sortKeys + // fields in the result set. return sharded_agg_helpers::targetShardsAndAddMergeCursors( - pipeline.getContext(), request); + pipeline.getContext(), request, hint); }); } @@ -276,8 +364,36 @@ ExecutorFuture<void> ReshardingCollectionCloner::run( return AsyncTry([this, chainCtx] { if (!chainCtx->pipeline) { chainCtx->pipeline = _withTemporaryOperationContext([&](auto* opCtx) { + auto idToResumeFrom = _findHighestInsertedId(opCtx); auto pipeline = _targetAggregationRequest( - opCtx, *makePipeline(opCtx, MongoProcessInterface::create(opCtx))); + opCtx, + *makePipeline( + opCtx, MongoProcessInterface::create(opCtx), idToResumeFrom)); + + if (!idToResumeFrom.missing()) { + // Skip inserting the first document retrieved after resuming because + // $gte was used in the aggregation pipeline. + auto firstDoc = pipeline->getNext(); + uassert(4929301, + str::stream() + << "Expected pipeline to retrieve document with _id: " + << redact(idToResumeFrom.toString()), + firstDoc); + + // Note that the following uassert() could throw because we're using the + // simple string comparator and the collection could have a non-simple + // collation. However, it would still be correct to throw an exception + // because it would mean the collection being resharded contains multiple + // documents with the same _id value as far as global uniqueness is + // concerned. + const auto& firstId = (*firstDoc)["_id"]; + uassert(4929302, + str::stream() + << "Expected pipeline to retrieve document with _id: " + << redact(idToResumeFrom.toString()) + << ", but got _id: " << redact(firstId.toString()), + ValueComparator::kInstance.evaluate(firstId == idToResumeFrom)); + } pipeline->detachFromOperationContext(); pipeline.get_deleter().dismissDisposal(); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h index 74710fe63bb..d40209ccf03 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner.h +++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h @@ -34,6 +34,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/db/catalog/collection_catalog.h" +#include "mongo/db/exec/document_value/value.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/expression_context.h" #include "mongo/db/pipeline/pipeline.h" @@ -62,12 +63,16 @@ public: NamespaceString outputNss); std::unique_ptr<Pipeline, PipelineDeleter> makePipeline( - OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface); + OperationContext* opCtx, + std::shared_ptr<MongoProcessInterface> mongoProcessInterface, + Value resumeId = Value()); ExecutorFuture<void> run(std::shared_ptr<executor::TaskExecutor> executor, CancelationToken cancelToken); private: + Value _findHighestInsertedId(OperationContext* opCtx); + std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(OperationContext* opCtx, const Pipeline& pipeline); diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp index eefefd0fa11..5116cb46583 100644 --- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp @@ -124,11 +124,13 @@ TEST_F(ReshardingCollectionClonerTest, MinKeyChunk) { auto next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)), + next->toBson()); ASSERT_FALSE(pipeline->getNext()); } @@ -148,19 +150,23 @@ TEST_F(ReshardingCollectionClonerTest, MaxKeyChunk) { auto next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)), + next->toBson()); ASSERT_FALSE(pipeline->getNext()); } @@ -193,19 +199,23 @@ TEST_F(ReshardingCollectionClonerTest, HashedShardKey) { auto next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)), + next->toBson()); next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)), + next->toBson()); ASSERT_FALSE(pipeline->getNext()); } @@ -238,7 +248,8 @@ TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) { auto next = pipeline->getNext(); ASSERT(next); - ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "y" << 0), next->toBson()); + ASSERT_BSONOBJ_BINARY_EQ( + BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)), next->toBson()); ASSERT_FALSE(pipeline->getNext()); } |