author     Charlie Swanson <charlie.swanson@mongodb.com>  2018-02-14 15:48:53 -0500
committer  Charlie Swanson <charlie.swanson@mongodb.com>  2018-03-06 12:18:27 -0500
commit     a7106b407cecdcfa8ba6c8765c9874bce65a6d5a (patch)
tree       ae17d3d93617f04af362627bd656ecd82e67840a /src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
parent     29be917dd176b883880198347d872fa10d1ad701 (diff)
download   mongo-a7106b407cecdcfa8ba6c8765c9874bce65a6d5a.tar.gz
SERVER-24978 Use AsyncResultsMerger in $mergeCursors
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge_cursors_test.cpp')
 -rw-r--r--  src/mongo/db/pipeline/document_source_merge_cursors_test.cpp | 488
 1 file changed, 488 insertions(+), 0 deletions(-)
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
new file mode 100644
index 00000000000..8007aea0db0
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -0,0 +1,488 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/document_source_merge_cursors.h"
+
+#include "mongo/client/remote_command_targeter_factory_mock.h"
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/json.h"
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_limit.h"
+#include "mongo/db/pipeline/document_source_sort.h"
+#include "mongo/db/pipeline/document_value_test_util.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/getmore_request.h"
+#include "mongo/db/query/query_request.h"
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_test_fixture.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using executor::NetworkInterfaceMock;
+using executor::RemoteCommandRequest;
+using executor::RemoteCommandResponse;
+
+using ResponseStatus = executor::TaskExecutor::ResponseStatus;
+
+const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
+const std::vector<ShardId> kTestShardIds = {
+ ShardId("FakeShard1"), ShardId("FakeShard2"), ShardId("FakeShard3")};
+const std::vector<HostAndPort> kTestShardHosts = {HostAndPort("FakeShard1Host", 12345),
+ HostAndPort("FakeShard2Host", 12345),
+ HostAndPort("FakeShard3Host", 12345)};
+
+const NamespaceString kTestNss = NamespaceString("test.mergeCursors"_sd);
+const HostAndPort kTestHost = HostAndPort("localhost:27017"_sd);
+
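+// A cursor id of 0 means the remote cursor is exhausted and no more results are available.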
+const CursorId kExhaustedCursorID = 0;
+
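+// Test fixture providing a mock sharding environment: fake shards registered in the shard
+// registry, mock targeters for host resolution, and the mock network and task executor
+// inherited from ShardingTestFixture.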
+class DocumentSourceMergeCursorsTest : public ShardingTestFixture {
+public:
+ DocumentSourceMergeCursorsTest() : _expCtx(new ExpressionContextForTest(kTestNss)) {}
+
+ void setUp() override {
+ ShardingTestFixture::setUp();
+ setRemote(HostAndPort("ClientHost", 12345));
+
+ configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+
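+        // Register each fake shard and give it a mock targeter so host resolution succeeds
+        // without any real networking.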
+ std::vector<ShardType> shards;
+ for (size_t i = 0; i < kTestShardIds.size(); i++) {
+ ShardType shardType;
+ shardType.setName(kTestShardIds[i].toString());
+ shardType.setHost(kTestShardHosts[i].toString());
+
+ shards.push_back(shardType);
+
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ stdx::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHosts[i]));
+ targeter->setFindHostReturnValue(kTestShardHosts[i]);
+
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHosts[i]),
+ std::move(targeter));
+ }
+
+ setupShards(shards);
+ }
+
+ boost::intrusive_ptr<ExpressionContextForTest> getExpCtx() {
+        return _expCtx;
+ }
+
+private:
+ boost::intrusive_ptr<ExpressionContextForTest> _expCtx;
+};
+
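+// The tests below exercise the parsing rules for $mergeCursors: its argument must be a
+// non-empty array of cursor descriptions of the form {ns: <string>, id: <long>, host: <string>},
+// with no extra fields and with all cursors on the same namespace.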
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) {
+ auto spec = BSON("$mergeCursors" << BSON(
+ "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host"
+ << kTestHost.toString()))));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 17026);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectEmptyArray) {
+ auto spec = BSON("$mergeCursors" << BSONArray());
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50729);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNoNamespace) {
+ auto spec =
+ BSON("$mergeCursors" << BSON_ARRAY(BSON("id" << 0LL << "host" << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50731);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonStringNamespace) {
+ auto spec = BSON("$mergeCursors" << BSON_ARRAY(
+ BSON("ns" << 4 << "id" << 0LL << "host" << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50731);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorsWithDifferentNamespaces) {
+ auto spec = BSON(
+ "$mergeCursors"
+ << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" << kTestHost.toString())
+ << BSON("ns"
+ << "test.other"_sd
+ << "id"
+ << 0LL
+ << "host"
+ << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50720);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNoHost) {
+ auto spec = BSON("$mergeCursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL)));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50721);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonStringHost) {
+ auto spec = BSON("$mergeCursors"
+ << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host" << 4LL)));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50721);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithNonLongId) {
+ auto spec = BSON("$mergeCursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id"
+ << "zero"
+ << "host"
+ << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50722);
+ spec = BSON("$mergeCursors" << BSON_ARRAY(
+ BSON("ns" << kTestNss.ns() << "id" << 0 << "host" << kTestHost.toString())));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50722);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectCursorWithExtraField) {
+ auto spec =
+ BSON("$mergeCursors" << BSON_ARRAY(BSON(
+ "ns" << kTestNss.ns() << "id" << 0LL << "host" << kTestHost.toString() << "extra"
+ << "unexpected")));
+ ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
+ AssertionException,
+ 50730);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOfItself) {
+ auto spec =
+ BSON("$mergeCursors" << BSON_ARRAY(
+ BSON("ns" << kTestNss.ns() << "id" << 1LL << "host" << kTestHost.toString())
+ << BSON("ns" << kTestNss.ns() << "id" << 2LL << "host" << kTestHost.toString())));
+ auto mergeCursors =
+ DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx());
+ std::vector<Value> serializationArray;
+ mergeCursors->serializeToArray(serializationArray);
+ ASSERT_EQ(serializationArray.size(), 1UL);
+    // The serialized version might not be identical to 'spec' (e.g. the fields might be in a
+    // different order), so we just make sure the final parse doesn't throw.
+ auto newSpec = serializationArray[0].getDocument().toBson();
+ ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
+ auto expCtx = getExpCtx();
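+    // Create two remote cursors that are already exhausted; the merge stage should report EOF
+    // immediately.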
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+
+ ASSERT_TRUE(mergeCursorsStage->getNext().isEOF());
+}
+
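+// Builds the BSON a remote would return for a getMore, since the mock network expects a raw
+// BSON response object rather than a CursorResponse.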
+BSONObj cursorResponseObj(const NamespaceString& nss,
+ CursorId cursorId,
+ std::vector<BSONObj> batch) {
+ return CursorResponse{nss, cursorId, std::move(batch)}.toBSON(
+ CursorResponse::ResponseType::SubsequentResponse);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+
+ // Iterate the $mergeCursors stage asynchronously on a different thread, since it will block
+ // waiting for network responses, which we will manually schedule below.
+ auto future = launchAsync([&pipeline]() {
+ for (int i = 0; i < 5; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ }
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
+    // Schedule responses to two getMores which keep the cursors open.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(
+ expCtx->ns, request.cmdObj["getMore"].Long(), {BSON("x" << 1), BSON("x" << 1)});
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(
+ expCtx->ns, request.cmdObj["getMore"].Long(), {BSON("x" << 1), BSON("x" << 1)});
+ });
+
+    // Schedule responses to two getMores which report the cursors are exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, kExhaustedCursorID, {});
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, kExhaustedCursorID, {BSON("x" << 1)});
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+
+    pipeline.reset();  // Delete the pipeline without ever iterating it.
+
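+    // Since the pipeline was never iterated, no getMore or killCursors requests should have
+    // been scheduled on the mock network.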
+ network()->enterNetwork();
+ ASSERT_FALSE(network()->hasReadyRequests());
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorsIfPartiallyIterated) {
+ auto expCtx = getExpCtx();
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
+
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&pipeline]() {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ pipeline.reset(); // Stop iterating and delete the pipeline.
+ });
+
+    // Note we do not respond with 'kExhaustedCursorID' here, so the cursors remain open.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, 1, {BSON("x" << 1), BSON("x" << 1)});
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return cursorResponseObj(expCtx->ns, 2, {BSON("x" << 1), BSON("x" << 1)});
+ });
+
+ // Here we're looking for the killCursors requests to be scheduled.
+ std::set<CursorId> killedCursors;
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["killCursors"]);
+ auto cursors = request.cmdObj["cursors"];
+ ASSERT_EQ(cursors.type(), BSONType::Array);
+ auto cursorsArray = cursors.Array();
+ ASSERT_FALSE(cursorsArray.empty());
+ auto cursorId = cursorsArray[0].Long();
+ ASSERT(cursorId == 1 || cursorId == 2);
+ killedCursors.insert(cursorId);
+        // The ARM doesn't actually inspect the killCursors response, so {ok: 1} is all we
+        // need to return.
+ return BSON("ok" << 1);
+ });
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["killCursors"]);
+ auto cursors = request.cmdObj["cursors"];
+ ASSERT_EQ(cursors.type(), BSONType::Array);
+ auto cursorsArray = cursors.Array();
+ ASSERT_FALSE(cursorsArray.empty());
+ auto cursorId = cursorsArray[0].Long();
+ ASSERT(cursorId == 1 || cursorId == 2);
+ killedCursors.insert(cursorId);
+        // The ARM doesn't actually inspect the killCursors response, so {ok: 1} is all we
+        // need to return.
+ return BSON("ok" << 1);
+ });
+
+ future.timed_get(kFutureTimeout);
+ ASSERT_EQ(killedCursors.size(), 2UL);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrder) {
+ auto expCtx = getExpCtx();
+
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long noLimit = -1;
+ auto sortStage = DocumentSourceSort::create(expCtx,
+ BSON("x" << 1),
+ noLimit,
+ DocumentSourceSort::kMaxMemoryUsageBytes,
+ mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
+
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+    // After optimization, we should only have a $mergeCursors stage.
+ pipeline->optimizePipeline();
+ ASSERT_EQ(pipeline->getSources().size(), 1UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&pipeline]() {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
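+    // Deliver each shard's batch in response to its getMore. Each document carries '$sortKey'
+    // metadata, which is used to merge the two streams in sorted order.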
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
+ BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
+ });
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
+ BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
+ auto expCtx = getExpCtx();
+
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long limit = 3;
+ auto sortStage = DocumentSourceSort::create(
+ expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
+
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization, we should still have a $limit stage.
+ pipeline->optimizePipeline();
+ ASSERT_EQ(pipeline->getSources().size(), 2UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
+
+ // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
+ // network responses, which we will manually schedule below.
+ auto future = launchAsync([&]() {
+ for (int i = 1; i <= limit; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
+ }
+ ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
+ });
+
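+    // Each shard contributes two documents, but the limit of 3 should cut off the merged
+    // stream after the first three results.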
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
+ BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
+ });
+ onCommand([&](const auto& request) {
+ return cursorResponseObj(expCtx->ns,
+ kExhaustedCursorID,
+ {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
+ BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) {
+ auto expCtx = getExpCtx();
+
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long limit = 3;
+ auto sortStage = DocumentSourceSort::create(
+ expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
+
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization, we should still have a $limit stage.
+ pipeline->optimizePipeline();
+ ASSERT_EQ(pipeline->getSources().size(), 2UL);
+ ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
+
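+    // Although the $sort was absorbed by the $mergeCursors stage, it should still be included
+    // when serializing, so that the pipeline round-trips losslessly.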
+ auto serialized = pipeline->serialize();
+ ASSERT_EQ(serialized.size(), 3UL);
+ ASSERT_FALSE(serialized[0]["$mergeCursors"].missing());
+ ASSERT_FALSE(serialized[1]["$sort"].missing());
+ ASSERT_FALSE(serialized[2]["$limit"].missing());
+}
+
+} // namespace
+} // namespace mongo