author     Max Hirschhorn <max.hirschhorn@mongodb.com>        2021-01-08 14:49:27 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2021-01-08 15:32:55 +0000
commit     0416c69269754a28d98c6288b09738d98c8bdfa2 (patch)
tree       d1be9212a0eb6fca35231407d6299a2538096ef2 /src
parent     6221e2ee0ff5527f13d25a28a5de6a245bd0e641 (diff)
download   mongo-0416c69269754a28d98c6288b09738d98c8bdfa2.tar.gz
SERVER-49293 Make resharding collection cloning resume from highest _id.
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.cpp                    |   9
-rw-r--r--  src/mongo/db/pipeline/sharded_agg_helpers.h                      |   3
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.cpp       | 130
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner.h         |   7
-rw-r--r--  src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp  |  33
5 files changed, 159 insertions, 23 deletions
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.cpp b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
index 039288dc823..8a0fbc10998 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.cpp
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.cpp
@@ -600,7 +600,8 @@ void abandonCacheIfSentToShards(Pipeline* shardsPipeline) {
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest) {
+ stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest,
+ boost::optional<BSONObj> shardCursorsSortSpec) {
auto&& [aggRequest, pipeline] = [&] {
return stdx::visit(
visit_helper::Overloaded{
@@ -642,10 +643,12 @@ std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
}
std::unique_ptr<Pipeline, PipelineDeleter> mergePipeline;
- boost::optional<BSONObj> shardCursorsSortSpec = boost::none;
if (shardDispatchResults.splitPipeline) {
mergePipeline = std::move(shardDispatchResults.splitPipeline->mergePipeline);
- shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ if (shardDispatchResults.splitPipeline->shardCursorsSortSpec) {
+ uassert(4929304, "Split pipeline provides its own sort already", !shardCursorsSortSpec);
+ shardCursorsSortSpec = shardDispatchResults.splitPipeline->shardCursorsSortSpec;
+ }
} else {
// We have not split the pipeline, and will execute entirely on the remote shards. Set up an
// empty local pipeline which we will attach the merge cursors stage to.
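[Annotation] The new optional shardCursorsSortSpec argument lets a caller supply the merge order for the shard cursors when the pipeline itself was not split. A minimal sketch of such a call; the request and expCtx variables are assumptions, not code from this commit:

    auto merged = sharded_agg_helpers::targetShardsAndAddMergeCursors(
        expCtx,
        std::move(request),
        BSON("_id" << 1) /* shardCursorsSortSpec: merge shard results by _id */);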
diff --git a/src/mongo/db/pipeline/sharded_agg_helpers.h b/src/mongo/db/pipeline/sharded_agg_helpers.h
index b2be0661aab..2f8831f8bfa 100644
--- a/src/mongo/db/pipeline/sharded_agg_helpers.h
+++ b/src/mongo/db/pipeline/sharded_agg_helpers.h
@@ -204,7 +204,8 @@ std::unique_ptr<Pipeline, PipelineDeleter> attachCursorToPipeline(Pipeline* owne
*/
std::unique_ptr<Pipeline, PipelineDeleter> targetShardsAndAddMergeCursors(
const boost::intrusive_ptr<ExpressionContext>& expCtx,
- stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest);
+ stdx::variant<std::unique_ptr<Pipeline, PipelineDeleter>, AggregateCommand> targetRequest,
+ boost::optional<BSONObj> shardCursorsSortSpec = boost::none);
/**
* For a sharded or unsharded collection, establishes a remote cursor on only the specified shard,
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
index 5819b96c22f..67e4f844de7 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/pipeline/aggregation_request_helper.h"
@@ -48,6 +49,7 @@
#include "mongo/db/pipeline/document_source_match.h"
#include "mongo/db/pipeline/document_source_replace_root.h"
#include "mongo/db/pipeline/sharded_agg_helpers.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/service_context.h"
@@ -60,6 +62,20 @@
#include "mongo/util/str.h"
namespace mongo {
+namespace {
+
+bool collectionHasSimpleCollation(OperationContext* opCtx, const NamespaceString& nss) {
+ auto catalogCache = Grid::get(opCtx)->catalogCache();
+ auto sourceChunkMgr = uassertStatusOK(catalogCache->getCollectionRoutingInfo(opCtx, nss));
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Expected collection " << nss << " to be sharded",
+ sourceChunkMgr.isSharded());
+
+ return !sourceChunkMgr.getDefaultCollator();
+}
+
+} // namespace
ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardKeyPattern,
NamespaceString sourceNss,
@@ -75,7 +91,9 @@ ReshardingCollectionCloner::ReshardingCollectionCloner(ShardKeyPattern newShardK
_outputNss(std::move(outputNss)) {}
std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipeline(
- OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface) {
+ OperationContext* opCtx,
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
+ Value resumeId) {
using Doc = Document;
using Arr = std::vector<Value>;
using V = Value;
@@ -91,6 +109,19 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
NamespaceString(NamespaceString::kConfigDb, "cache.chunks." + tempNss.ns());
resolvedNamespaces[tempCacheChunksNss.coll()] = {tempCacheChunksNss, std::vector<BSONObj>{}};
+ // sharded_agg_helpers::targetShardsAndAddMergeCursors() ignores the collation set on the
+ // AggregationRequest (or lack thereof) and instead only considers the collator set on the
+ // ExpressionContext. Setting nullptr as the collator on the ExpressionContext means that the
+ // aggregation pipeline is always using the "simple" collation, even when the collection default
+ // collation for _sourceNss is non-simple. The chunk ranges in the $lookup stage must be
+ // compared using the simple collation because collections are always sharded using the simple
+ // collation. However, resuming by _id is only efficient (i.e. non-blocking seek/sort) when the
+ // aggregation pipeline would be using the collection's default collation. We cannot do both so
+ // we choose to disallow automatic resuming for collections with non-simple default collations.
+ uassert(4929303,
+ "Cannot resume cloning when sharded collection has non-simple default collation",
+ resumeId.missing() || collectionHasSimpleCollation(opCtx, _sourceNss));
+
auto expCtx = make_intrusive<ExpressionContext>(opCtx,
boost::none, /* explain */
false, /* fromMongos */
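[Annotation] To illustrate the constraint enforced above (hypothetical values, not from the commit): a source collection created with a case-insensitive default collation makes collectionHasSimpleCollation() return false, so any attempt to resume (a non-missing resumeId) fails with error 4929303; only collections with the simple default collation can resume.

    // Hypothetical non-simple default collation; getDefaultCollator() is then
    // non-null for the source collection and the uassert above fires on resume.
    BSONObj nonSimpleCollation = BSON("locale" << "en_US" << "strength" << 2);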
@@ -107,6 +138,14 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
Pipeline::SourceContainer stages;
+ if (!resumeId.missing()) {
+ stages.emplace_back(DocumentSourceMatch::create(
+ Doc{{"$expr",
+ Doc{{"$gte", Arr{V{"$_id"_sd}, V{Doc{{"$literal", std::move(resumeId)}}}}}}}}
+ .toBson(),
+ expCtx));
+ }
+
stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
fromjson("{$replaceWith: {original: '$$ROOT'}}").firstElement(), expCtx));
@@ -154,16 +193,62 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::makePipel
stages.emplace_back(
DocumentSourceMatch::create(fromjson("{intersectingChunk: {$ne: []}}"), expCtx));
- stages.emplace_back(DocumentSourceReplaceRoot::createFromBson(
- fromjson("{$replaceWith: '$original'}").firstElement(), expCtx));
- return Pipeline::create(std::move(stages), expCtx);
+
+ // We use $arrayToObject to synthesize the $sortKeys needed by the AsyncResultsMerger to merge
+ // the results from all of the donor shards by {_id: 1}. This expression wouldn't be correct if
+ // the aggregation pipeline was using a non-"simple" collation.
+ stages.emplace_back(
+ DocumentSourceReplaceRoot::createFromBson(fromjson("{$replaceWith: {$mergeObjects: [\
+ '$original',\
+ {$arrayToObject: {$concatArrays: [[{\
+ k: {$literal: '$sortKey'},\
+ v: ['$original._id']\
+ }]]}}\
+ ]}}")
+ .firstElement(),
+ expCtx));
+
+ return Pipeline::create(std::move(stages), std::move(expCtx));
+}
+
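[Annotation] The effect of the $replaceWith/$mergeObjects stage above is visible in the test expectations at the bottom of this diff: each cloned document keeps its original fields and gains a single-element $sortKey array holding its _id, for example:

    // Donor-shard output for the source document {_id: 1, x: MinKey}
    // (from the MinKeyChunk test case below).
    BSONObj cloned = BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1));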
+Value ReshardingCollectionCloner::_findHighestInsertedId(OperationContext* opCtx) {
+ AutoGetCollection outputColl(opCtx, _outputNss, MODE_IS);
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << "Resharding collection cloner's output collection '" << _outputNss
+ << "' did not already exist",
+ outputColl);
+
+ auto qr = std::make_unique<QueryRequest>(_outputNss);
+ qr->setLimit(1);
+ qr->setSort(BSON("_id" << -1));
+
+ auto recordId = Helpers::findOne(opCtx, *outputColl, std::move(qr), true /* requireIndex */);
+ if (!recordId.isNormal()) {
+ return Value{};
+ }
+
+ auto doc = outputColl->docFor(opCtx, recordId).value();
+ auto value = Value{doc["_id"]};
+ uassert(4929300,
+ "Missing _id field for document in temporary resharding collection",
+ !value.missing());
+
+ return value;
}
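[Annotation] The QueryRequest assembled in the helper above amounts to a single descending lookup on the temporary output collection; a sketch of the equivalent find command (the collection name is a placeholder):

    // Answered from the _id index because requireIndex is true, so locating the
    // resume point is an index seek rather than a collection scan.
    BSONObj equivalentFind =
        fromjson("{find: 'tempReshardingColl', sort: {_id: -1}, limit: 1}");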
std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAggregationRequest(
OperationContext* opCtx, const Pipeline& pipeline) {
AggregateCommand request(_sourceNss, pipeline.serializeToBson());
request.setCollectionUUID(_sourceUUID);
- request.setHint(BSON("_id" << 1));
+
+ auto hint = collectionHasSimpleCollation(opCtx, _sourceNss)
+ ? boost::optional<BSONObj>{BSON("_id" << 1)}
+ : boost::none;
+
+ if (hint) {
+ request.setHint(*hint);
+ }
+
request.setReadConcern(BSON(repl::ReadConcernArgs::kLevelFieldName
<< repl::readConcernLevels::kSnapshotName
<< repl::ReadConcernArgs::kAtClusterTimeFieldName
@@ -175,8 +260,11 @@ std::unique_ptr<Pipeline, PipelineDeleter> ReshardingCollectionCloner::_targetAg
_sourceNss,
"targeting donor shards for resharding collection cloning"_sd,
[&] {
+ // We use the hint as an implied sort for $mergeCursors because
+ // the aggregation pipeline synthesizes the necessary $sortKeys
+ // fields in the result set.
return sharded_agg_helpers::targetShardsAndAddMergeCursors(
- pipeline.getContext(), request);
+ pipeline.getContext(), request, hint);
});
}
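[Annotation] To make the comment above concrete (values taken from the tests below): passing the {_id: 1} hint as shardCursorsSortSpec asks the merging half to order shard results by _id, and the $replaceWith stage earlier in the pipeline guarantees every shard result already carries the matching sort key:

    BSONObj shardCursorsSortSpec = BSON("_id" << 1);
    // Each document returned by a donor shard then looks like, e.g.:
    BSONObj fromDonor = BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4));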
@@ -276,8 +364,36 @@ ExecutorFuture<void> ReshardingCollectionCloner::run(
return AsyncTry([this, chainCtx] {
if (!chainCtx->pipeline) {
chainCtx->pipeline = _withTemporaryOperationContext([&](auto* opCtx) {
+ auto idToResumeFrom = _findHighestInsertedId(opCtx);
auto pipeline = _targetAggregationRequest(
- opCtx, *makePipeline(opCtx, MongoProcessInterface::create(opCtx)));
+ opCtx,
+ *makePipeline(
+ opCtx, MongoProcessInterface::create(opCtx), idToResumeFrom));
+
+ if (!idToResumeFrom.missing()) {
+ // Skip inserting the first document retrieved after resuming because
+ // $gte was used in the aggregation pipeline.
+ auto firstDoc = pipeline->getNext();
+ uassert(4929301,
+ str::stream()
+ << "Expected pipeline to retrieve document with _id: "
+ << redact(idToResumeFrom.toString()),
+ firstDoc);
+
+ // Note that the following uassert() could throw because we're using the
+ // simple string comparator and the collection could have a non-simple
+ // collation. However, it would still be correct to throw an exception
+ // because it would mean the collection being resharded contains multiple
+ // documents with the same _id value as far as global uniqueness is
+ // concerned.
+ const auto& firstId = (*firstDoc)["_id"];
+ uassert(4929302,
+ str::stream()
+ << "Expected pipeline to retrieve document with _id: "
+ << redact(idToResumeFrom.toString())
+ << ", but got _id: " << redact(firstId.toString()),
+ ValueComparator::kInstance.evaluate(firstId == idToResumeFrom));
+ }
pipeline->detachFromOperationContext();
pipeline.get_deleter().dismissDisposal();
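[Annotation] The resume path added to run() above can be read as the following sequence; a sketch with an illustrative resume _id, not additional code from the commit:

    auto idToResumeFrom = Value(42);   // highest _id already inserted
    // The $gte filter re-fetches the document with _id == 42 first. It was already
    // copied on the previous attempt, so it is read, checked against the expected
    // _id (uasserts 4929301/4929302), and discarded; inserts resume with _id > 42.
    auto firstDoc = pipeline->getNext();
    invariant(ValueComparator::kInstance.evaluate((*firstDoc)["_id"] == idToResumeFrom));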
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner.h b/src/mongo/db/s/resharding/resharding_collection_cloner.h
index 74710fe63bb..d40209ccf03 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner.h
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner.h
@@ -34,6 +34,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/pipeline/pipeline.h"
@@ -62,12 +63,16 @@ public:
NamespaceString outputNss);
std::unique_ptr<Pipeline, PipelineDeleter> makePipeline(
- OperationContext* opCtx, std::shared_ptr<MongoProcessInterface> mongoProcessInterface);
+ OperationContext* opCtx,
+ std::shared_ptr<MongoProcessInterface> mongoProcessInterface,
+ Value resumeId = Value());
ExecutorFuture<void> run(std::shared_ptr<executor::TaskExecutor> executor,
CancelationToken cancelToken);
private:
+ Value _findHighestInsertedId(OperationContext* opCtx);
+
std::unique_ptr<Pipeline, PipelineDeleter> _targetAggregationRequest(OperationContext* opCtx,
const Pipeline& pipeline);
diff --git a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
index eefefd0fa11..5116cb46583 100644
--- a/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_collection_cloner_test.cpp
@@ -124,11 +124,13 @@ TEST_F(ReshardingCollectionClonerTest, MinKeyChunk) {
auto next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 1 << "x" << MINKEY << "$sortKey" << BSON_ARRAY(1)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 2 << "x" << -0.001 << "$sortKey" << BSON_ARRAY(2)),
+ next->toBson());
ASSERT_FALSE(pipeline->getNext());
}
@@ -148,19 +150,23 @@ TEST_F(ReshardingCollectionClonerTest, MaxKeyChunk) {
auto next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << 0LL << "$sortKey" << BSON_ARRAY(3)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0.0 << "$sortKey" << BSON_ARRAY(4)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0.001 << "$sortKey" << BSON_ARRAY(5)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << MAXKEY << "$sortKey" << BSON_ARRAY(6)),
+ next->toBson());
ASSERT_FALSE(pipeline->getNext());
}
@@ -193,19 +199,23 @@ TEST_F(ReshardingCollectionClonerTest, HashedShardKey) {
auto next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 3 << "x" << -0.123 << "$sortKey" << BSON_ARRAY(3)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "$sortKey" << BSON_ARRAY(4)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 5 << "x" << 0LL << "$sortKey" << BSON_ARRAY(5)),
+ next->toBson());
next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 6 << "x" << 0.123 << "$sortKey" << BSON_ARRAY(6)),
+ next->toBson());
ASSERT_FALSE(pipeline->getNext());
}
@@ -238,7 +248,8 @@ TEST_F(ReshardingCollectionClonerTest, CompoundHashedShardKey) {
auto next = pipeline->getNext();
ASSERT(next);
- ASSERT_BSONOBJ_BINARY_EQ(BSON("_id" << 4 << "x" << 0 << "y" << 0), next->toBson());
+ ASSERT_BSONOBJ_BINARY_EQ(
+ BSON("_id" << 4 << "x" << 0 << "y" << 0 << "$sortKey" << BSON_ARRAY(4)), next->toBson());
ASSERT_FALSE(pipeline->getNext());
}