summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJason Rassi <rassi@10gen.com>2015-08-28 02:17:49 -0400
committerJason Rassi <rassi@10gen.com>2015-08-28 15:32:04 -0400
commit450dc96f2e57bea4af7929d2ad10d3afb774e410 (patch)
tree1b5c9bebfee570f77fa75d273d8d23cd4e2942e8 /src/mongo
parent1c5fdf89ee6fd377096b9369b94ee490792a22b8 (diff)
downloadmongo-450dc96f2e57bea4af7929d2ad10d3afb774e410.tar.gz
SERVER-19569 CursorResponse::toBSON() ability to return first batch
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/query/cursor_response.cpp11
-rw-r--r--src/mongo/db/query/cursor_response.h9
-rw-r--r--src/mongo/db/query/cursor_response_test.cpp36
-rw-r--r--src/mongo/s/commands/cluster_getmore_cmd.cpp2
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp69
5 files changed, 83 insertions, 44 deletions
diff --git a/src/mongo/db/query/cursor_response.cpp b/src/mongo/db/query/cursor_response.cpp
index 85afd949b1d..355596ae086 100644
--- a/src/mongo/db/query/cursor_response.cpp
+++ b/src/mongo/db/query/cursor_response.cpp
@@ -138,13 +138,16 @@ StatusWith<CursorResponse> CursorResponse::parseFromBSON(const BSONObj& cmdRespo
return {{NamespaceString(fullns), cursorId, batch}};
}
-void CursorResponse::addToBSON(BSONObjBuilder* builder) const {
+void CursorResponse::addToBSON(CursorResponse::ResponseType responseType,
+ BSONObjBuilder* builder) const {
BSONObjBuilder cursorBuilder(builder->subobjStart(kCursorField));
cursorBuilder.append(kIdField, cursorId);
cursorBuilder.append(kNsField, nss.ns());
- BSONArrayBuilder batchBuilder(cursorBuilder.subarrayStart(kBatchField));
+ const char* batchFieldName =
+ (responseType == ResponseType::InitialResponse) ? kBatchFieldInitial : kBatchField;
+ BSONArrayBuilder batchBuilder(cursorBuilder.subarrayStart(batchFieldName));
for (const BSONObj& obj : batch) {
batchBuilder.append(obj);
}
@@ -155,9 +158,9 @@ void CursorResponse::addToBSON(BSONObjBuilder* builder) const {
builder->append("ok", 1.0);
}
-BSONObj CursorResponse::toBSON() const {
+BSONObj CursorResponse::toBSON(CursorResponse::ResponseType responseType) const {
BSONObjBuilder builder;
- addToBSON(&builder);
+ addToBSON(responseType, &builder);
return builder.obj();
}
diff --git a/src/mongo/db/query/cursor_response.h b/src/mongo/db/query/cursor_response.h
index e606dcd1559..ac15477e28c 100644
--- a/src/mongo/db/query/cursor_response.h
+++ b/src/mongo/db/query/cursor_response.h
@@ -66,6 +66,11 @@ void appendGetMoreResponseObject(long long cursorId,
BSONObjBuilder* builder);
struct CursorResponse {
+ enum class ResponseType {
+ InitialResponse,
+ SubsequentResponse,
+ };
+
/**
* Constructs from values for each of the fields.
*/
@@ -82,8 +87,8 @@ struct CursorResponse {
/**
* Converts this response to its raw BSON representation.
*/
- BSONObj toBSON() const;
- void addToBSON(BSONObjBuilder* builder) const;
+ BSONObj toBSON(ResponseType responseType) const;
+ void addToBSON(ResponseType responseType, BSONObjBuilder* builder) const;
const NamespaceString nss;
const CursorId cursorId;
diff --git a/src/mongo/db/query/cursor_response_test.cpp b/src/mongo/db/query/cursor_response_test.cpp
index 37f426d8eb9..9db3ad4083a 100644
--- a/src/mongo/db/query/cursor_response_test.cpp
+++ b/src/mongo/db/query/cursor_response_test.cpp
@@ -182,10 +182,22 @@ TEST(CursorResponseTest, parseFromBSONHandleErrorResponse) {
ASSERT_EQ(result.getStatus().reason(), "does not work");
}
-TEST(CursorResponseTest, toBSON) {
+TEST(CursorResponseTest, toBSONInitialResponse) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch);
- BSONObj responseObj = response.toBSON();
+ BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::InitialResponse);
+ BSONObj expectedResponse = BSON(
+ "cursor" << BSON("id" << CursorId(123) << "ns"
+ << "testdb.testcoll"
+ << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)))
+ << "ok" << 1.0);
+ ASSERT_EQ(responseObj, expectedResponse);
+}
+
+TEST(CursorResponseTest, toBSONSubsequentResponse) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+ CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch);
+ BSONObj responseObj = response.toBSON(CursorResponse::ResponseType::SubsequentResponse);
BSONObj expectedResponse = BSON(
"cursor" << BSON("id" << CursorId(123) << "ns"
<< "testdb.testcoll"
@@ -194,12 +206,28 @@ TEST(CursorResponseTest, toBSON) {
ASSERT_EQ(responseObj, expectedResponse);
}
-TEST(CursorResponseTest, addToBSON) {
+TEST(CursorResponseTest, addToBSONInitialResponse) {
+ std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
+ CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch);
+
+ BSONObjBuilder builder;
+ response.addToBSON(CursorResponse::ResponseType::InitialResponse, &builder);
+ BSONObj responseObj = builder.obj();
+
+ BSONObj expectedResponse = BSON(
+ "cursor" << BSON("id" << CursorId(123) << "ns"
+ << "testdb.testcoll"
+ << "firstBatch" << BSON_ARRAY(BSON("_id" << 1) << BSON("_id" << 2)))
+ << "ok" << 1.0);
+ ASSERT_EQ(responseObj, expectedResponse);
+}
+
+TEST(CursorResponseTest, addToBSONSubsequentResponse) {
std::vector<BSONObj> batch = {BSON("_id" << 1), BSON("_id" << 2)};
CursorResponse response(NamespaceString("testdb.testcoll"), CursorId(123), batch);
BSONObjBuilder builder;
- response.addToBSON(&builder);
+ response.addToBSON(CursorResponse::ResponseType::SubsequentResponse, &builder);
BSONObj responseObj = builder.obj();
BSONObj expectedResponse = BSON(
diff --git a/src/mongo/s/commands/cluster_getmore_cmd.cpp b/src/mongo/s/commands/cluster_getmore_cmd.cpp
index 58957534680..85b36701049 100644
--- a/src/mongo/s/commands/cluster_getmore_cmd.cpp
+++ b/src/mongo/s/commands/cluster_getmore_cmd.cpp
@@ -114,7 +114,7 @@ public:
return appendCommandStatus(result, response.getStatus());
}
- response.getValue().addToBSON(&result);
+ response.getValue().addToBSON(CursorResponse::ResponseType::SubsequentResponse, &result);
return true;
}
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index aeac1010fb0..b72a1f61729 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -96,12 +96,13 @@ protected:
}
/**
- * Schedules a list of getMore responses to be returned by the mock network.
+ * Schedules a list of cursor responses to be returned by the mock network.
*/
- void scheduleNetworkResponses(std::vector<CursorResponse> responses) {
+ void scheduleNetworkResponses(std::vector<CursorResponse> responses,
+ CursorResponse::ResponseType responseType) {
std::vector<BSONObj> objs;
for (const auto& cursorResponse : responses) {
- objs.push_back(cursorResponse.toBSON());
+ objs.push_back(cursorResponse.toBSON(responseType));
}
scheduleNetworkResponseObjs(objs);
}
@@ -181,7 +182,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
// Can't return any results until we have a response from all three shards.
ASSERT_FALSE(arm->ready());
@@ -192,7 +193,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFind) {
responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
@@ -227,7 +228,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
responses.emplace_back(_nss, CursorId(11), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(12), batch3);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -254,7 +255,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
responses.emplace_back(_nss, CursorId(0), batch5);
std::vector<BSONObj> batch6 = {fromjson("{_id: 10}")};
responses.emplace_back(_nss, CursorId(0), batch6);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -273,7 +274,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMore) {
responses.clear();
std::vector<BSONObj> batch7 = {fromjson("{_id: 11}")};
responses.emplace_back(_nss, CursorId(0), batch7);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -297,7 +298,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindSorted) {
responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 8}")};
responses.emplace_back(_nss, CursorId(0), batch3);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -331,7 +332,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
responses.emplace_back(_nss, CursorId(2), batch3);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -350,7 +351,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 10}")};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -367,7 +368,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindAndGetMoreSorted) {
responses.clear();
std::vector<BSONObj> batch5 = {fromjson("{_id: 9}"), fromjson("{_id: 10}")};
responses.emplace_back(_nss, CursorId(0), batch5);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -396,7 +397,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
std::vector<CursorResponse> responses;
responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>());
responses.emplace_back(_nss, CursorId(0), std::vector<BSONObj>());
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
// In handling the responses from the first shard, the ARM should have already asked
// for an additional batch from that shard. It won't have anything to return until it
@@ -405,7 +406,7 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
responses.clear();
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -419,14 +420,14 @@ TEST_F(AsyncResultsMergerTest, ClusterFindInitialBatchSizeIsZero) {
// do this, but there's no reason the ARM can't handle this by asking for more.
responses.clear();
responses.emplace_back(_nss, CursorId(1), std::vector<BSONObj>());
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
// The shard responds with another batch and closes the cursor.
ASSERT_FALSE(arm->ready());
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch2);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -449,7 +450,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -470,7 +471,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.clear();
std::vector<BSONObj> batch3 = {fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(1), batch3);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
blackHoleNextRequest();
executor->waitForEvent(readyEvent);
@@ -487,7 +488,7 @@ TEST_F(AsyncResultsMergerTest, StreamResultsFromOneShardIfOtherDoesntRespond) {
responses.clear();
std::vector<BSONObj> batch4 = {fromjson("{_id: 7}"), fromjson("{_id: 8}")};
responses.emplace_back(_nss, CursorId(0), batch4);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -514,7 +515,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<BSONObj> batch1 = {
fromjson("{_id: 1}"), fromjson("{_id: 2}"), fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -532,7 +533,7 @@ TEST_F(AsyncResultsMergerTest, ErrorOnMismatchedCursorIds) {
std::vector<BSONObj> batch2 = {
fromjson("{_id: 4}"), fromjson("{_id: 5}"), fromjson("{_id: 6}")};
responses.emplace_back(_nss, CursorId(456), batch2);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -552,10 +553,12 @@ TEST_F(AsyncResultsMergerTest, BadResponseReceivedFromShard) {
ASSERT_FALSE(arm->ready());
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
- BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1).toBSON();
+ BSONObj response1 = CursorResponse(_nss, CursorId(123), batch1)
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
BSONObj response2 = fromjson("{foo: 'bar'}");
std::vector<BSONObj> batch3 = {fromjson("{_id: 4}"), fromjson("{_id: 5}")};
- BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3).toBSON();
+ BSONObj response3 = CursorResponse(_nss, CursorId(456), batch3)
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
scheduleNetworkResponseObjs({response1, response2, response3});
executor->waitForEvent(readyEvent);
@@ -581,7 +584,7 @@ TEST_F(AsyncResultsMergerTest, ErrorReceivedFromShard) {
responses.emplace_back(_nss, CursorId(1), batch1);
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(2), batch2);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
scheduleErrorResponse({ErrorCodes::BadValue, "bad thing happened"});
executor->waitForEvent(readyEvent);
@@ -610,7 +613,7 @@ TEST_F(AsyncResultsMergerTest, ErrorCantScheduleEventBeforeLastSignaled) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -680,7 +683,7 @@ TEST_F(AsyncResultsMergerTest, KillAllBatchesReceived) {
responses.emplace_back(_nss, CursorId(0), batch2);
std::vector<BSONObj> batch3 = {fromjson("{_id: 3}"), fromjson("{_id: 4}")};
responses.emplace_back(_nss, CursorId(123), batch3);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
// Kill should be able to return right away if there are no pending batches.
auto killedEvent = arm->kill();
@@ -700,7 +703,7 @@ TEST_F(AsyncResultsMergerTest, KillTwoOutstandingBatches) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(0), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
// Kill event will only be signalled once the pending batches are received.
auto killedEvent = arm->kill();
@@ -725,7 +728,7 @@ TEST_F(AsyncResultsMergerTest, NextEventErrorsAfterKill) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
auto killedEvent = arm->kill();
@@ -757,7 +760,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(123), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -776,7 +779,7 @@ TEST_F(AsyncResultsMergerTest, TailableBasic) {
responses.clear();
std::vector<BSONObj> batch2 = {fromjson("{_id: 3}")};
responses.emplace_back(_nss, CursorId(123), batch2);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -800,7 +803,7 @@ TEST_F(AsyncResultsMergerTest, TailableEmptyBatch) {
std::vector<CursorResponse> responses;
std::vector<BSONObj> batch;
responses.emplace_back(_nss, CursorId(123), batch);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
// After receiving an empty batch, the ARM should return boost::none.
@@ -823,7 +826,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
std::vector<BSONObj> batch1 = {fromjson("{_id: 1}"), fromjson("{_id: 2}")};
responses.emplace_back(_nss, CursorId(1), batch1);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::InitialResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());
@@ -843,7 +846,7 @@ TEST_F(AsyncResultsMergerTest, GetMoreBatchSizes) {
ASSERT_OK(request.getStatus());
ASSERT_EQ(*request.getValue().batchSize, 1LL);
ASSERT_EQ(request.getValue().cursorid, 1LL);
- scheduleNetworkResponses(responses);
+ scheduleNetworkResponses(responses, CursorResponse::ResponseType::SubsequentResponse);
executor->waitForEvent(readyEvent);
ASSERT_TRUE(arm->ready());