diff options
author | Jason Chan <jason.chan@mongodb.com> | 2020-02-20 19:53:40 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2020-02-20 19:53:40 +0000 |
commit | b4915c29d4848439e23857c45fd3fcf94622b015 (patch) | |
tree | d003c8310d71c41947b1c28224b68402f1456c59 /src/mongo | |
parent | 5342122069c8dacc798955b561c504556dab01e1 (diff) | |
download | mongo-b4915c29d4848439e23857c45fd3fcf94622b015.tar.gz |
SERVER-44522 serverStatus metrics for awaitable isMaster
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_info.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 9 | ||||
-rw-r--r-- | src/mongo/rpc/op_msg_integration_test.cpp | 218 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_is_master_cmd.cpp | 16 | ||||
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 8 | ||||
-rw-r--r-- | src/mongo/transport/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/transport/ismaster_metrics.cpp | 98 | ||||
-rw-r--r-- | src/mongo/transport/ismaster_metrics.h | 93 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 5 |
10 files changed, 481 insertions, 2 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c3cfc7b646f..a85ba3a627f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -93,6 +93,7 @@ #include "mongo/platform/mutex.h" #include "mongo/rpc/metadata/oplog_query_metadata.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/transport/ismaster_metrics.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" @@ -110,7 +111,10 @@ MONGO_FAIL_POINT_DEFINE(stepdownHangBeforeRSTLEnqueue); // Fail setMaintenanceMode with ErrorCodes::NotSecondary to simulate a concurrent election. MONGO_FAIL_POINT_DEFINE(setMaintenanceModeFailsWithNotSecondary); MONGO_FAIL_POINT_DEFINE(forceSyncSourceRetryWaitForInitialSync); +// Signals that an isMaster request has started waiting. MONGO_FAIL_POINT_DEFINE(waitForIsMasterResponse); +// Will cause an isMaster request to hang as it starts waiting. +MONGO_FAIL_POINT_DEFINE(hangWhileWaitingForIsMasterResponse); // Number of times we tried to go live as a secondary. Counter64 attemptsToBecomeSecondary; @@ -2059,10 +2063,16 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste const TopologyVersion topologyVersion = _topCoord->getTopologyVersion(); lk.unlock(); + IsMasterMetrics::get(opCtx)->incrementNumAwaitingTopologyChanges(); + if (MONGO_unlikely(waitForIsMasterResponse.shouldFail())) { // Used in tests that wait for this failpoint to be entered before triggering a topology // change. - LOGV2(21341, "waitForIsMasterResponse failpoint enabled."); + LOGV2(31464, "waitForIsMasterResponse failpoint enabled."); + } + if (MONGO_unlikely(hangWhileWaitingForIsMasterResponse.shouldFail())) { + LOGV2(21341, "Hanging due to hangWhileWaitingForIsMasterResponse failpoint."); + hangWhileWaitingForIsMasterResponse.pauseWhileSet(opCtx); } // Wait for a topology change with timeout set to deadline. @@ -2080,6 +2090,7 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorImpl::awaitIsMaste // Return an IsMasterResponse with the current topology version on timeout when waiting for // a topology change. stdx::lock_guard lk(_mutex); + IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges(); return _makeIsMasterResponse(horizonString, lk); } @@ -3238,6 +3249,8 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator(WithLock l ON_BLOCK_EXIT([&] { if (_rsConfig.isInitialized()) { _fulfillTopologyChangePromise(opCtx, lk); + // Use the global ServiceContext here in case the current opCtx is null. + IsMasterMetrics::get(getGlobalServiceContext())->resetNumAwaitingTopologyChanges(); } }); @@ -3733,6 +3746,7 @@ ReplicationCoordinatorImpl::_setCurrentRSConfig(WithLock lk, for (auto iter = _horizonToPromiseMap.begin(); iter != _horizonToPromiseMap.end(); iter++) { iter->second->setError({ErrorCodes::SplitHorizonChange, "Received a reconfig that changed the horizon parameters."}); + IsMasterMetrics::get(opCtx)->resetNumAwaitingTopologyChanges(); } if (_selfIndex >= 0) { // Only create a new horizon promise mapping if the node exists in the new config. diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index 6d65ffb2bd4..ac7acfd2f03 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -60,8 +60,10 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/db/wire_version.h" #include "mongo/executor/network_interface.h" +#include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" +#include "mongo/transport/ismaster_metrics.h" #include "mongo/util/decimal_counter.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" @@ -69,7 +71,11 @@ namespace mongo { +// Hangs in the beginning of each isMaster command when set. MONGO_FAIL_POINT_DEFINE(waitInIsMaster); +// Awaitable isMaster requests with the proper topologyVersions will sleep for maxAwaitTimeMS on +// standalones. This failpoint will hang right before doing this sleep when set. +MONGO_FAIL_POINT_DEFINE(hangWaitingForIsMasterResponseOnStandalone); using std::list; using std::string; @@ -106,7 +112,6 @@ TopologyVersion appendReplicationInfo(OperationContext* opCtx, invariant(isMasterResponse->getTopologyVersion()); return isMasterResponse->getTopologyVersion().get(); } - auto currentTopologyVersion = replCoord->getTopologyVersion(); if (clientTopologyVersion && @@ -121,6 +126,15 @@ TopologyVersion appendReplicationInfo(OperationContext* opCtx, // The topologyVersion never changes on a running standalone process, so just sleep for // maxAwaitTimeMS. invariant(maxAwaitTimeMS); + + IsMasterMetrics::get(opCtx)->incrementNumAwaitingTopologyChanges(); + ON_BLOCK_EXIT([&] { IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges(); }); + if (MONGO_unlikely(hangWaitingForIsMasterResponseOnStandalone.shouldFail())) { + // Used in tests that wait for this failpoint to be entered to guarantee that the + // request is waiting and metrics have been updated. + LOGV2(31462, "Hanging due to hangWaitingForIsMasterResponseOnStandalone failpoint."); + hangWaitingForIsMasterResponseOnStandalone.pauseWhileSet(opCtx); + } opCtx->sleepFor(Milliseconds(*maxAwaitTimeMS)); } @@ -499,6 +513,9 @@ public: maxAwaitTimeMSField); invariant(clientTopologyVersion); + InExhaustIsMaster::get(opCtx->getClient()->session().get()) + ->setInExhaustIsMaster(true /* inExhaustIsMaster */); + if (clientTopologyVersion->getProcessId() == currentTopologyVersion.getProcessId() && clientTopologyVersion->getCounter() == currentTopologyVersion.getCounter()) { // Indicate that an exhaust message should be generated and the previous BSONObj diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 30fe2096622..2851dd05282 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -94,6 +94,8 @@ #include "mongo/rpc/metadata/tracking_metadata.h" #include "mongo/rpc/op_msg.h" #include "mongo/rpc/reply_builder_interface.h" +#include "mongo/transport/ismaster_metrics.h" +#include "mongo/transport/session.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -1280,6 +1282,13 @@ DbResponse receivedCommands(OperationContext* opCtx, opCtx->setExhaust(OpMsg::isFlagSet(message, OpMsg::kExhaustSupported)); + const auto session = opCtx->getClient()->session(); + if (session) { + if (!opCtx->isExhaust() || c->getName() != "isMaster"_sd) { + InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false); + } + } + execCommandDatabase(opCtx, c, request, replyBuilder.get(), behaviors); } catch (const DBException& ex) { BSONObjBuilder metadataBob; diff --git a/src/mongo/rpc/op_msg_integration_test.cpp b/src/mongo/rpc/op_msg_integration_test.cpp index 68dc5fa259b..e8e2cb09275 100644 --- a/src/mongo/rpc/op_msg_integration_test.cpp +++ b/src/mongo/rpc/op_msg_integration_test.cpp @@ -679,6 +679,224 @@ TEST(OpMsg, ServerRejectsExhaustIsMasterWithoutMaxAwaitTimeMS) { ASSERT_NOT_OK(getStatusFromCommandResult(res)); } +TEST(OpMsg, ServerStatusCorrectlyShowsExhaustIsMasterMetrics) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + DBClientBase* conn = fixtureConn.get(); + + if (fixtureConn->isReplicaSetMember()) { + // Connect directly to the primary. + conn = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn); + } + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + isMasterCmd = + BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. + ASSERT(conn->call(request, reply)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + + // Start a new connection to the server to check the serverStatus metrics. + std::string newErrMsg; + auto conn2 = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", newErrMsg)); + uassert(ErrorCodes::SocketException, newErrMsg, conn2); + + auto serverStatusCmd = BSON("serverStatus" << 1); + BSONObj serverStatusReply; + ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); + + // The exhaust stream would continue indefinitely. +} + +TEST(OpMsg, ExhaustIsMasterMetricDecrementsOnNewOpAfterTerminatingExhaustStream) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + DBClientBase* conn1 = fixtureConn.get(); + + if (fixtureConn->isReplicaSetMember()) { + // Connect directly to the primary. + conn1 = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn1); + } + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn1->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + isMasterCmd = + BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. + ASSERT(conn1->call(request, reply)); + auto lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + + // Start a new connection to the server to check the serverStatus metrics. + std::string newErrMsg; + auto conn2 = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", newErrMsg)); + uassert(ErrorCodes::SocketException, newErrMsg, conn2); + + auto serverStatusCmd = BSON("serverStatus" << 1); + BSONObj serverStatusReply; + ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); + + const auto failPointObj = BSON("configureFailPoint" + << "failCommand" + << "mode" << BSON("times" << 1) << "data" + << BSON("errorCode" << ErrorCodes::NotMaster << "failCommands" + << BSON_ARRAY("isMaster"))); + auto response = conn2->runCommand(OpMsgRequest::fromDBAndBody("admin", failPointObj)); + ASSERT_OK(getStatusFromCommandResult(response->getCommandReply())); + + // Receive the next exhaust isMaster. + ASSERT_OK(conn1->recv(reply, lastRequestId)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + lastRequestId = reply.header().getId(); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + + // Receive the failed exhaust isMaster. This should close the exhaust stream. + ASSERT_OK(conn1->recv(reply, lastRequestId)); + res = OpMsg::parse(reply).body; + ASSERT_NOT_OK(getStatusFromCommandResult(res)); + + // Terminating the exhaust stream should not decrement the number of 'exhaustIsMaster'. + ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); + + // 'exhaustIsMaster' should now decrement after calling serverStatus on the connection that used + // to have the exhaust stream. + ASSERT(conn1->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(0, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); +} + +TEST(OpMsg, ExhaustIsMasterMetricOnNewExhaustIsMasterAfterTerminatingExhaustStream) { + std::string errMsg; + auto fixtureConn = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", errMsg)); + uassert(ErrorCodes::SocketException, errMsg, fixtureConn); + DBClientBase* conn1 = fixtureConn.get(); + + if (fixtureConn->isReplicaSetMember()) { + // Connect directly to the primary. + conn1 = &static_cast<DBClientReplicaSet*>(fixtureConn.get())->masterConn(); + ASSERT(conn1); + } + + // Issue an isMaster command without a topology version. + auto isMasterCmd = BSON("isMaster" << 1); + auto opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + auto request = opMsgRequest.serialize(); + + Message reply; + ASSERT(conn1->call(request, reply)); + auto res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + auto topologyVersion = res["topologyVersion"].Obj().getOwned(); + ASSERT(!OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + + isMasterCmd = + BSON("isMaster" << 1 << "topologyVersion" << topologyVersion << "maxAwaitTimeMS" << 100); + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command to initiate the exhaust stream. + ASSERT(conn1->call(request, reply)); + auto lastRequestId = reply.header().getId(); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + + // Start a new connection to the server to check the serverStatus metrics. + std::string newErrMsg; + auto conn2 = std::unique_ptr<DBClientBase>( + unittest::getFixtureConnectionString().connect("integration_test", newErrMsg)); + uassert(ErrorCodes::SocketException, newErrMsg, conn2); + + auto serverStatusCmd = BSON("serverStatus" << 1); + BSONObj serverStatusReply; + ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); + + const auto failPointObj = BSON("configureFailPoint" + << "failCommand" + << "mode" << BSON("times" << 1) << "data" + << BSON("errorCode" << ErrorCodes::NotMaster << "failCommands" + << BSON_ARRAY("isMaster"))); + auto response = conn2->runCommand(OpMsgRequest::fromDBAndBody("admin", failPointObj)); + ASSERT_OK(getStatusFromCommandResult(response->getCommandReply())); + + // Receive the next exhaust isMaster. + ASSERT_OK(conn1->recv(reply, lastRequestId)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + lastRequestId = reply.header().getId(); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + + // Receive the failed exhaust isMaster. This should close the exhaust stream. + ASSERT_OK(conn1->recv(reply, lastRequestId)); + res = OpMsg::parse(reply).body; + ASSERT_NOT_OK(getStatusFromCommandResult(res)); + + // Terminating the exhaust stream should not decrement the number of 'exhaustIsMaster'. + ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); + + opMsgRequest = OpMsgRequest::fromDBAndBody("admin", isMasterCmd); + request = opMsgRequest.serialize(); + OpMsg::setFlag(&request, OpMsg::kExhaustSupported); + + // Run isMaster command on conn1 to initiate a new exhaust stream. + ASSERT(conn1->call(request, reply)); + ASSERT(OpMsg::isFlagSet(reply, OpMsg::kMoreToCome)); + res = OpMsg::parse(reply).body; + ASSERT_OK(getStatusFromCommandResult(res)); + + // 'exhaustIsMaster' should not increment or decrement after initiating a new exhaust stream. + ASSERT(conn2->runCommand("admin", serverStatusCmd, serverStatusReply)); + ASSERT_EQUALS(1, serverStatusReply["connections"]["exhaustIsMaster"].numberInt()); +} + TEST(OpMsg, ExhaustWithDBClientCursorBehavesCorrectly) { // This test simply tries to verify that using the exhaust option with DBClientCursor works // correctly. The externally visible behavior should technically be the same as a non-exhaust diff --git a/src/mongo/s/commands/cluster_is_master_cmd.cpp b/src/mongo/s/commands/cluster_is_master_cmd.cpp index 7432b5b6e33..59002801d61 100644 --- a/src/mongo/s/commands/cluster_is_master_cmd.cpp +++ b/src/mongo/s/commands/cluster_is_master_cmd.cpp @@ -38,9 +38,11 @@ #include "mongo/db/operation_context.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/wire_version.h" +#include "mongo/logv2/log.h" #include "mongo/rpc/metadata/client_metadata.h" #include "mongo/rpc/metadata/client_metadata_ismaster.h" #include "mongo/rpc/topology_version_gen.h" +#include "mongo/transport/ismaster_metrics.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/util/log.h" #include "mongo/util/map_util.h" @@ -49,7 +51,11 @@ namespace mongo { +// Hangs in the beginning of each isMaster command when set. MONGO_FAIL_POINT_DEFINE(waitInIsMaster); +// Awaitable isMaster requests with the proper topologyVersions are expected to sleep for +// maxAwaitTimeMS on mongos. This failpoint will hang right before doing this sleep when set. +MONGO_FAIL_POINT_DEFINE(hangWhileWaitingForIsMasterResponse); TopologyVersion mongosTopologyVersion; @@ -152,6 +158,13 @@ public: // The topologyVersion never changes on a running mongos process, so just sleep for // maxAwaitTimeMS. + IsMasterMetrics::get(opCtx)->incrementNumAwaitingTopologyChanges(); + ON_BLOCK_EXIT( + [&] { IsMasterMetrics::get(opCtx)->decrementNumAwaitingTopologyChanges(); }); + if (MONGO_unlikely(hangWhileWaitingForIsMasterResponse.shouldFail())) { + LOGV2(31463, "hangWhileWaitingForIsMasterResponse failpoint enabled."); + hangWhileWaitingForIsMasterResponse.pauseWhileSet(opCtx); + } opCtx->sleepFor(Milliseconds(maxAwaitTimeMS)); } } else { @@ -200,6 +213,9 @@ public: maxAwaitTimeMSField); invariant(clientTopologyVersion); + InExhaustIsMaster::get(opCtx->getClient()->session().get()) + ->setInExhaustIsMaster(true /* inExhaustIsMaster */); + if (clientTopologyVersion->getProcessId() == mongosTopologyVersion.getProcessId() && clientTopologyVersion->getCounter() == mongosTopologyVersion.getCounter()) { // Indicate that an exhaust message should be generated and the previous BSONObj diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 990ae16a972..82026c4f1b4 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -83,6 +83,8 @@ #include "mongo/s/shard_invalidated_for_targeting_exception.h" #include "mongo/s/stale_exception.h" #include "mongo/s/transaction_router.h" +#include "mongo/transport/ismaster_metrics.h" +#include "mongo/transport/session.h" #include "mongo/util/fail_point.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -1000,6 +1002,12 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) { }(); opCtx->setExhaust(OpMsg::isFlagSet(m, OpMsg::kExhaustSupported)); + const auto session = opCtx->getClient()->session(); + if (session) { + if (!opCtx->isExhaust() || request.getCommandName() != "isMaster"_sd) { + InExhaustIsMaster::get(session.get())->setInExhaustIsMaster(false); + } + } // Execute. std::string db = request.getDatabase().toString(); diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 24e5f6f2ee5..95ebc3a1fbb 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -7,6 +7,7 @@ env = env.Clone() env.Library( target='transport_layer_common', source=[ + 'ismaster_metrics.cpp', 'service_entry_point_utils.cpp', 'session.cpp', 'transport_layer.cpp', diff --git a/src/mongo/transport/ismaster_metrics.cpp b/src/mongo/transport/ismaster_metrics.cpp new file mode 100644 index 00000000000..3179f28d2fb --- /dev/null +++ b/src/mongo/transport/ismaster_metrics.cpp @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork +#include "mongo/transport/ismaster_metrics.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { +const auto IsMasterMetricsDecoration = ServiceContext::declareDecoration<IsMasterMetrics>(); +const auto InExhaustIsMasterDecoration = transport::Session::declareDecoration<InExhaustIsMaster>(); +} // namespace + +IsMasterMetrics* IsMasterMetrics::get(ServiceContext* service) { + return &IsMasterMetricsDecoration(service); +} + +IsMasterMetrics* IsMasterMetrics::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +size_t IsMasterMetrics::getNumExhaustIsMaster() const { + return _exhaustIsMasterConnections.load(); +} + +void IsMasterMetrics::incrementNumExhaustIsMaster() { + _exhaustIsMasterConnections.fetchAndAdd(1); +} + +void IsMasterMetrics::decrementNumExhaustIsMaster() { + _exhaustIsMasterConnections.fetchAndSubtract(1); +} + +size_t IsMasterMetrics::getNumAwaitingTopologyChanges() const { + return _connectionsAwaitingTopologyChanges.load(); +} + +void IsMasterMetrics::incrementNumAwaitingTopologyChanges() { + _connectionsAwaitingTopologyChanges.fetchAndAdd(1); +} + +void IsMasterMetrics::decrementNumAwaitingTopologyChanges() { + _connectionsAwaitingTopologyChanges.fetchAndSubtract(1); +} + +void IsMasterMetrics::resetNumAwaitingTopologyChanges() { + _connectionsAwaitingTopologyChanges.store(0); +} + +InExhaustIsMaster* InExhaustIsMaster::get(transport::Session* session) { + return &InExhaustIsMasterDecoration(session); +} + +InExhaustIsMaster::~InExhaustIsMaster() { + if (_inExhaustIsMaster) { + IsMasterMetrics::get(getGlobalServiceContext())->decrementNumExhaustIsMaster(); + } +} + +bool InExhaustIsMaster::getInExhaustIsMaster() const { + return _inExhaustIsMaster; +} + +void InExhaustIsMaster::setInExhaustIsMaster(bool inExhaustIsMaster) { + if (!_inExhaustIsMaster && inExhaustIsMaster) { + IsMasterMetrics::get(getGlobalServiceContext())->incrementNumExhaustIsMaster(); + } else if (_inExhaustIsMaster && !inExhaustIsMaster) { + IsMasterMetrics::get(getGlobalServiceContext())->decrementNumExhaustIsMaster(); + } + _inExhaustIsMaster = inExhaustIsMaster; +} + +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/transport/ismaster_metrics.h b/src/mongo/transport/ismaster_metrics.h new file mode 100644 index 00000000000..514b2088551 --- /dev/null +++ b/src/mongo/transport/ismaster_metrics.h @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" + +namespace mongo { + +/** + * A decoration on the Session object used to track exhaust isMaster metrics. + */ +class InExhaustIsMaster { +public: + InExhaustIsMaster() = default; + + InExhaustIsMaster(const InExhaustIsMaster&) = delete; + InExhaustIsMaster& operator=(const InExhaustIsMaster&) = delete; + InExhaustIsMaster(InExhaustIsMaster&&) = delete; + InExhaustIsMaster& operator=(InExhaustIsMaster&&) = delete; + + static InExhaustIsMaster* get(transport::Session* session); + void setInExhaustIsMaster(bool inExhaustIsMaster); + bool getInExhaustIsMaster() const; + ~InExhaustIsMaster(); + +private: + bool _inExhaustIsMaster = false; +}; + +/** + * Container for awaitable isMaster statistics. + */ +class IsMasterMetrics { + IsMasterMetrics(const IsMasterMetrics&) = delete; + IsMasterMetrics& operator=(const IsMasterMetrics&) = delete; + IsMasterMetrics(IsMasterMetrics&&) = delete; + IsMasterMetrics& operator=(IsMasterMetrics&&) = delete; + +public: + IsMasterMetrics() = default; + + static IsMasterMetrics* get(ServiceContext* service); + static IsMasterMetrics* get(OperationContext* opCtx); + + size_t getNumExhaustIsMaster() const; + + size_t getNumAwaitingTopologyChanges() const; + void incrementNumAwaitingTopologyChanges(); + void decrementNumAwaitingTopologyChanges(); + + void resetNumAwaitingTopologyChanges(); + friend InExhaustIsMaster; + +private: + void incrementNumExhaustIsMaster(); + void decrementNumExhaustIsMaster(); + + // The number of clients currently waiting in isMaster for a topology change. + AtomicWord<size_t> _connectionsAwaitingTopologyChanges{0}; + // The number of connections whose last request was an isMaster with exhaustAllowed. + AtomicWord<size_t> _exhaustIsMasterConnections{0}; +}; + +} // namespace mongo diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index 9381e1e7b95..b6379ca73bf 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -38,6 +38,7 @@ #include "mongo/db/auth/restriction_environment.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" +#include "mongo/transport/ismaster_metrics.h" #include "mongo/transport/service_state_machine.h" #include "mongo/transport/session.h" #include "mongo/util/log.h" @@ -272,6 +273,10 @@ void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { bob->append("totalCreated", static_cast<int>(_createdConnections.load())); if (auto sc = getGlobalServiceContext()) { bob->append("active", static_cast<int>(sc->getActiveClientOperations())); + bob->append("exhaustIsMaster", + static_cast<int>(IsMasterMetrics::get(sc)->getNumExhaustIsMaster())); + bob->append("awaitingTopologyChanges", + static_cast<int>(IsMasterMetrics::get(sc)->getNumAwaitingTopologyChanges())); } if (_adminInternalPool) { |