summary refs log tree commit diff
path: root/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
diff options
context:
space:
mode:
author    William Schultz <william.schultz@mongodb.com> 2018-04-05 13:56:47 -0400
committer William Schultz <william.schultz@mongodb.com> 2018-04-05 13:59:27 -0400
commit   e88c6d85036607ddf86105234917b4adfffbd612 (patch)
tree     43ff091111fec8836654b0ccc3dcaff6e3d2a518 /src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
parent   ac6544a9194197b5ab10f563cf1a19dcdc42349c (diff)
download mongo-e88c6d85036607ddf86105234917b4adfffbd612.tar.gz
Revert "SERVER-33323 Use the IDL to serialize the ARM"
This reverts commit 7d09f278a2acf9791b36927d6af1d30347d60391.
Diffstat (limited to 'src/mongo/db/pipeline/document_source_merge_cursors_test.cpp')
-rw-r--r-- src/mongo/db/pipeline/document_source_merge_cursors_test.cpp | 208
1 file changed, 60 insertions, 148 deletions
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
index 3614324d27d..4809ac54b50 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors_test.cpp
@@ -111,7 +111,9 @@ private:
};
TEST_F(DocumentSourceMergeCursorsTest, ShouldRejectNonArray) {
- auto spec = BSON("$mergeCursors" << 2);
+ auto spec = BSON("$mergeCursors" << BSON(
+ "cursors" << BSON_ARRAY(BSON("ns" << kTestNss.ns() << "id" << 0LL << "host"
+ << kTestHost.toString()))));
ASSERT_THROWS_CODE(DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx()),
AssertionException,
17026);
@@ -211,66 +213,16 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseTheSerializedVersionOf
ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
}
-RemoteCursor makeRemoteCursor(ShardId shardId, HostAndPort host, CursorResponse response) {
- RemoteCursor remoteCursor;
- remoteCursor.setShardId(std::move(shardId));
- remoteCursor.setHostAndPort(std::move(host));
- remoteCursor.setCursorResponse(std::move(response));
- return remoteCursor;
-}
-
-TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToParseSerializedARMParams) {
- AsyncResultsMergerParams params;
- params.setSort(BSON("y" << 1 << "z" << 1));
- params.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, kExhaustedCursorID, {})));
- params.setRemotes(std::move(cursors));
- auto spec = BSON("$mergeCursors" << params.toBSON());
- auto mergeCursors =
- DocumentSourceMergeCursors::createFromBson(spec.firstElement(), getExpCtx());
- std::vector<Value> serializationArray;
- mergeCursors->serializeToArray(serializationArray);
- ASSERT_EQ(serializationArray.size(), 1UL);
-
- // Make sure the serialized version can be parsed into an identical AsyncResultsMergerParams.
- auto newSpec = serializationArray[0].getDocument().toBson();
- ASSERT(newSpec["$mergeCursors"].type() == BSONType::Object);
- auto newParams = AsyncResultsMergerParams::parse(IDLParserErrorContext("$mergeCursors test"),
- newSpec["$mergeCursors"].Obj());
- ASSERT_TRUE(params.getSort());
- ASSERT_BSONOBJ_EQ(*params.getSort(), *newParams.getSort());
- ASSERT_EQ(params.getCompareWholeSortKey(), newParams.getCompareWholeSortKey());
- ASSERT(params.getTailableMode() == newParams.getTailableMode());
- ASSERT(params.getBatchSize() == newParams.getBatchSize());
- ASSERT_EQ(params.getNss(), newParams.getNss());
- ASSERT_EQ(params.getAllowPartialResults(), newParams.getAllowPartialResults());
- ASSERT_EQ(newParams.getRemotes().size(), 1UL);
- ASSERT(newParams.getRemotes()[0].getShardId() == kTestShardIds[0].toString());
- ASSERT(newParams.getRemotes()[0].getHostAndPort() == kTestShardHosts[0]);
- ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getNSS(), kTestNss);
- ASSERT_EQ(newParams.getRemotes()[0].getCursorResponse().getCursorId(), kExhaustedCursorID);
- ASSERT(newParams.getRemotes()[0].getCursorResponse().getBatch().empty());
-
- // Test that the $mergeCursors stage will accept the serialized format of
- // AsyncResultsMergerParams.
- ASSERT(DocumentSourceMergeCursors::createFromBson(newSpec.firstElement(), getExpCtx()));
-}
-
TEST_F(DocumentSourceMergeCursorsTest, ShouldReportEOFWithNoCursors) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(makeRemoteCursor(
- kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
- cursors.emplace_back(makeRemoteCursor(
- kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(
+ kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
+ cursors.emplace_back(
+ kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, kExhaustedCursorID, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
auto mergeCursorsStage =
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx);
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
ASSERT_TRUE(mergeCursorsStage->getNext().isEOF());
}
@@ -284,17 +236,12 @@ BSONObj cursorResponseObj(const NamespaceString& nss,
TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
// Iterate the $mergeCursors stage asynchronously on a different thread, since it will block
// waiting for network responses, which we will manually schedule below.
@@ -333,17 +280,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldBeAbleToIterateCursorsUntilEOF) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
pipeline.reset(); // Delete the pipeline before using it.
@@ -353,15 +295,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotKillCursorsIfNeverIterated) {
TEST_F(DocumentSourceMergeCursorsTest, ShouldKillCursorIfPartiallyIterated) {
auto expCtx = getExpCtx();
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- armParams.setRemotes(std::move(cursors));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
@@ -406,16 +344,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
// Make a $mergeCursors stage and add it to the front of the pipeline.
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
// After optimization we should only have a $mergeCursors stage.
pipeline->optimizePipeline();
@@ -430,7 +364,6 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
- std::cout << "Finished";
});
onCommand([&](const auto& request) {
@@ -449,35 +382,36 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldOptimizeWithASortToEnsureCorrectOrd
future.timed_get(kFutureTimeout);
}
-TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
+TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
auto expCtx = getExpCtx();
- auto pipeline = uassertStatusOK(Pipeline::create({}, expCtx));
- // Make a $mergeCursors stage with a sort on "x" and add it to the front of the pipeline.
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- armParams.setSort(BSON("x" << 1));
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ // Make a pipeline with a single $sort stage that is merging pre-sorted results.
+ const bool mergingPresorted = true;
+ const long long limit = 3;
+ auto sortStage = DocumentSourceSort::create(
+ expCtx, BSON("x" << 1), limit, DocumentSourceSort::kMaxMemoryUsageBytes, mergingPresorted);
+ auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
- // After optimization we should only have a $mergeCursors stage.
+ // Make a $mergeCursors stage and add it to the front of the pipeline.
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
+
+ // After optimization, we should still have a $limit stage.
pipeline->optimizePipeline();
- ASSERT_EQ(pipeline->getSources().size(), 1UL);
+ ASSERT_EQ(pipeline->getSources().size(), 2UL);
ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
+ ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
// Iterate the pipeline asynchronously on a different thread, since it will block waiting for
// network responses, which we will manually schedule below.
- auto future = launchAsync([&pipeline]() {
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 1}}));
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 2}}));
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 3}}));
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", 4}}));
+ auto future = launchAsync([&]() {
+ for (int i = 1; i <= limit; ++i) {
+ ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
+ }
ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
});
@@ -497,7 +431,7 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldEnforceSortSpecifiedViaARMParams) {
future.timed_get(kFutureTimeout);
}
-TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLeadingSort) {
+TEST_F(DocumentSourceMergeCursorsTest, ShouldSerializeSortIfAbsorbedViaOptimize) {
auto expCtx = getExpCtx();
// Make a pipeline with a single $sort stage that is merging pre-sorted results.
@@ -508,16 +442,12 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea
auto pipeline = uassertStatusOK(Pipeline::create({std::move(sortStage)}, expCtx));
// Make a $mergeCursors stage and add it to the front of the pipeline.
- AsyncResultsMergerParams armParams;
- armParams.setNss(kTestNss);
- std::vector<RemoteCursor> cursors;
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {})));
- cursors.emplace_back(
- makeRemoteCursor(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {})));
- armParams.setRemotes(std::move(cursors));
- pipeline->addInitialSource(
- DocumentSourceMergeCursors::create(executor(), std::move(armParams), expCtx));
+ std::vector<ClusterClientCursorParams::RemoteCursor> cursors;
+ cursors.emplace_back(kTestShardIds[0], kTestShardHosts[0], CursorResponse(expCtx->ns, 1, {}));
+ cursors.emplace_back(kTestShardIds[1], kTestShardHosts[1], CursorResponse(expCtx->ns, 2, {}));
+ auto mergeCursorsStage =
+ DocumentSourceMergeCursors::create(std::move(cursors), executor(), expCtx);
+ pipeline->addInitialSource(std::move(mergeCursorsStage));
// After optimization, we should still have a $limit stage.
pipeline->optimizePipeline();
@@ -525,29 +455,11 @@ TEST_F(DocumentSourceMergeCursorsTest, ShouldNotRemoveLimitWhenOptimizingWithLea
ASSERT_TRUE(dynamic_cast<DocumentSourceMergeCursors*>(pipeline->getSources().front().get()));
ASSERT_TRUE(dynamic_cast<DocumentSourceLimit*>(pipeline->getSources().back().get()));
- // Iterate the pipeline asynchronously on a different thread, since it will block waiting for
- // network responses, which we will manually schedule below.
- auto future = launchAsync([&]() {
- for (int i = 1; i <= limit; ++i) {
- ASSERT_DOCUMENT_EQ(*pipeline->getNext(), (Document{{"x", i}}));
- }
- ASSERT_FALSE(static_cast<bool>(pipeline->getNext()));
- });
-
- onCommand([&](const auto& request) {
- return cursorResponseObj(expCtx->ns,
- kExhaustedCursorID,
- {BSON("x" << 1 << "$sortKey" << BSON("" << 1)),
- BSON("x" << 3 << "$sortKey" << BSON("" << 3))});
- });
- onCommand([&](const auto& request) {
- return cursorResponseObj(expCtx->ns,
- kExhaustedCursorID,
- {BSON("x" << 2 << "$sortKey" << BSON("" << 2)),
- BSON("x" << 4 << "$sortKey" << BSON("" << 4))});
- });
-
- future.timed_get(kFutureTimeout);
+ auto serialized = pipeline->serialize();
+ ASSERT_EQ(serialized.size(), 3UL);
+ ASSERT_FALSE(serialized[0]["$mergeCursors"].missing());
+ ASSERT_FALSE(serialized[1]["$sort"].missing());
+ ASSERT_FALSE(serialized[2]["$limit"].missing());
}
} // namespace