summaryrefslogtreecommitdiff
path: root/src/mongo/rpc
diff options
context:
space:
mode:
authorWilliam Schultz <william.schultz@mongodb.com>2018-09-07 16:57:54 -0400
committerWilliam Schultz <william.schultz@mongodb.com>2018-09-07 17:19:34 -0400
commita8fe1d82c3c5484c7e8a6402a34bc45ea11d06f1 (patch)
tree4d37aeb1841841447f2404d9b298207dd5705282 /src/mongo/rpc
parent614477159b98497d6372f31ea8d918ca39259ccf (diff)
downloadmongo-a8fe1d82c3c5484c7e8a6402a34bc45ea11d06f1.tar.gz
SERVER-36299 Add integration test for exhaust with OP_MSG
This reverts commit 2d79aaf6d2b65f0355fcaa5d368ec3c4f493495a.
Diffstat (limited to 'src/mongo/rpc')
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp131
1 files changed, 131 insertions, 0 deletions
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index e0bdb01bef6..4486c342e1d 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
#include "mongo/client/dbclient_connection.h"
+#include "mongo/db/query/getmore_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/unittest/integration_test.h"
@@ -197,4 +198,134 @@ TEST(OpMsg, DocumentSequenceReturnsWork) {
<< "admin"));
}
+TEST(OpMsg, ServerHandlesExhaustCorrectly) {
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ // Only test exhaust against a single server.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ NamespaceString nss("test", "coll");
+
+ conn->dropCollection(nss.toString());
+
+ // Insert a few documents.
+ for (int i = 0; i < 5; i++) {
+ conn->insert(nss.toString(), BSON("_id" << i), 0);
+ }
+
+ // Issue a find request to open a cursor but return 0 documents.
+ auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1));
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ const long long cursorId = res["cursor"]["id"].numberLong();
+ ASSERT(res["cursor"]["firstBatch"].Array().empty());
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ // Construct getMore request with exhaust flag. Set batch size so we will need multiple batches
+ // to exhaust the cursor.
+ int batchSize = 2;
+ GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none);
+ opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON());
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run getMore to initiate the exhaust stream.
+ ASSERT(conn->call(request, reply));
+ auto lastRequestId = reply.header().getId();
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId);
+ std::vector<BSONElement> nextBatch = res["cursor"]["nextBatch"].Array();
+ ASSERT_EQ(nextBatch.size(), 2U);
+ ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 0));
+ ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1));
+
+ // Receive next exhaust batch.
+ conn->recv(reply, lastRequestId);
+ lastRequestId = reply.header().getId();
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId);
+ nextBatch = res["cursor"]["nextBatch"].Array();
+ ASSERT_EQ(nextBatch.size(), 2U);
+ ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 2));
+ ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 3));
+
+ // Receive terminal batch.
+ ASSERT(conn->recv(reply, lastRequestId));
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ ASSERT_EQ(res["cursor"]["id"].numberLong(), 0);
+ nextBatch = res["cursor"]["nextBatch"].Array();
+ ASSERT_EQ(nextBatch.size(), 1U);
+ ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4));
+}
+
+TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {
+ // This test simply tries to verify that using the exhaust option with DBClientCursor works
+ // correctly. The externally visible behavior should technically be the same as a non-exhaust
+ // cursor. The exhaust cursor should ideally provide a performance win over non-exhaust, but we
+ // don't measure that here.
+ std::string errMsg;
+ auto conn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, conn);
+
+ // Only test exhaust against a single server.
+ if (conn->isReplicaSetMember() || conn->isMongos()) {
+ return;
+ }
+
+ NamespaceString nss("test", "coll");
+ conn->dropCollection(nss.toString());
+
+ const int nDocs = 5;
+ unittest::log() << "Inserting " << nDocs << " documents.";
+ for (int i = 0; i < nDocs; i++) {
+ auto doc = BSON("_id" << i);
+ conn->insert(nss.toString(), doc, 0);
+ }
+
+ ASSERT_EQ(conn->count(nss.toString()), size_t(nDocs));
+ unittest::log() << "Finished document insertion.";
+
+ // Open an exhaust cursor.
+ int batchSize = 2;
+ auto cursor =
+ conn->query(nss, Query().sort("_id", 1), 0, 0, nullptr, QueryOption_Exhaust, batchSize);
+
+ // Verify that the documents are returned properly. Exhaust cursors should still receive results
+ // in batches, so we check that these batches correspond to the given specified batch size.
+ ASSERT(cursor->more());
+ ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 0));
+ ASSERT(cursor->more());
+ ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 1));
+ ASSERT_EQ(cursor->objsLeftInBatch(), 0);
+
+ ASSERT(cursor->more());
+ ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 2));
+ ASSERT(cursor->more());
+ ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 3));
+ ASSERT_EQ(cursor->objsLeftInBatch(), 0);
+
+ ASSERT(cursor->more());
+ ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 4));
+ ASSERT_EQ(cursor->objsLeftInBatch(), 0);
+
+ // Should have consumed all documents at this point.
+ ASSERT(!cursor->more());
+ ASSERT(cursor->isDead());
+}
} // namespace mongo