diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/query/cursor_response.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response.h | 9 | ||||
-rw-r--r-- | src/mongo/db/query/cursor_response_test.cpp | 36 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_getmore_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 69 |
5 files changed, 83 insertions, 44 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp index 85afd949b1d..355596ae086 100644 --- a/src/mongo/db/query/cursor_response.cpp +++ b/src/mongo/db/query/cursor_response.cpp @@ -138,13 +138,16 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo return {{NamespaceString(fullns), cursorId, batch}}; } -void CursorResponse::addToBSON(BSONObjBuilder* builder) const { +void CursorResponse::addToBSON(CursorResponse::ResponseType responseType, + BSONObjBuilder* builder) const { BSONObjBuilder cursorBuilder(builder->subobjStart(kCursorField)); cursorBuilder.append(kIdField, cursorId); cursorBuilder.append(kNsField, nss.ns()); - BSONArrayBuilder batchBuilder(cursorBuilder.subarrayStart(kBatchField)); + const char* batchFieldName = + (responseType == ResponseType::InitialResponse) ? kBatchFieldInitial : kBatchField; + BSONArrayBuilder batchBuilder(cursorBuilder.subarrayStart(batchFieldName)); for (const BSONObj& obj : batch) { batchBuilder.append(obj); } @@ -155,9 +158,9 @@ void CursorResponse::addToBSON(BSONObjBuilder* builder) const { builder->append("ok", 1.0); } -BSONObj CursorResponse::toBSON() const { +BSONObj CursorResponse::toBSON(CursorResponse::ResponseType responseType) const { BSONObjBuilder builder; - addToBSON(&builder); + addToBSON(responseType, &builder); return builder.obj(); } diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h index e606dcd1559..ac15477e28c 100644 --- a/src/mongo/db/query/cursor_response.h +++ b/src/mongo/db/query/cursor_response.h @@ -66,6 +66,11 @@ void appendGetMoreResponseObject(long long cursorId, BSONObjBuilder* builder); struct CursorResponse { + enum class ResponseType { + InitialResponse, + SubsequentResponse, + }; + /** * Constructs from values for each of the fields. */ @@ -82,8 +87,8 @@ struct CursorResponse { /** * Converts this response to its raw BSON representation. */ - BSONObj toBSON() const; - void addToBSON(BSONObjBuilder* builder) const; + BSONObj toBSON(ResponseType responseType) const; + void addToBSON(ResponseType responseType, BSONObjBuilder* builder) const; const NamespaceString nss; const CursorId cursorId; diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp index 37f426d8eb9..9db3ad4083a 100644 --- a/src/mongo/db/query/cursor_response_test.cpp +++ b/src/mongo/db/query/cursor_response_test.cpp @@ -182,10 +182,22 @@ TEST(CursorResponseTest, parseFromBSONHandleErrorResponse) { ASSERT_EQ(result.getStatus().reason(), "does not work"); } -TEST(CursorResponseTest, toBSON) { +TEST(CursorResponseTest, toBSONInitialResponse) { std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch); - BSONObj responseObj = response.toBSON(); + BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse); + BSONObj expectedResponse = BSON( + "cursor" << BSON("id" << CursorId(123) << "ns" + << "testdb.testcoll" + << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))) + << "ok" << 1.0); + ASSERT_EQ(responseObj, expectedResponse); +} + +TEST(CursorResponseTest, toBSONSubsequentResponse) { + std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; + CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch); + BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::SubsequentResponse); BSONObj expectedResponse = BSON( "cursor" << BSON("id" << CursorId(123) << "ns" << "testdb.testcoll" @@ -194,12 +206,28 @@ TEST(CursorResponseTest, toBSON) { ASSERT_EQ(responseObj, expectedResponse); } -TEST(CursorResponseTest, addToBSON) { +TEST(CursorResponseTest, addToBSONInitialResponse) { + std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; + CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch); + + BSONObjBuilder builder; + response.addToBSON(CursorResponse::ResponseType::InitialResponse, &builder); + BSONObj responseObj = builder.obj(); + + BSONObj expectedResponse = BSON( + "cursor" << BSON("id" << CursorId(123) << "ns" + << "testdb.testcoll" + << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2))) + << "ok" << 1.0); + ASSERT_EQ(responseObj, expectedResponse); +} + +TEST(CursorResponseTest, addToBSONSubsequentResponse) { std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)}; CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch); BSONObjBuilder builder; - response.addToBSON(&builder); + response.addToBSON(CursorResponse::ResponseType::SubsequentResponse, &builder); BSONObj responseObj = builder.obj(); BSONObj expectedResponse = BSON( diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.cpp index 58957534680..85b36701049 100644 --- a/src/mongo/s/commands/cluster_getmore_cmd.cpp +++ b/src/mongo/s/commands/cluster_getmore_cmd.cpp @@ -114,7 +114,7 @@ public: return appendCommandStatus(result, response.getStatus()); } - response.getValue().addToBSON(&result); + response.getValue().addToBSON(CursorResponse::ResponseType::SubsequentResponse, &result); return true; } diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index aeac1010fb0..b72a1f61729 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -96,12 +96,13 @@ protected: } /** - * Schedules a list of getMore responses to be returned by the mock network. + * Schedules a list of cursor responses to be returned by the mock network. */ - void scheduleNetworkResponses(std::vector<CursorResponse> responses) { + void scheduleNetworkResponses(std::vector<CursorResponse> responses, + CursorResponse::ResponseType responseType) { std::vector<BSONObj> objs; for (const auto& cursorResponse : responses) { - objs.push_back(cursorResponse.toBSON()); + objs.push_back(cursorResponse.toBSON(responseType)); } scheduleNetworkResponseObjs(objs); } @@ -181,7 +182,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); // Can't return any results until we have a response from all three shards. ASSERT_FALSE(arm->ready()); @@ -192,7 +193,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) { responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); @@ -227,7 +228,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { responses.emplace_back(_nss, CursorId(11), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(12), batch3); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -254,7 +255,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { responses.emplace_back(_nss, CursorId(0), batch5); std::vector<BSONObj> batch6 = {fromjson("{_id: 10}")}; responses.emplace_back(_nss, CursorId(0), batch6); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -273,7 +274,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) { responses.clear(); std::vector<BSONObj> batch7 = {fromjson("{_id: 11}")}; responses.emplace_back(_nss, CursorId(0), batch7); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -297,7 +298,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) { responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 8}")}; responses.emplace_back(_nss, CursorId(0), batch3); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -331,7 +332,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; responses.emplace_back(_nss, CursorId(2), batch3); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -350,7 +351,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 10}")}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -367,7 +368,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) { responses.clear(); std::vector<BSONObj> batch5 = {fromjson("{_id: 9}"), fromjson("{_id: 10}")}; responses.emplace_back(_nss, CursorId(0), batch5); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -396,7 +397,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { std::vector<CursorResponse> responses; responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>()); responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>()); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); // In handling the responses from the first shard, the ARM should have already asked // for an additional batch from that shard. It won't have anything to return until it @@ -405,7 +406,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { responses.clear(); std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -419,14 +420,14 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) { // do this, but there's no reason the ARM can't handle this by asking for more. responses.clear(); responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>()); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); // The shard responds with another batch and closes the cursor. ASSERT_FALSE(arm->ready()); responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch2); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -449,7 +450,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -470,7 +471,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.clear(); std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(1), batch3); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); blackHoleNextRequest(); executor->waitForEvent(readyEvent); @@ -487,7 +488,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) { responses.clear(); std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")}; responses.emplace_back(_nss, CursorId(0), batch4); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -514,7 +515,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { std::vector<BSONObj> batch1 = { fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -532,7 +533,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) { std::vector<BSONObj> batch2 = { fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")}; responses.emplace_back(_nss, CursorId(456), batch2); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -552,10 +553,12 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) { ASSERT_FALSE(arm->ready()); std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; - BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1).toBSON(); + BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1) + .toBSON(CursorResponse::ResponseType::InitialResponse); BSONObj response2 = fromjson("{foo: 'bar'}"); std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")}; - BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3).toBSON(); + BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3) + .toBSON(CursorResponse::ResponseType::InitialResponse); scheduleNetworkResponseObjs({response1, response2, response3}); executor->waitForEvent(readyEvent); @@ -581,7 +584,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) { responses.emplace_back(_nss, CursorId(1), batch1); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(2), batch2); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"}); executor->waitForEvent(readyEvent); @@ -610,7 +613,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -680,7 +683,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) { responses.emplace_back(_nss, CursorId(0), batch2); std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")}; responses.emplace_back(_nss, CursorId(123), batch3); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); // Kill should be able to return right away if there are no pending batches. auto killedEvent = arm->kill(); @@ -700,7 +703,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(0), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); // Kill event will only be signalled once the pending batches are received. auto killedEvent = arm->kill(); @@ -725,7 +728,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); auto killedEvent = arm->kill(); @@ -757,7 +760,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(123), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -776,7 +779,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) { responses.clear(); std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")}; responses.emplace_back(_nss, CursorId(123), batch2); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -800,7 +803,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) { std::vector<CursorResponse> responses; std::vector<BSONObj> batch; responses.emplace_back(_nss, CursorId(123), batch); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); // After receiving an empty batch, the ARM should return boost::none. @@ -823,7 +826,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")}; responses.emplace_back(_nss, CursorId(1), batch1); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); @@ -843,7 +846,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) { ASSERT_OK(request.getStatus()); ASSERT_EQ(*request.getValue().batchSize, 1LL); ASSERT_EQ(request.getValue().cursorid, 1LL); - scheduleNetworkResponses(responses); + scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse); executor->waitForEvent(readyEvent); ASSERT_TRUE(arm->ready()); |