diff options
author | Tess Avitabile <tess.avitabile@mongodb.com> | 2019-12-16 21:15:41 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-12-16 21:15:41 +0000 |
commit | ec498c5968974012d50d6d04c26cd2cd0db87d22 (patch) | |
tree | 39a80b70f400f27bd4af3d83748112bfed7cc3b4 /src/mongo/rpc/op_msg_integration_test.cpp | |
parent | d9f793354be29ef51e7d32e8cb46e7bf84b99d66 (diff) | |
download | mongo-ec498c5968974012d50d6d04c26cd2cd0db87d22.tar.gz |
SERVER-44510 Implement exhaust isMaster
Diffstat (limited to 'src/mongo/rpc/op_msg_integration_test.cpp')
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 196 |
1 files changed, 191 insertions, 5 deletions
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 8659b7014d9..40a6ee044a8 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" #include "mongo/client/dbclient_connection.h" +#include "mongo/client/dbclient_rs.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/query/getmore_request.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -288,7 +289,7 @@ void enableClientChecksum() { failPoint->setMode(FailPoint::off); } -void exhaustTest(bool enableChecksum) { +void exhaustGetMoreTest(bool enableChecksum) { std::string errMsg; auto conn = std::unique_ptr<DBClientBase>( unittest::getFixtureConnectionString().connect("integration_test", errMsg)); @@ -374,12 +375,197 @@ void exhaustTest(bool enableChecksum) { ASSERT_BSONOBJ_EQ(nextBatch[0].embeddedObject(), BSON("_id" << 4)); } -TEST(OpMsg, ServerHandlesExhaustCorrectly) { - exhaustTest(false); +TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectly) { + exhaustGetMoreTest(false); } -TEST(OpMsg, ServerHandlesExhaustCorrectlyWithChecksum) { - exhaustTest(true); +TEST(OpMsg, ServerHandlesExhaustGetMoreCorrectlyWithChecksum) { + exhaustGetMoreTest(true); +} + +void exhaustIsMasterTest(bool enableChecksum) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + + // TODO SERVER-44813: Run this test on standalone. + // TODO SERVER-44521: Run this test on mongos. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + if (!enableChecksum) { + disableClientChecksum(); + } + + ON_BLOCK_EXIT([&] { enableClientChecksum(); }); + + auto tickSource = getGlobalServiceContext()->getTickSource(); + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + // Reply has checksum if and only if the request did. + ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); + + // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. + isMasterCmd = + BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. + auto beforeExhaustCommand = tickSource->getTicks(); + ASSERT(conn->call(request, reply)); + auto afterFirstResponse = tickSource->getTicks(); + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // Receive next exhaust message. + auto lastRequestId = reply.header().getId(); + ASSERT_OK(conn->recv(reply, lastRequestId)); + auto afterSecondResponse = tickSource->getTicks(); + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + ASSERT_EQ(OpMsg::isFlagSet(reply, OpMsg::kChecksumPresent), enableChecksum); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // The exhaust stream would continue indefinitely. +} + +TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectly) { + exhaustIsMasterTest(false); +} + +// TODO SERVER-44517: The checksum logic will be unified for exhaust commands, so we don't need to +// test checksum for both exhaust isMaster and exhaust getMore. +TEST(OpMsg, ServerHandlesExhaustIsMasterCorrectlyWithChecksum) { + exhaustIsMasterTest(true); +} + +TEST(OpMsg, ServerHandlesExhaustIsMasterWithTopologyChange) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + + // TODO SERVER-44813: Run this test on standalone. + // TODO SERVER-44521: Run this test on mongos. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + auto tickSource = getGlobalServiceContext()->getTickSource(); + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + // Construct isMaster command with topologyVersion, maxAwaitTimeMS, and exhaust. Use a different + // processId for the topologyVersion so that the first response is returned immediately. + isMasterCmd = BSON("isMaster" << 1 << "topologyVersion" + << BSON("processId" << OID::gen() << "counter" << 0LL) + << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. The first response should be received + // immediately. + auto beforeExhaustCommand = tickSource->getTicks(); + ASSERT(conn->call(request, reply)); + auto afterFirstResponse = tickSource->getTicks(); + // TODO SERVER-44514: Change the following assertion to use ASSERT_LT once the server responds + // immediately on a topologyVersion with a different processId. + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterFirstResponse - beforeExhaustCommand), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // Receive next exhaust message. The second response waits for 'maxAwaitTimeMS'. + auto lastRequestId = reply.header().getId(); + ASSERT_OK(conn->recv(reply, lastRequestId)); + auto afterSecondResponse = tickSource->getTicks(); + // Allow for clock skew when testing the response time. + ASSERT_GT(tickSource->ticksTo<Milliseconds>(afterSecondResponse - afterFirstResponse), + Milliseconds(50)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + nextTopologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT_BSONOBJ_EQ(topologyVersion, nextTopologyVersion); + + // The exhaust stream would continue indefinitely. +} + +TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + + // TODO SERVER-44813: Run this test on standalone. + // TODO SERVER-44521: Run this test on mongos. + if (!fixtureConn->isReplicaSetMember()) { + return; + } + + // Connect directly to the primary. + DBClientBase* conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + + // Issue an isMaster command with exhaust but no maxAwaitTimeMS. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_NOT_OK(getStatusFromCommandResult(res)); } TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { |