summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Chan <jason.chan@mongodb.com>2022-09-26 18:42:52 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-26 19:41:03 +0000
commit352089310fdc9c71873bb8f7a06f9b23e59ecc7f (patch)
treec3275f1ac878583172904e6f9c6baca547b72dee
parenteb7433209268fcb25ee3fdb74b8f7b2f57cf213f (diff)
downloadmongo-352089310fdc9c71873bb8f7a06f9b23e59ecc7f.tar.gz
SERVER-64967 Measure how long it takes operations using egress connections to write to network
-rw-r--r--jstests/sharding/egress_connection_acquisition_to_wire_metrics.js73
-rw-r--r--src/mongo/client/SConscript2
-rw-r--r--src/mongo/client/async_client.cpp55
-rw-r--r--src/mongo/client/async_client.h18
-rw-r--r--src/mongo/client/async_client.idl42
-rw-r--r--src/mongo/executor/connection_metrics.h12
-rw-r--r--src/mongo/executor/connection_pool_tl.cpp9
-rw-r--r--src/mongo/executor/connection_pool_tl.h2
-rw-r--r--src/mongo/executor/network_interface_tl.cpp7
9 files changed, 206 insertions, 14 deletions
diff --git a/jstests/sharding/egress_connection_acquisition_to_wire_metrics.js b/jstests/sharding/egress_connection_acquisition_to_wire_metrics.js
new file mode 100644
index 00000000000..831305ee000
--- /dev/null
+++ b/jstests/sharding/egress_connection_acquisition_to_wire_metrics.js
@@ -0,0 +1,73 @@
+/**
+ * Tests that we are able to log the metrics corresponding to the time it takes from egress
+ * connection acquisition to writing to the wire.
+ *
+ * @tags: [requires_fcv_62, featureFlagConnHealthMetrics]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/log.js");
+load("jstests/libs/parallel_shell_helpers.js");
+
+function getConnAcquiredToWireMicros(conn) {
+ return conn.adminCommand({serverStatus: 1})
+ .metrics.network.totalTimeForEgressConnectionAcquiredToWireMicros;
+}
+
+// Set it so that we log the intended metrics only on the mongos.
+const paramsDoc = {
+ mongosOptions: {setParameter: {connectionAcquisitionToWireLoggingRate: 1.0}},
+ shardOptions: {setParameter: {connectionAcquisitionToWireLoggingRate: 0.0}},
+ configOptions: {setParameter: {connectionAcquisitionToWireLoggingRate: 0.0}},
+};
+const st = new ShardingTest({shards: 1, mongos: 1, other: paramsDoc});
+let initialConnAcquiredToWireTime = getConnAcquiredToWireMicros(st.s);
+jsTestLog(`Initial metric value for mongos totalTimeForEgressConnectionAcquiredToWireMicros: ${
+ tojson(initialConnAcquiredToWireTime)}`);
+assert.commandWorked(st.s.adminCommand({clearLog: 'global'}));
+
+// The RSM will periodically acquire egress connections to ping the shard and config server nodes,
+// but we do an insert to speed up the wait and to be more explicit.
+assert.commandWorked(st.s.getDB(jsTestName())["test"].insert({x: 1}));
+checkLog.containsJson(st.s, 6496702);
+let afterConnAcquiredToWireTime = getConnAcquiredToWireMicros(st.s);
+jsTestLog(`End metric value for mongos totalTimeForEgressConnectionAcquiredToWireMicros: ${
+ tojson(afterConnAcquiredToWireTime)}`);
+assert.gt(afterConnAcquiredToWireTime,
+ initialConnAcquiredToWireTime,
+ st.s.adminCommand({serverStatus: 1}));
+
+// Test that setting the logging rate to 0 results in silencing of the logs.
+st.s.adminCommand({setParameter: 1, connectionAcquisitionToWireLoggingRate: 0.0});
+assert.commandWorked(st.s.adminCommand({clearLog: 'global'}));
+assert.commandWorked(st.s.getDB(jsTestName())["test"].insert({x: 2}));
+try {
+ checkLog.containsJson(st.s, 6496702, null, 5 * 1000);
+ assert(false);
+} catch (e) {
+ jsTestLog("Waited long enough to believe logs were correctly silenced.");
+}
+
+// Test with mirrored reads to execute the 'fireAndForget' path and verify logs are still correctly
+// printed.
+const shardPrimary = st.rs0.getPrimary();
+assert.commandWorked(
+ shardPrimary.adminCommand({setParameter: 1, connectionAcquisitionToWireLoggingRate: 1.0}));
+assert.commandWorked(shardPrimary.adminCommand({clearLog: 'global'}));
+initialConnAcquiredToWireTime = getConnAcquiredToWireMicros(shardPrimary);
+jsTestLog(`Initial metric value for mongod totalTimeForEgressConnectionAcquiredToWireMicros: ${
+ tojson(initialConnAcquiredToWireTime)}`);
+assert.commandWorked(
+ shardPrimary.adminCommand({setParameter: 1, mirrorReads: {samplingRate: 1.0}}));
+shardPrimary.getDB(jsTestName()).runCommand({find: "test", filter: {}});
+checkLog.containsJson(shardPrimary, 6496702);
+afterConnAcquiredToWireTime = getConnAcquiredToWireMicros(shardPrimary);
+jsTestLog(`End metric value for mongod totalTimeForEgressConnectionAcquiredToWireMicros: ${
+ tojson(afterConnAcquiredToWireTime)}`);
+assert.gt(afterConnAcquiredToWireTime,
+ initialConnAcquiredToWireTime,
+ shardPrimary.adminCommand({serverStatus: 1}));
+st.stop();
+})();
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index 296e0ec8b8f..1fafe772d16 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -254,6 +254,7 @@ env.Library(
target='async_client',
source=[
'async_client.cpp',
+ env.Idlc('async_client.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/wire_version',
@@ -265,6 +266,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
+ '$BUILD_DIR/mongo/db/server_feature_flags',
'$BUILD_DIR/mongo/executor/egress_tag_closer_manager',
'$BUILD_DIR/mongo/transport/message_compressor',
],
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index 26e2f488ed2..48368e139ee 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -35,12 +35,15 @@
#include <memory>
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/async_client_gen.h"
#include "mongo/client/authenticate.h"
#include "mongo/client/sasl_client_authenticate.h"
#include "mongo/config.h"
#include "mongo/db/auth/sasl_command_constants.h"
+#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/dbmessage.h"
+#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/egress_tag_closer_manager.h"
@@ -55,12 +58,20 @@
#include "mongo/util/net/ssl_peer_info.h"
#include "mongo/util/version.h"
-#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kNetwork
namespace mongo {
MONGO_FAIL_POINT_DEFINE(pauseBeforeMarkKeepOpen);
+namespace {
+bool connHealthMetricsEnabled() {
+ return gFeatureFlagConnHealthMetrics.isEnabledAndIgnoreFCV();
+}
+CounterMetric totalTimeForEgressConnectionAcquiredToWireMicros(
+ "network.totalTimeForEgressConnectionAcquiredToWireMicros", connHealthMetricsEnabled);
+} // namespace
+
Future<AsyncDBClient::Handle> AsyncDBClient::connect(
const HostAndPort& peer,
transport::ConnectSSLMode sslMode,
@@ -288,18 +299,40 @@ Future<Message> AsyncDBClient::_waitForResponse(boost::optional<int32_t> msgId,
});
}
-Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
- const BatonHandle& baton,
- bool fireAndForget) {
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(
+ OpMsgRequest request,
+ const BatonHandle& baton,
+ bool fireAndForget,
+ boost::optional<std::shared_ptr<Timer>> fromConnAcquiredTimer) {
auto requestMsg = request.serialize();
if (fireAndForget) {
OpMsg::setFlag(&requestMsg, OpMsg::kMoreToCome);
}
auto msgId = nextMessageId();
auto future = _call(std::move(requestMsg), msgId, baton);
+ auto logMetrics = [this, fromConnAcquiredTimer] {
+ if (fromConnAcquiredTimer) {
+ const auto timeElapsedMicros =
+ durationCount<Microseconds>(fromConnAcquiredTimer.get()->elapsed());
+ totalTimeForEgressConnectionAcquiredToWireMicros.increment(timeElapsedMicros);
+ if (timeElapsedMicros >= 1000 ||
+ _random.nextCanonicalDouble() <= gConnectionAcquisitionToWireLoggingRate.load()) {
+ LOGV2_INFO(6496702,
+ "Acquired connection for remote operation and completed writing to wire",
+ "durationMicros"_attr = timeElapsedMicros);
+ } else {
+ LOGV2_DEBUG(
+ 6496701,
+ 2,
+ "Acquired connection for remote operation and completed writing to wire",
+ "durationMicros"_attr = timeElapsedMicros);
+ }
+ }
+ };
if (fireAndForget) {
- return std::move(future).then([msgId, this]() -> Future<rpc::UniqueReply> {
+ return std::move(future).then([msgId, logMetrics, this]() -> Future<rpc::UniqueReply> {
+ logMetrics();
// Return a mock status OK response since we do not expect a real response.
OpMsgBuilder builder;
builder.setBody(BSON("ok" << 1));
@@ -311,19 +344,25 @@ Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
}
return std::move(future)
- .then([msgId, baton, this]() { return _waitForResponse(msgId, baton); })
+ .then([msgId, logMetrics, baton, this]() {
+ logMetrics();
+ return _waitForResponse(msgId, baton);
+ })
.then([this](Message response) -> Future<rpc::UniqueReply> {
return rpc::UniqueReply(response, rpc::makeReply(&response));
});
}
Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
- executor::RemoteCommandRequest request, const BatonHandle& baton) {
+ executor::RemoteCommandRequest request,
+ const BatonHandle& baton,
+ boost::optional<std::shared_ptr<Timer>> fromConnAcquiredTimer) {
auto startTimer = Timer();
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
opMsgRequest.validatedTenancyScope = request.validatedTenancyScope;
- return runCommand(std::move(opMsgRequest), baton, request.options.fireAndForget)
+ return runCommand(
+ std::move(opMsgRequest), baton, request.options.fireAndForget, fromConnAcquiredTimer)
.then([this, startTimer = std::move(startTimer)](rpc::UniqueReply response) {
return executor::RemoteCommandResponse(*response, startTimer.elapsed());
});
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index 7b7bcc39506..bef816727ac 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -51,7 +51,10 @@ public:
explicit AsyncDBClient(const HostAndPort& peer,
transport::SessionHandle session,
ServiceContext* svcCtx)
- : _peer(std::move(peer)), _session(std::move(session)), _svcCtx(svcCtx) {}
+ : _peer(std::move(peer)),
+ _session(std::move(session)),
+ _svcCtx(svcCtx),
+ _random{PseudoRandom(SecureRandom().nextInt64())} {}
using Handle = std::shared_ptr<AsyncDBClient>;
@@ -65,10 +68,14 @@ public:
std::shared_ptr<const transport::SSLConnectionContext> transientSSLContext = nullptr);
Future<executor::RemoteCommandResponse> runCommandRequest(
- executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr);
- Future<rpc::UniqueReply> runCommand(OpMsgRequest request,
- const BatonHandle& baton = nullptr,
- bool fireAndForget = false);
+ executor::RemoteCommandRequest request,
+ const BatonHandle& baton = nullptr,
+ boost::optional<std::shared_ptr<Timer>> fromConnAcquiredTimer = boost::none);
+ Future<rpc::UniqueReply> runCommand(
+ OpMsgRequest request,
+ const BatonHandle& baton = nullptr,
+ bool fireAndForget = false,
+ boost::optional<std::shared_ptr<Timer>> fromConnAcquiredTimer = boost::none);
Future<executor::RemoteCommandResponse> beginExhaustCommandRequest(
executor::RemoteCommandRequest request, const BatonHandle& baton = nullptr);
@@ -116,6 +123,7 @@ private:
transport::SessionHandle _session;
ServiceContext* const _svcCtx;
MessageCompressorManager _compressorManager;
+ PseudoRandom _random;
};
} // namespace mongo
diff --git a/src/mongo/client/async_client.idl b/src/mongo/client/async_client.idl
new file mode 100644
index 00000000000..a800cc053e8
--- /dev/null
+++ b/src/mongo/client/async_client.idl
@@ -0,0 +1,42 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+server_parameters:
+ connectionAcquisitionToWireLoggingRate:
+ description: >-
+ The rate at which egress connection metrics below a certain time threshold will be logged at
+ info level. This only applies for the
+ 'network.totalTimeForEgressConnectionAcquiredToWireMicros' server status metric.
+ set_at: [ startup, runtime ]
+ cpp_vartype: AtomicWord<double>
+ cpp_varname: gConnectionAcquisitionToWireLoggingRate
+ default: 0.2
+ validator: { gte: 0.0, lte: 1.0 }
diff --git a/src/mongo/executor/connection_metrics.h b/src/mongo/executor/connection_metrics.h
index 35954c0573c..0930ce81d22 100644
--- a/src/mongo/executor/connection_metrics.h
+++ b/src/mongo/executor/connection_metrics.h
@@ -34,6 +34,7 @@
#include "mongo/util/clock_source.h"
#include "mongo/util/duration.h"
#include "mongo/util/time_support.h"
+#include "mongo/util/timer.h"
namespace mongo {
@@ -111,6 +112,14 @@ public:
_connectionHook = _finishPhase();
}
+ void startConnAcquiredTimer() {
+ _fromConnAcquiredTimer.reset();
+ }
+
+ Timer* getConnAcquiredTimer() {
+ return &_fromConnAcquiredTimer;
+ }
+
private:
Milliseconds _finishPhase() {
auto elapsed = _stopWatch->elapsed();
@@ -126,6 +135,9 @@ private:
boost::optional<Milliseconds> _tlsHandshake;
boost::optional<Milliseconds> _auth;
boost::optional<Milliseconds> _connectionHook;
+ // A timer that is reset when an egress connection is acquired from the connection
+ // pool.
+ Timer _fromConnAcquiredTimer;
Milliseconds _total{0};
};
diff --git a/src/mongo/executor/connection_pool_tl.cpp b/src/mongo/executor/connection_pool_tl.cpp
index f4c7ff1f900..2ee85d449c1 100644
--- a/src/mongo/executor/connection_pool_tl.cpp
+++ b/src/mongo/executor/connection_pool_tl.cpp
@@ -501,6 +501,15 @@ void TLConnection::cancelAsync() {
_client->cancel();
}
+void TLConnection::startConnAcquiredTimer() {
+ _connMetrics.startConnAcquiredTimer();
+}
+
+// Hands out the connection-acquired timer held by _connMetrics as a shared_ptr that shares
+// ownership with this TLConnection, so the timer stays valid for as long as the caller holds it.
+std::shared_ptr<Timer> TLConnection::getConnAcquiredTimer() {
+    Timer* const acquiredTimer = _connMetrics.getConnAcquiredTimer();
+    return std::shared_ptr<Timer>(shared_from_this(), acquiredTimer);
+}
+
auto TLTypeFactory::reactor() {
return checked_pointer_cast<transport::Reactor>(_executor);
}
diff --git a/src/mongo/executor/connection_pool_tl.h b/src/mongo/executor/connection_pool_tl.h
index 4a4c6228ea6..88b3e02a9bd 100644
--- a/src/mongo/executor/connection_pool_tl.h
+++ b/src/mongo/executor/connection_pool_tl.h
@@ -177,6 +177,8 @@ public:
bool maybeHealthy() override;
AsyncDBClient* client();
Date_t now() override;
+ void startConnAcquiredTimer();
+ std::shared_ptr<Timer> getConnAcquiredTimer();
private:
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index ca69451b992..6d15519c1ac 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -707,8 +707,11 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::CommandState::sendRequest(
std::shared_ptr<RequestState> requestState) {
return makeReadyFutureWith([this, requestState] {
setTimer();
+ const auto connAcquiredTimer =
+ checked_cast<connection_pool_tl::TLConnection*>(requestState->conn.get())
+ ->getConnAcquiredTimer();
return RequestState::getClient(requestState->conn)
- ->runCommandRequest(*requestState->request, baton);
+ ->runCommandRequest(*requestState->request, baton, std::move(connAcquiredTimer));
})
.then([this, requestState](RemoteCommandResponse response) {
catchingInvoke(
@@ -865,6 +868,8 @@ void NetworkInterfaceTL::RequestManager::trySend(
return;
}
+ checked_cast<connection_pool_tl::TLConnection*>(swConn.getValue().get())
+ ->startConnAcquiredTimer();
std::shared_ptr<RequestState> requestState;
{