diff options
author | William Schultz <william.schultz@mongodb.com> | 2018-09-07 16:57:54 -0400 |
---|---|---|
committer | William Schultz <william.schultz@mongodb.com> | 2018-09-07 17:19:34 -0400 |
commit | a8fe1d82c3c5484c7e8a6402a34bc45ea11d06f1 (patch) | |
tree | 4d37aeb1841841447f2404d9b298207dd5705282 /src/mongo/rpc | |
parent | 614477159b98497d6372f31ea8d918ca39259ccf (diff) | |
download | mongo-a8fe1d82c3c5484c7e8a6402a34bc45ea11d06f1.tar.gz |
SERVER-36299 Add integration test for exhaust with OP_MSG
This reverts commit 2d79aaf6d2b65f0355fcaa5d368ec3c4f493495a.
Diffstat (limited to 'src/mongo/rpc')
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 131 |
1 files changed, 131 insertions, 0 deletions
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index e0bdb01bef6..4486c342e1d 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/client/dbclient_connection.h" +#include "mongo/db/query/getmore_request.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/op_msg.h" #include "mongo/unittest/integration_test.h" @@ -197,4 +198,134 @@ TEST(OpMsg, DocumentSequenceReturnsWork) { << "admin")); } +TEST(OpMsg, ServerHandlesExhaustCorrectly) { + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + // Only test exhaust against a single server. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + + conn->dropCollection(nss.toString()); + + // Insert a few documents. + for (int i = 0; i < 5; i++) { + conn->insert(nss.toString(), BSON("_id" << i), 0); + } + + // Issue a find request to open a cursor but return 0 documents. + auto findCmd = BSON("find" << nss.coll() << "batchSize" << 0 << "sort" << BSON("_id" << 1)); + auto opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), findCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + const long long cursorId = res["cursor"]["id"].numberLong(); + ASSERT(res["cursor"]["firstBatch"].Array().empty()); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Construct getMore request with exhaust flag. Set batch size so we will need multiple batches + // to exhaust the cursor. + int batchSize = 2; + GetMoreRequest gmr(nss, cursorId, batchSize, boost::none, boost::none, boost::none); + opMsgRequest = OpMsgRequest::fromDBAndBody(nss.db(), gmr.toBSON()); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run getMore to initiate the exhaust stream. + ASSERT(conn->call(request, reply)); + auto lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId); + std::vector<BSONElement> nextBatch = res["cursor"]["nextBatch"].Array(); + ASSERT_EQ(nextBatch.size(), 2U); + ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 0)); + ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 1)); + + // Receive next exhaust batch. + conn->recv(reply, lastRequestId); + lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + ASSERT_EQ(res["cursor"]["id"].numberLong(), cursorId); + nextBatch = res["cursor"]["nextBatch"].Array(); + ASSERT_EQ(nextBatch.size(), 2U); + ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 2)); + ASSERT_BSONOBJ_EQ(nextBatch[1].embeddedObject(), BSON("_id" << 3)); + + // Receive terminal batch. + ASSERT(conn->recv(reply, lastRequestId)); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + ASSERT_EQ(res["cursor"]["id"].numberLong(), 0); + nextBatch = res["cursor"]["nextBatch"].Array(); + ASSERT_EQ(nextBatch.size(), 1U); + ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4)); +} + +TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { + // This test simply tries to verify that using the exhaust option with DBClientCursor works + // correctly. The externally visible behavior should technically be the same as a non-exhaust + // cursor. The exhaust cursor should ideally provide a performance win over non-exhaust, but we + // don't measure that here. + std::string errMsg; + auto conn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, conn); + + // Only test exhaust against a single server. + if (conn->isReplicaSetMember() || conn->isMongos()) { + return; + } + + NamespaceString nss("test", "coll"); + conn->dropCollection(nss.toString()); + + const int nDocs = 5; + unittest::log() << "Inserting " << nDocs << " documents."; + for (int i = 0; i < nDocs; i++) { + auto doc = BSON("_id" << i); + conn->insert(nss.toString(), doc, 0); + } + + ASSERT_EQ(conn->count(nss.toString()), size_t(nDocs)); + unittest::log() << "Finished document insertion."; + + // Open an exhaust cursor. + int batchSize = 2; + auto cursor = + conn->query(nss, Query().sort("_id", 1), 0, 0, nullptr, QueryOption_Exhaust, batchSize); + + // Verify that the documents are returned properly. Exhaust cursors should still receive results + // in batches, so we check that these batches correspond to the given specified batch size. + ASSERT(cursor->more()); + ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 0)); + ASSERT(cursor->more()); + ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 1)); + ASSERT_EQ(cursor->objsLeftInBatch(), 0); + + ASSERT(cursor->more()); + ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 2)); + ASSERT(cursor->more()); + ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 3)); + ASSERT_EQ(cursor->objsLeftInBatch(), 0); + + ASSERT(cursor->more()); + ASSERT_BSONOBJ_EQ(cursor->next(), BSON("_id" << 4)); + ASSERT_EQ(cursor->objsLeftInBatch(), 0); + + // Should have consumed all documents at this point. + ASSERT(!cursor->more()); + ASSERT(cursor->isDead()); +} } // namespace mongo |