diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge_cursors_test.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_merge_cursors_test.cpp | 208 |
1 files changed, 60 insertions, 148 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp index 3614324d27d..4809ac54b50 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp @@ -111,7 +111,9 @@ private: }; TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) { - auto spec = BSON("$mergeCursors" << 2); + auto spec = BSON("$mergeCursors" << BSON( + "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" + << kTestHost.toString())))); ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()), AssertionException, 17026); @@ -211,66 +213,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOf ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); } -RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) { - RemoteCursor remoteCursor; - remoteCursor.setShardId(std::move(shardId)); - remoteCursor.setHostAndPort(std::move(host)); - remoteCursor.setCursorResponse(std::move(response)); - return remoteCursor; -} - -TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseSerializedARMParams) { - AsyncResultsMergerParams params; - params.setSort(BSON("y" << 1 << "z" << 1)); - params.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back(makeRemoteCursor( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, kExhaustedCursorID, {}))); - params.setRemotes(std::move(cursors)); - auto spec = BSON("$mergeCursors" << params.toBSON()); - auto mergeCursors = - DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()); - std::vector<Value> serializationArray; - mergeCursors->serializeToArray(serializationArray); - ASSERT_EQ(serializationArray.size(), 1UL); - - // Make sure the serialized version can be parsed into an identical AsyncResultsMergerParams. - auto newSpec = serializationArray[0].getDocument().toBson(); - ASSERT(newSpec["$mergeCursors"].type() == BSONType::Object); - auto newParams = AsyncResultsMergerParams::parse(IDLParserErrorContext("$mergeCursors test"), - newSpec["$mergeCursors"].Obj()); - ASSERT_TRUE(params.getSort()); - ASSERT_BSONOBJ_EQ(*params.getSort(), *newParams.getSort()); - ASSERT_EQ(params.getCompareWholeSortKey(), newParams.getCompareWholeSortKey()); - ASSERT(params.getTailableMode() == newParams.getTailableMode()); - ASSERT(params.getBatchSize() == newParams.getBatchSize()); - ASSERT_EQ(params.getNss(), newParams.getNss()); - ASSERT_EQ(params.getAllowPartialResults(), newParams.getAllowPartialResults()); - ASSERT_EQ(newParams.getRemotes().size(), 1UL); - ASSERT(newParams.getRemotes()[0].getShardId() == kTestShardIds[0].toString()); - ASSERT(newParams.getRemotes()[0].getHostAndPort() == kTestShardHosts[0]); - ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getNSS(), kTestNss); - ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getCursorId(), kExhaustedCursorID); - ASSERT(newParams.getRemotes()[0].getCursorResponse().getBatch().empty()); - - // Test that the $mergeCursors stage will accept the serialized format of - // AsyncResultsMergerParams. - ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx())); -} - TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back(makeRemoteCursor( - kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); - cursors.emplace_back(makeRemoteCursor( - kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back( + kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); + cursors.emplace_back( + kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); auto mergeCursorsStage = - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); ASSERT_TRUE(mergeCursorsStage->getNext().isEOF()); } @@ -284,17 +236,12 @@ BSONObj cursorResponseObj(const NamespaceString& nss, TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block // waiting for network responses, which we will manually schedule below. @@ -333,17 +280,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) { TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); pipeline.reset(); // Delete the pipeline before using it. @@ -353,15 +295,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) { TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) { auto expCtx = getExpCtx(); - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - armParams.setRemotes(std::move(cursors)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx)); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. @@ -406,16 +344,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); // Make a $mergeCursors stage and add it to the front of the pipeline. - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); // After optimization we should only have a $mergeCursors stage. pipeline->optimizePipeline(); @@ -430,7 +364,6 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); - std::cout << "Finished"; }); onCommand([&](const auto& request) { @@ -449,35 +382,36 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd future.timed_get(kFutureTimeout); } -TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { +TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { auto expCtx = getExpCtx(); - auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx)); - // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline. - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - armParams.setSort(BSON("x" << 1)); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + // Make a pipeline with a single $sort stage that is merging pre-sorted results. + const bool mergingPresorted = true; + const long long limit = 3; + auto sortStage = DocumentSourceSort::create( + expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted); + auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); - // After optimization we should only have a $mergeCursors stage. + // Make a $mergeCursors stage and add it to the front of the pipeline. + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); + + // After optimization, we should still have a $limit stage. pipeline->optimizePipeline(); - ASSERT_EQ(pipeline->getSources().size(), 1UL); + ASSERT_EQ(pipeline->getSources().size(), 2UL); ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); + ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); // Iterate the pipeline asynchronously on a different thread, since it will block waiting for // network responses, which we will manually schedule below. - auto future = launchAsync([&pipeline]() { - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}})); - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}})); - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}})); - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}})); + auto future = launchAsync([&]() { + for (int i = 1; i <= limit; ++i) { + ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); + } ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); }); @@ -497,7 +431,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) { future.timed_get(kFutureTimeout); } -TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) { +TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) { auto expCtx = getExpCtx(); // Make a pipeline with a single $sort stage that is merging pre-sorted results. @@ -508,16 +442,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx)); // Make a $mergeCursors stage and add it to the front of the pipeline. - AsyncResultsMergerParams armParams; - armParams.setNss(kTestNss); - std::vector<RemoteCursor> cursors; - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}))); - cursors.emplace_back( - makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}))); - armParams.setRemotes(std::move(cursors)); - pipeline->addInitialSource( - DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx)); + std::vector<ClusterClientCursorParams::RemoteCursor> cursors; + cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})); + cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})); + auto mergeCursorsStage = + DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx); + pipeline->addInitialSource(std::move(mergeCursorsStage)); // After optimization, we should still have a $limit stage. pipeline->optimizePipeline(); @@ -525,29 +455,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get())); ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get())); - // Iterate the pipeline asynchronously on a different thread, since it will block waiting for - // network responses, which we will manually schedule below. - auto future = launchAsync([&]() { - for (int i = 1; i <= limit; ++i) { - ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}})); - } - ASSERT_FALSE(static_cast<bool>(pipeline->getNext())); - }); - - onCommand([&](const auto& request) { - return cursorResponseObj(expCtx->ns, - kExhaustedCursorID, - {BSON("x" << 1 << "$sortKey" << BSON("" << 1)), - BSON("x" << 3 << "$sortKey" << BSON("" << 3))}); - }); - onCommand([&](const auto& request) { - return cursorResponseObj(expCtx->ns, - kExhaustedCursorID, - {BSON("x" << 2 << "$sortKey" << BSON("" << 2)), - BSON("x" << 4 << "$sortKey" << BSON("" << 4))}); - }); - - future.timed_get(kFutureTimeout); + auto serialized = pipeline->serialize(); + ASSERT_EQ(serialized.size(), 3UL); + ASSERT_FALSE(serialized[0]["$mergeCursors"].missing()); + ASSERT_FALSE(serialized[1]["$sort"].missing()); + ASSERT_FALSE(serialized[2]["$limit"].missing()); } } // namespace |