summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp16
-rw-r--r--src/mongo/db/repl/replication_info.cpp17
-rw-r--r--src/mongo/db/service_entry_point_common.cpp9
-rw-r--r--src/mongo/rpc/op_msg_integration_test.cpp261
-rw-r--r--src/mongo/s/commands/cluster_is_master_cmd.cpp15
-rw-r--r--src/mongo/s/commands/strategy.cpp8
-rw-r--r--src/mongo/transport/SConscript1
-rw-r--r--src/mongo/transport/ismaster_metrics.h93
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp5
9 files changed, 424 insertions, 1 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 5685e7955f2..751f971a648 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -93,6 +93,7 @@
#include "mongo/platform/mutex.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/transport/ismaster_metrics.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
@@ -109,7 +110,10 @@ MONGO_FAIL_POINT_DEFINE(stepdownHangBeforeRSTLEnqueue);
// Fail setMaintenanceMode with ErrorCodes::NotSecondary to simulate a concurrent election.
MONGO_FAIL_POINT_DEFINE(setMaintenanceModeFailsWithNotSecondary);
MONGO_FAIL_POINT_DEFINE(forceSyncSourceRetryWaitForInitialSync);
+// Signals that an isMaster request has started waiting.
MONGO_FAIL_POINT_DEFINE(waitForIsMasterResponse);
+// Will cause an isMaster request to hang as it starts waiting.
+MONGO_FAIL_POINT_DEFINE(hangWhileWaitingForIsMasterResponse);
// Number of times we tried to go live as a secondary.
Counter64 attemptsToBecomeSecondary;
@@ -2062,10 +2066,16 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
const TopologyVersion topologyVersion = _topCoord->getTopologyVersion();
lk.unlock();
+ IsMasterMetrics::get(opCtx)->incrementNumAwaitingTopologyChanges();
+
if (MONGO_unlikely(waitForIsMasterResponse.shouldFail())) {
// Used in tests that wait for this failpoint to be entered before triggering a topology
// change.
- LOGV2(21341, "waitForIsMasterResponse failpoint enabled.");
+ LOGV2(31464, "waitForIsMasterResponse failpoint enabled.");
+ }
+ if (MONGO_unlikely(hangWhileWaitingForIsMasterResponse.shouldFail())) {
+ LOGV2(21341, "Hanging due to hangWhileWaitingForIsMasterResponse failpoint.");
+ hangWhileWaitingForIsMasterResponse.pauseWhileSet(opCtx);
}
// Wait for a topology change with timeout set to deadline.
@@ -2083,6 +2093,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste
// Return an IsMasterResponse with the current topology version on timeout when waiting for
// a topology change.
stdx::lock_guard lk(_mutex);
+ IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges();
return _makeIsMasterResponse(horizonString, lk);
}
@@ -3241,6 +3252,8 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l
ON_BLOCK_EXIT([&] {
if (_rsConfig.isInitialized()) {
_fulfillTopologyChangePromise(opCtx, lk);
+ // Use the global ServiceContext here in case the current opCtx is null.
+ IsMasterMetrics::get(getGlobalServiceContext())->resetNumAwaitingTopologyChanges();
}
});
@@ -3736,6 +3749,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk,
for (auto iter = _horizonToPromiseMap.begin(); iter != _horizonToPromiseMap.end(); iter++) {
iter->second->setError({ErrorCodes::SplitHorizonChange,
"Received a reconfig that changed the horizon parameters."});
+ IsMasterMetrics::get(opCtx)->resetNumAwaitingTopologyChanges();
}
if (_selfIndex >= 0) {
// Only create a new horizon promise mapping if the node exists in the new config.
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index d5c183c365f..fd8ce22e8d0 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -63,13 +63,18 @@
#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
+#include "mongo/transport/ismaster_metrics.h"
#include "mongo/util/decimal_counter.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/map_util.h"
namespace mongo {
+// Hangs in the beginning of each isMaster command when set.
MONGO_FAIL_POINT_DEFINE(waitInIsMaster);
+// Awaitable isMaster requests with the proper topologyVersions will sleep for maxAwaitTimeMS on
+// standalones. This failpoint will hang right before doing this sleep when set.
+MONGO_FAIL_POINT_DEFINE(hangWaitingForIsMasterResponseOnStandalone);
using std::list;
using std::string;
@@ -121,6 +126,15 @@ TopologyVersion appendReplicationInfo(OperationContext* opCtx,
// The topologyVersion never changes on a running standalone process, so just sleep for
// maxAwaitTimeMS.
invariant(maxAwaitTimeMS);
+
+ IsMasterMetrics::get(opCtx)->incrementNumAwaitingTopologyChanges();
+ ON_BLOCK_EXIT([&] { IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges(); });
+ if (MONGO_unlikely(hangWaitingForIsMasterResponseOnStandalone.shouldFail())) {
+ // Used in tests that wait for this failpoint to be entered to guarantee that the
+ // request is waiting and metrics have been updated.
+ LOGV2(31462, "Hanging due to hangWaitingForIsMasterResponseOnStandalone failpoint.");
+ hangWaitingForIsMasterResponseOnStandalone.pauseWhileSet(opCtx);
+ }
opCtx->sleepFor(Milliseconds(*maxAwaitTimeMS));
}
@@ -499,6 +513,9 @@ public:
maxAwaitTimeMSField);
invariant(clientTopologyVersion);
+ InExhaustIsMaster::get(opCtx->getClient()->session().get())
+ ->setInExhaustIsMaster(true /* inExhaustIsMaster */);
+
if (clientTopologyVersion->getProcessId() == currentTopologyVersion.getProcessId() &&
clientTopologyVersion->getCounter() == currentTopologyVersion.getCounter()) {
// Indicate that an exhaust message should be generated and the previous BSONObj
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 7e1209c64de..20576ba4bd2 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -94,6 +94,8 @@
#include "mongo/rpc/metadata/tracking_metadata.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/reply_builder_interface.h"
+#include "mongo/transport/ismaster_metrics.h"
+#include "mongo/transport/session.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -1283,6 +1285,13 @@ DbResponse receivedCommands(OperationContext* opCtx,
opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported));
+ const auto session = opCtx->getClient()->session();
+ if (session) {
+ if (!opCtx->isExhaust() || c->getName() != "isMaster"_sd) {
+ InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false);
+ }
+ }
+
execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors);
} catch (const DBException& ex) {
BSONObjBuilder metadataBob;
diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp
index c94922ad55d..446e3beaaf2 100644
--- a/src/mongo/rpc/op_msg_integration_test.cpp
+++ b/src/mongo/rpc/op_msg_integration_test.cpp
@@ -44,6 +44,19 @@
namespace mongo {
+template <typename F>
+bool waitForIntVal(F&& f, int expectedValue) {
+ // Wait up to 10 seconds.
+ for (auto i = 0; i < 100; i++) {
+ int val = f();
+ if (expectedValue == val) {
+ return true;
+ }
+ sleepmillis(100);
+ }
+ return false;
+}
+
TEST(OpMsg, UnknownRequiredFlagClosesConnection) {
std::string errMsg;
auto conn = std::unique_ptr<DBClientBase>(
@@ -678,6 +691,254 @@ TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) {
ASSERT_NOT_OK(getStatusFromCommandResult(res));
}
+TEST(OpMsg, ServerStatusCorrectlyShowsExhaustIsMasterMetrics) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+ DBClientBase* conn = fixtureConn.get();
+
+ if (fixtureConn->isReplicaSetMember()) {
+ // Connect directly to the primary.
+ conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn);
+ }
+
+ // Wait for stale exhuast streams to finish closing before testing the exhaust isMaster metrics.
+ ASSERT(waitForIntVal(
+ [&] {
+ auto serverStatusCmd = BSON("serverStatus" << 1);
+ BSONObj serverStatusReply;
+ ASSERT(conn->runCommand("admin", serverStatusCmd, serverStatusReply));
+ return serverStatusReply["connections"]["exhaustIsMaster"].numberInt();
+ },
+ 0));
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ isMasterCmd =
+ BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream.
+ ASSERT(conn->call(request, reply));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+
+ // Start a new connection to the server to check the serverStatus metrics.
+ std::string newErrMsg;
+ auto conn2 = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", newErrMsg));
+ uassert(ErrorCodes::SocketException, newErrMsg, conn2);
+
+ auto serverStatusCmd = BSON("serverStatus" << 1);
+ BSONObj serverStatusReply;
+ ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+
+ // The exhaust stream would continue indefinitely.
+}
+
+TEST(OpMsg, ExhaustIsMasterMetricDecrementsOnNewOpAfterTerminatingExhaustStream) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+ DBClientBase* conn1 = fixtureConn.get();
+
+ if (fixtureConn->isReplicaSetMember()) {
+ // Connect directly to the primary.
+ conn1 = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn1);
+ }
+
+ // Wait for stale exhuast streams to finish closing before testing the exhaust isMaster metrics.
+ ASSERT(waitForIntVal(
+ [&] {
+ auto serverStatusCmd = BSON("serverStatus" << 1);
+ BSONObj serverStatusReply;
+ ASSERT(conn1->runCommand("admin", serverStatusCmd, serverStatusReply));
+ return serverStatusReply["connections"]["exhaustIsMaster"].numberInt();
+ },
+ 0));
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn1->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ isMasterCmd =
+ BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream.
+ ASSERT(conn1->call(request, reply));
+ auto lastRequestId = reply.header().getId();
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+
+ // Start a new connection to the server to check the serverStatus metrics.
+ std::string newErrMsg;
+ auto conn2 = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", newErrMsg));
+ uassert(ErrorCodes::SocketException, newErrMsg, conn2);
+
+ auto serverStatusCmd = BSON("serverStatus" << 1);
+ BSONObj serverStatusReply;
+ ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+
+ const auto failPointObj = BSON("configureFailPoint"
+ << "failCommand"
+ << "mode" << BSON("times" << 1) << "data"
+ << BSON("errorCode" << ErrorCodes::NotMaster << "failCommands"
+ << BSON_ARRAY("isMaster")));
+ auto response = conn2->runCommand(OpMsgRequest::fromDBAndBody("admin", failPointObj));
+ ASSERT_OK(getStatusFromCommandResult(response->getCommandReply()));
+
+ // Receive the next exhaust isMaster.
+ ASSERT_OK(conn1->recv(reply, lastRequestId));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ lastRequestId = reply.header().getId();
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+
+ // Receive the failed exhaust isMaster. This should close the exhaust stream.
+ ASSERT_OK(conn1->recv(reply, lastRequestId));
+ res = OpMsg::parse(reply).body;
+ ASSERT_NOT_OK(getStatusFromCommandResult(res));
+
+ // Terminating the exhaust stream should not decrement the number of 'exhaustIsMaster'.
+ ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+
+ // 'exhaustIsMaster' should now decrement after calling serverStatus on the connection that used
+ // to have the exhaust stream.
+ ASSERT(conn1->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(0, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+}
+
+TEST(OpMsg, ExhaustIsMasterMetricOnNewExhaustIsMasterAfterTerminatingExhaustStream) {
+ std::string errMsg;
+ auto fixtureConn = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", errMsg));
+ uassert(ErrorCodes::SocketException, errMsg, fixtureConn);
+ DBClientBase* conn1 = fixtureConn.get();
+
+ if (fixtureConn->isReplicaSetMember()) {
+ // Connect directly to the primary.
+ conn1 = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn();
+ ASSERT(conn1);
+ }
+
+ // Wait for stale exhuast streams to finish closing before testing the exhaust isMaster metrics.
+ ASSERT(waitForIntVal(
+ [&] {
+ auto serverStatusCmd = BSON("serverStatus" << 1);
+ BSONObj serverStatusReply;
+ ASSERT(conn1->runCommand("admin", serverStatusCmd, serverStatusReply));
+ return serverStatusReply["connections"]["exhaustIsMaster"].numberInt();
+ },
+ 0));
+
+ // Issue an isMaster command without a topology version.
+ auto isMasterCmd = BSON("isMaster" << 1);
+ auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ auto request = opMsgRequest.serialize();
+
+ Message reply;
+ ASSERT(conn1->call(request, reply));
+ auto res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+ auto topologyVersion = res["topologyVersion"].Obj().getOwned();
+ ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+
+ isMasterCmd =
+ BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100);
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command to initiate the exhaust stream.
+ ASSERT(conn1->call(request, reply));
+ auto lastRequestId = reply.header().getId();
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+
+ // Start a new connection to the server to check the serverStatus metrics.
+ std::string newErrMsg;
+ auto conn2 = std::unique_ptr<DBClientBase>(
+ unittest::getFixtureConnectionString().connect("integration_test", newErrMsg));
+ uassert(ErrorCodes::SocketException, newErrMsg, conn2);
+
+ auto serverStatusCmd = BSON("serverStatus" << 1);
+ BSONObj serverStatusReply;
+ ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+
+ const auto failPointObj = BSON("configureFailPoint"
+ << "failCommand"
+ << "mode" << BSON("times" << 1) << "data"
+ << BSON("errorCode" << ErrorCodes::NotMaster << "failCommands"
+ << BSON_ARRAY("isMaster")));
+ auto response = conn2->runCommand(OpMsgRequest::fromDBAndBody("admin", failPointObj));
+ ASSERT_OK(getStatusFromCommandResult(response->getCommandReply()));
+
+ // Receive the next exhaust isMaster.
+ ASSERT_OK(conn1->recv(reply, lastRequestId));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ lastRequestId = reply.header().getId();
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+
+ // Receive the failed exhaust isMaster. This should close the exhaust stream.
+ ASSERT_OK(conn1->recv(reply, lastRequestId));
+ res = OpMsg::parse(reply).body;
+ ASSERT_NOT_OK(getStatusFromCommandResult(res));
+
+ // Terminating the exhaust stream should not decrement the number of 'exhaustIsMaster'.
+ ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+
+ opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd);
+ request = opMsgRequest.serialize();
+ OpMsg::setFlag(&request, OpMsg::kExhaustSupported);
+
+ // Run isMaster command on conn1 to initiate a new exhaust stream.
+ ASSERT(conn1->call(request, reply));
+ ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome));
+ res = OpMsg::parse(reply).body;
+ ASSERT_OK(getStatusFromCommandResult(res));
+
+ // 'exhaustIsMaster' should not increment or decrement after initiating a new exhaust stream.
+ ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply));
+ ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt());
+}
+
TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) {
// This test simply tries to verify that using the exhaust option with DBClientCursor works
// correctly. The externally visible behavior should technically be the same as a non-exhaust
diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp
index 203f5e2e3df..084e9ef7488 100644
--- a/src/mongo/s/commands/cluster_is_master_cmd.cpp
+++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp
@@ -42,6 +42,7 @@
#include "mongo/rpc/metadata/client_metadata.h"
#include "mongo/rpc/metadata/client_metadata_ismaster.h"
#include "mongo/rpc/topology_version_gen.h"
+#include "mongo/transport/ismaster_metrics.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/util/map_util.h"
#include "mongo/util/net/socket_utils.h"
@@ -49,7 +50,11 @@
namespace mongo {
+// Hangs in the beginning of each isMaster command when set.
MONGO_FAIL_POINT_DEFINE(waitInIsMaster);
+// Awaitable isMaster requests with the proper topologyVersions are expected to sleep for
+// maxAwaitTimeMS on mongos. This failpoint will hang right before doing this sleep when set.
+MONGO_FAIL_POINT_DEFINE(hangWhileWaitingForIsMasterResponse);
TopologyVersion mongosTopologyVersion;
@@ -152,6 +157,13 @@ public:
// The topologyVersion never changes on a running mongos process, so just sleep for
// maxAwaitTimeMS.
+ IsMasterMetrics::get(opCtx)->incrementNumAwaitingTopologyChanges();
+ ON_BLOCK_EXIT(
+ [&] { IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges(); });
+ if (MONGO_unlikely(hangWhileWaitingForIsMasterResponse.shouldFail())) {
+ LOGV2(31463, "hangWhileWaitingForIsMasterResponse failpoint enabled.");
+ hangWhileWaitingForIsMasterResponse.pauseWhileSet(opCtx);
+ }
opCtx->sleepFor(Milliseconds(maxAwaitTimeMS));
}
} else {
@@ -200,6 +212,9 @@ public:
maxAwaitTimeMSField);
invariant(clientTopologyVersion);
+ InExhaustIsMaster::get(opCtx->getClient()->session().get())
+ ->setInExhaustIsMaster(true /* inExhaustIsMaster */);
+
if (clientTopologyVersion->getProcessId() == mongosTopologyVersion.getProcessId() &&
clientTopologyVersion->getCounter() == mongosTopologyVersion.getCounter()) {
// Indicate that an exhaust message should be generated and the previous BSONObj
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 75045a461f7..3cb5015ff63 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -83,6 +83,8 @@
#include "mongo/s/shard_invalidated_for_targeting_exception.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/transaction_router.h"
+#include "mongo/transport/ismaster_metrics.h"
+#include "mongo/transport/session.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/str.h"
@@ -999,6 +1001,12 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
}();
opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported));
+ const auto session = opCtx->getClient()->session();
+ if (session) {
+ if (!opCtx->isExhaust() || request.getCommandName() != "isMaster"_sd) {
+ InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false);
+ }
+ }
// Execute.
std::string db = request.getDatabase().toString();
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 24e5f6f2ee5..95ebc3a1fbb 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -7,6 +7,7 @@ env = env.Clone()
env.Library(
target='transport_layer_common',
source=[
+ 'ismaster_metrics.cpp',
'service_entry_point_utils.cpp',
'session.cpp',
'transport_layer.cpp',
diff --git a/src/mongo/transport/ismaster_metrics.h b/src/mongo/transport/ismaster_metrics.h
new file mode 100644
index 00000000000..514b2088551
--- /dev/null
+++ b/src/mongo/transport/ismaster_metrics.h
@@ -0,0 +1,93 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
+
+namespace mongo {
+
+/**
+ * A decoration on the Session object used to track exhaust isMaster metrics.
+ */
+class InExhaustIsMaster {
+public:
+ InExhaustIsMaster() = default;
+
+ InExhaustIsMaster(const InExhaustIsMaster&) = delete;
+ InExhaustIsMaster& operator=(const InExhaustIsMaster&) = delete;
+ InExhaustIsMaster(InExhaustIsMaster&&) = delete;
+ InExhaustIsMaster& operator=(InExhaustIsMaster&&) = delete;
+
+ static InExhaustIsMaster* get(transport::Session* session);
+ void setInExhaustIsMaster(bool inExhaustIsMaster);
+ bool getInExhaustIsMaster() const;
+ ~InExhaustIsMaster();
+
+private:
+ bool _inExhaustIsMaster = false;
+};
+
+/**
+ * Container for awaitable isMaster statistics.
+ */
+class IsMasterMetrics {
+ IsMasterMetrics(const IsMasterMetrics&) = delete;
+ IsMasterMetrics& operator=(const IsMasterMetrics&) = delete;
+ IsMasterMetrics(IsMasterMetrics&&) = delete;
+ IsMasterMetrics& operator=(IsMasterMetrics&&) = delete;
+
+public:
+ IsMasterMetrics() = default;
+
+ static IsMasterMetrics* get(ServiceContext* service);
+ static IsMasterMetrics* get(OperationContext* opCtx);
+
+ size_t getNumExhaustIsMaster() const;
+
+ size_t getNumAwaitingTopologyChanges() const;
+ void incrementNumAwaitingTopologyChanges();
+ void decrementNumAwaitingTopologyChanges();
+
+ void resetNumAwaitingTopologyChanges();
+ friend InExhaustIsMaster;
+
+private:
+ void incrementNumExhaustIsMaster();
+ void decrementNumExhaustIsMaster();
+
+ // The number of clients currently waiting in isMaster for a topology change.
+ AtomicWord<size_t> _connectionsAwaitingTopologyChanges{0};
+ // The number of connections whose last request was an isMaster with exhaustAllowed.
+ AtomicWord<size_t> _exhaustIsMasterConnections{0};
+};
+
+} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index b219240e6fa..22edb0cb03f 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/auth/restriction_environment.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
+#include "mongo/transport/ismaster_metrics.h"
#include "mongo/transport/service_state_machine.h"
#include "mongo/transport/session.h"
#include "mongo/util/processinfo.h"
@@ -271,6 +272,10 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
if (auto sc = getGlobalServiceContext()) {
bob->append("active", static_cast<int>(sc->getActiveClientOperations()));
+ bob->append("exhaustIsMaster",
+ static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster()));
+ bob->append("awaitingTopologyChanges",
+ static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges()));
}
if (_adminInternalPool) {