summaryrefslogtreecommitdiff
path: root/src/mongo/rpc/op_msg_integration_test.cpp
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2019-12-16 21:15:41 +0000
committerevergreen <evergreen@mongodb.com>2019-12-16 21:15:41 +0000
commitec498c5968974012d50d6d04c26cd2cd0db87d22 (patch)
tree39a80b70f400f27bd4af3d83748112bfed7cc3b4 /src/mongo/rpc/op_msg_integration_test.cpp
parentd9f793354be29ef51e7d32e8cb46e7bf84b99d66 (diff)
downloadmongo-ec498c5968974012d50d6d04c26cd2cd0db87d22.tar.gz
SERVER-44510 Implement exhaust isMaster
Diffstat (limited to 'src/mongo/rpc/op_msg_integration_test.cpp')
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp196
1 files changed, 191 insertions, 5 deletions
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index 8659b7014d9..40a6ee044a8 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
#include "mongo/client/dbclient_connection.h"
+#include "mongo/client/dbclient_rs.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -288,7 +289,7 @@ void enableClientChecksum() {
failPoint->setMode(FailPoint::off);
}
-void exhaustTest(bool enableChecksum) {
+void exhaustGetMoreTest(bool enableChecksum) {
std::string errMsg;
auto conn = std::unique_ptr<DBClientBase>(
unittest::getFixtureConnectionString().connect("integration_test", errMsg));
@@ -374,12 +375,197 @@ void exhaustTest(bool enableChecksum) {
ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4));
}
-TEST(OpMsg, ServerHandlesExhaustCorrectly) {
- exhaustTest(false);
+TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectly) {
+ exhaustGetMoreTest(false);
}
-TEST(OpMsg, ServerHandlesExhaustCorrectlyWithChecksum) {
- exhaustTest(true);
+TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectlyWithChecksum) {
+ exhaustGetMoreTest(true);
+}
+
+void exhaustIsMasterTest(bool enableChecksum) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+
+ // TODO SERVER-44813: Run this test on standalone.
+ // TODO SERVER-44521: Run this test on mongos.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ if (!enableChecksum) {
+ disableClientChecksum();
+ }
+
+ ON_BLOCK_EXIT([&] { enableClientChecksum(); });
+
+ auto tickSource = getGlobalServiceContext()->getTickSource();
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ // Reply has checksum if and only if the request did.
+ ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
+
+ // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust.
+ isMasterCmd =
+ BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream.
+ auto beforeExhaustCommand = tickSource->getTicks();
+ ASSERT(conn->call(request, reply));
+ auto afterFirstResponse = tickSource->getTicks();
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // Receive next exhaust message.
+ auto lastRequestId = reply.header().getId();
+ ASSERT_OK(conn->recv(reply, lastRequestId));
+ auto afterSecondResponse = tickSource->getTicks();
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum);
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // The exhaust stream would continue indefinitely.
+}
+
+TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) {
+ exhaustIsMasterTest(false);
+}
+
+// TODO SERVER-44517: The checksum logic will be unified for exhaust commands, so we don't need to
+// test checksum for both exhaust isMaster and exhaust getMore.
+TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectlyWithChecksum) {
+ exhaustIsMasterTest(true);
+}
+
+TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+
+ // TODO SERVER-44813: Run this test on standalone.
+ // TODO SERVER-44521: Run this test on mongos.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ auto tickSource = getGlobalServiceContext()->getTickSource();
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. Use a different
+ // processId for the topologyVersion so that the first response is returned immediately.
+ isMasterCmd = BSON("isMaster" << 1 << "topologyVersion"
+ << BSON("processId" << OID::gen() << "counter" << 0LL)
+ << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream. The first response should be received
+ // immediately.
+ auto beforeExhaustCommand = tickSource->getTicks();
+ ASSERT(conn->call(request, reply));
+ auto afterFirstResponse = tickSource->getTicks();
+ // TODO SERVER-44514: Change the following assertion to use ASSERT_LT once the server responds
+ // immediately on a topologyVersion with a different processId.
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // Receive next exhaust message. The second response waits for 'maxAwaitTimeMS'.
+ auto lastRequestId = reply.header().getId();
+ ASSERT_OK(conn->recv(reply, lastRequestId));
+ auto afterSecondResponse = tickSource->getTicks();
+ // Allow for clock skew when testing the response time.
+ ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse),
+ Milliseconds(50));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ nextTopologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion);
+
+ // The exhaust stream would continue indefinitely.
+}
+
+TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+
+ // TODO SERVER-44813: Run this test on standalone.
+ // TODO SERVER-44521: Run this test on mongos.
+ if (!fixtureConn->isReplicaSetMember()) {
+ return;
+ }
+
+ // Connect directly to the primary.
+ DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+
+ // Issue an isMaster command with exhaust but no maxAwaitTimeMS.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_NOT_OK(getStatusFromCommandResult(res));
}
TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {