23 files changed, 645 insertions, 73 deletions
diff --git a/jstests/noPassthrough/libs/max_conns_override_config.yaml b/jstests/noPassthrough/libs/max_conns_override_config.yaml
new file mode 100644
index 00000000000..c769d89d74c
--- /dev/null
+++ b/jstests/noPassthrough/libs/max_conns_override_config.yaml
@@ -0,0 +1,6 @@
+# Used in the max_conns_override.js test to set the maxConnsOverride settings
+net:
+    maxIncomingConnectionsOverride: [ "127.0.0.1" ]
+    reservedAdminThreads: 3
+    maxIncomingConnections: 5
+
diff --git a/jstests/noPassthrough/max_conns_override.js b/jstests/noPassthrough/max_conns_override.js
new file mode 100644
index 00000000000..5fa28804856
--- /dev/null
+++ b/jstests/noPassthrough/max_conns_override.js
@@ -0,0 +1,45 @@
+(function() {
+    'use strict';
+    const configuredMaxConns = 5;
+    const configuredReadyAdminThreads = 3;
+    let conn = MongoRunner.runMongod({
+        config: "jstests/noPassthrough/libs/max_conns_override_config.yaml",
+        // We check a specific field in this executor's serverStatus section
+        serviceExecutor: "synchronous",
+    });
+
+    // Use up all the maxConns with junk connections, all of these should succeed
+    let maxConns = [];
+    for (let i = 0; i < 5; i++) {
+        maxConns.push(new Mongo(`127.0.0.1:${conn.port}`));
+        let tmpDb = maxConns[maxConns.length - 1].getDB("admin");
+        assert.commandWorked(tmpDb.runCommand({isMaster: 1}));
+    }
+
+    // Get serverStatus to check that we have the right number of threads in the right places
+    let status = conn.getDB("admin").runCommand({serverStatus: 1});
+    const connectionsStatus = status["connections"];
+    const reservedExecutorStatus = connectionsStatus["adminConnections"];
+    const normalExecutorStatus = status["network"]["serviceExecutorTaskStats"];
+
+    // Log these serverStatus sections so we can debug this easily
+    print("connections status section: ", tojson(connectionsStatus));
+    print("normal executor status section: ", tojson(normalExecutorStatus));
+
+    // The number of "available" connections should be less than zero, because we've used
+    // all of maxConns and the exempted connections have pushed us over the limit
+    assert.lt(connectionsStatus["available"], 0);
+    // The number of "current" connections should be greater than maxConns
+    assert.gt(connectionsStatus["current"], configuredMaxConns);
+    // The number of ready threads should be the number of readyThreads we configured, since
+    // every thread spawns a new thread on startup
+    assert.eq(reservedExecutorStatus["readyThreads"] + reservedExecutorStatus["startingThreads"],
+              configuredReadyAdminThreads);
+    // The number of running admin threads should be greater than the readyThreads, because
+    // one is being used right now
+    assert.gt(reservedExecutorStatus["threadsRunning"], reservedExecutorStatus["readyThreads"]);
+    // The normal serviceExecutor should only be running maxConns number of threads
+    assert.eq(normalExecutorStatus["threadsRunning"], configuredMaxConns);
+
+    MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index 96007301142..b7a0eca6895 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -62,10 +62,7 @@ public:
        auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint();
        invariant(serviceEntryPoint);
-       auto stats = serviceEntryPoint->sessionStats();
-       bb.append("current", static_cast<int>(stats.numOpenSessions));
-       bb.append("available", static_cast<int>(stats.numAvailableSessions));
-       bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions));
+       serviceEntryPoint->appendStats(&bb);

        return bb.obj();
    }
@@ -83,8 +80,10 @@ public:
        networkCounter.append(b);
        appendMessageCompressionStats(&b);
        auto executor = opCtx->getServiceContext()->getServiceExecutor();
-       if (executor)
-           executor->appendStats(&b);
+       if (executor) {
+           BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
+           executor->appendStats(&section);
+       }

        return b.obj();
    }
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index d89eac1424e..78ffeb1a511 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -645,6 +645,12 @@ ExitCode _initAndListen(int listenPort) {
        return EXIT_NET_ERROR;
    }

+    start = serviceContext->getServiceEntryPoint()->start();
+    if (!start.isOK()) {
+        error() << "Failed to start the service entry point: " << start;
+        return EXIT_NET_ERROR;
+    }
+
    start = serviceContext->getTransportLayer()->start();
    if (!start.isOK()) {
        error() << "Failed to start the listener: " << start.toString();
diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h
index 25f7a7ccc42..1a3f3a96c3e 100644
--- a/src/mongo/db/server_options.h
+++ b/src/mongo/db/server_options.h
@@ -30,6 +30,8 @@
#include "mongo/db/jsobj.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/process_id.h"
+#include "mongo/stdx/variant.h"
+#include "mongo/util/net/cidr.h"

namespace mongo {

@@ -79,6 +81,8 @@ struct ServerGlobalParams {
    std::string serviceExecutor;

    size_t maxConns = DEFAULT_MAX_CONN;  // Maximum number of simultaneous open connections.
+    std::vector<stdx::variant<CIDR, std::string>> maxConnsOverride;
+    int reservedAdminThreads = 0;

    int unixSocketPermissions = DEFAULT_UNIX_PERMS;  // permissions for the UNIX domain socket
diff --git a/src/mongo/db/server_options_server_helpers.cpp b/src/mongo/db/server_options_server_helpers.cpp
index 9342ae91302..bae75613bdd 100644
--- a/src/mongo/db/server_options_server_helpers.cpp
+++ b/src/mongo/db/server_options_server_helpers.cpp
@@ -120,6 +120,24 @@ Status addGeneralServerOptions(moe::OptionSection* options) {
        "net.maxIncomingConnections", "maxConns", moe::Int, maxConnInfoBuilder.str().c_str());

    options
+        ->addOptionChaining(
+            "net.maxIncomingConnectionsOverride",
+            "",
+            moe::StringVector,
+            "CIDR ranges that do not count towards the maxIncomingConnections limit")
+        .hidden()
+        .setSources(moe::SourceYAMLConfig);
+
+    options
+        ->addOptionChaining(
+            "net.reservedAdminThreads",
+            "",
+            moe::Int,
+            "number of worker threads to reserve for admin and internal connections")
+        .hidden()
+        .setSources(moe::SourceYAMLConfig);
+
+    options
        ->addOptionChaining("net.transportLayer",
                            "transportLayer",
                            moe::String,
@@ -583,6 +601,22 @@ Status storeServerOptions(const moe::Environment& params) {
        }
    }

+    if (params.count("net.maxIncomingConnectionsOverride")) {
+        auto ranges = params["net.maxIncomingConnectionsOverride"].as<std::vector<std::string>>();
+        for (const auto& range : ranges) {
+            auto swr = CIDR::parse(range);
+            if (!swr.isOK()) {
+                serverGlobalParams.maxConnsOverride.push_back(range);
+            } else {
+                serverGlobalParams.maxConnsOverride.push_back(std::move(swr.getValue()));
+            }
+        }
+    }
+
+    if (params.count("net.reservedAdminThreads")) {
+        serverGlobalParams.reservedAdminThreads = params["net.reservedAdminThreads"].as<int>();
+    }
+
    if (params.count("net.wireObjectCheck")) {
        serverGlobalParams.objcheck = params["net.wireObjectCheck"].as<bool>();
    }
diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp
index 3efc2848b80..a1f6acc7382 100644
--- a/src/mongo/embedded/service_entry_point_embedded.cpp
+++ b/src/mongo/embedded/service_entry_point_embedded.cpp
@@ -75,11 +75,15 @@ void ServiceEntryPointEmbedded::startSession(transport::SessionHandle session) {

void ServiceEntryPointEmbedded::endAllSessions(transport::Session::TagMask tags) {}

+Status ServiceEntryPointEmbedded::start() {
+    UASSERT_NOT_IMPLEMENTED;
+}
+
bool ServiceEntryPointEmbedded::shutdown(Milliseconds timeout) {
    UASSERT_NOT_IMPLEMENTED;
}

-ServiceEntryPoint::Stats ServiceEntryPointEmbedded::sessionStats() const {
+void ServiceEntryPointEmbedded::appendStats(BSONObjBuilder*) const {
    UASSERT_NOT_IMPLEMENTED;
}
diff --git a/src/mongo/embedded/service_entry_point_embedded.h b/src/mongo/embedded/service_entry_point_embedded.h
index ba6ed39ce6d..eea4e48b86e 100644
--- a/src/mongo/embedded/service_entry_point_embedded.h
+++ b/src/mongo/embedded/service_entry_point_embedded.h
@@ -45,8 +45,9 @@ public:
    void startSession(transport::SessionHandle session) override;
    void endAllSessions(transport::Session::TagMask tags) override;

+    Status start() override;
    bool shutdown(Milliseconds timeout) override;
-    Stats sessionStats() const override;
+    void appendStats(BSONObjBuilder* bob) const override;
    size_t numOpenSessions() const override;

private:
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 0dc64211d36..0ac5e2c17e1 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -446,6 +446,12 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
        return EXIT_NET_ERROR;
    }

+    status = serviceContext->getServiceEntryPoint()->start();
+    if (!status.isOK()) {
+        error() << "Failed to start the service entry point: " << redact(status);
+        return EXIT_NET_ERROR;
+    }
+
    status = serviceContext->getTransportLayer()->start();
    if (!status.isOK()) {
        error() << "Failed to start the transport layer: " << redact(status);
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 2990aede9ea..87b3463ec9c 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -113,6 +113,7 @@ tlEnv.Library(
    target='service_executor',
    source=[
        'service_executor_adaptive.cpp',
+        'service_executor_reserved.cpp',
        'service_executor_synchronous.cpp',
        'thread_idle_callback.cpp',
    ],
@@ -169,6 +170,7 @@ env.Library(
        '$BUILD_DIR/mongo/db/stats/counters',
        '$BUILD_DIR/mongo/rpc/protocol',
        '$BUILD_DIR/mongo/util/processinfo',
+        'service_executor',
        'transport_layer_common',
    ],
    LIBDEPS_PRIVATE=[
@@ -177,6 +179,18 @@ env.Library(
)

env.CppUnitTest(
+    target='max_conns_override_test',
+    source=[
+        'max_conns_override_test.cpp',
+    ],
+    LIBDEPS=[
+        'service_entry_point',
+        'transport_layer_mock',
+        '$BUILD_DIR/mongo/unittest/unittest'
+    ],
+)
+
+env.CppUnitTest(
    target='service_state_machine_test',
    source=[
        'service_state_machine_test.cpp',
diff --git a/src/mongo/transport/max_conns_override_test.cpp b/src/mongo/transport/max_conns_override_test.cpp
new file mode 100644
index 00000000000..ea8ef0efdbf
--- /dev/null
+++ b/src/mongo/transport/max_conns_override_test.cpp
@@ -0,0 +1,86 @@
+/* Copyright 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/transport/mock_session.h" +#include "mongo/transport/service_entry_point_impl.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +using ExemptionVector = std::vector<stdx::variant<CIDR, std::string>>; + +template <typename T> +stdx::variant<CIDR, std::string> makeExemption(T exemption) { + auto swCIDR = CIDR::parse(exemption); + if (swCIDR.isOK()) { + return swCIDR.getValue(); + } else { + return std::string{exemption}; + } +} + +transport::SessionHandle makeIPSession(StringData ip) { + return transport::MockSession::create( + HostAndPort(SockAddr(ip, 27017, AF_INET)), HostAndPort(), nullptr); +} + +#ifndef _WIN32 +transport::SessionHandle makeUNIXSession(StringData path) { + return transport::MockSession::create(HostAndPort(SockAddr(""_sd, -1, AF_UNIX)), + HostAndPort(SockAddr(path, -1, AF_UNIX)), + nullptr); +} +#endif + +TEST(MaxConnsOverride, NormalCIDR) { + ExemptionVector cidrOnly{makeExemption("127.0.0.1"), makeExemption("10.0.0.0/24")}; + + ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), cidrOnly)); + ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), cidrOnly)); + ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), cidrOnly)); +} + +#ifndef _WIN32 +TEST(MaxConnsOverride, UNIXPaths) { + ExemptionVector mixed{makeExemption("127.0.0.1"), + makeExemption("10.0.0.0/24"), + makeExemption("/tmp/mongod.sock")}; + + ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), mixed)); + ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), mixed)); + ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), mixed)); + ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed)); + ASSERT_TRUE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed)); + ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/other-mongod.sock"), mixed)); +} +#endif + +} // namespace +} // namespace diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index e22135bdaf1..abb9c225843 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" @@ -45,29 +46,6 @@ class ServiceEntryPoint { MONGO_DISALLOW_COPYING(ServiceEntryPoint); public: - /** - * Stats for sessions open. - */ - struct Stats { - /** - * Returns the number of sessions currently open. - */ - size_t numOpenSessions = 0; - - /** - * Returns the total number of sessions that have ever been created. - */ - size_t numCreatedSessions = 0; - - /** - * Returns the number of available sessions we could still open. Only relevant - * when we are operating under a transport::Session limit (for example, in the - * legacy implementation, we respect a maximum number of connections). If there - * is no session limit, returns std::numeric_limits<int>::max(). - */ - size_t numAvailableSessions = 0; - }; - virtual ~ServiceEntryPoint() = default; /** @@ -81,14 +59,19 @@ public: virtual void endAllSessions(transport::Session::TagMask tags) = 0; /** + * Starts the service entry point + */ + virtual Status start() = 0; + + /** * Shuts down the service entry point. */ virtual bool shutdown(Milliseconds timeout) = 0; /** - * Returns high-level stats about current sessions. 
-     */
-    virtual Stats sessionStats() const = 0;
+     * Append high-level stats to a BSONObjBuilder for serverStatus
+     */
+    virtual void appendStats(BSONObjBuilder* bob) const = 0;

    /**
     * Returns the number of sessions currently open.
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index adbdc48a6ec..0113d1bdfc2 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -46,6 +46,38 @@
#endif

namespace mongo {
+
+bool shouldOverrideMaxConns(const transport::SessionHandle& session,
+                            const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
+    const auto& remoteAddr = session->remote().sockAddr();
+    const auto& localAddr = session->local().sockAddr();
+    boost::optional<CIDR> remoteCIDR;
+
+    if (remoteAddr && remoteAddr->isIP()) {
+        remoteCIDR = uassertStatusOK(CIDR::parse(remoteAddr->getAddr()));
+    }
+    for (const auto& exemption : exemptions) {
+        // If this exemption is a CIDR range, then we check that the remote IP is in the
+        // CIDR range
+        if ((stdx::holds_alternative<CIDR>(exemption)) && (remoteCIDR)) {
+            if (stdx::get<CIDR>(exemption).contains(*remoteCIDR)) {
+                return true;
+            }
+// Otherwise the exemption is a UNIX path and we should check the local path
+// (the remoteAddr == "anonymous unix socket") against the exemption string
+//
+// On Windows we don't check this at all and only CIDR ranges are supported
+#ifndef _WIN32
+        } else if ((stdx::holds_alternative<std::string>(exemption)) && (localAddr) &&
+                   (localAddr->getAddr() == stdx::get<std::string>(exemption))) {
+            return true;
+#endif
+        }
+    }
+
+    return false;
+}
+
ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
    const auto supportedMax = [] {
@@ -71,6 +103,18 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
    }

    _maxNumConnections = supportedMax;
+
+    if (serverGlobalParams.reservedAdminThreads) {
+        _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
+            _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
+    }
+}
+
+Status ServiceEntryPointImpl::start() {
+    if (_adminInternalPool)
+        return _adminInternalPool->start();
+    else
+        return Status::OK();
}

void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
@@ -89,10 +133,16 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
    auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
    auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);

+    auto usingMaxConnOverride = false;
    {
        stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
        connectionCount = _sessions.size() + 1;
-        if (connectionCount <= _maxNumConnections) {
+        if (connectionCount > _maxNumConnections) {
+            usingMaxConnOverride =
+                shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
+        }
+
+        if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
            ssmIt = _sessions.emplace(_sessions.begin(), ssm);
            _currentConnections.store(connectionCount);
            _createdConnections.addAndFetch(1);
@@ -101,11 +151,13 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {

    // Checking if we successfully added a connection above. Separated from the lock so we don't log
    // while holding it.
-    if (connectionCount > _maxNumConnections) {
+    if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
        if (!quiet) {
            log() << "connection refused because too many open connections: " << connectionCount;
        }
        return;
+    } else if (usingMaxConnOverride && _adminInternalPool) {
+        ssm->setServiceExecutor(_adminInternalPool.get());
    }

    if (!quiet) {
@@ -183,15 +235,18 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
    return result;
}

-ServiceEntryPoint::Stats ServiceEntryPointImpl::sessionStats() const {
+void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
    size_t sessionCount = _currentConnections.load();

-    ServiceEntryPoint::Stats ret;
-    ret.numOpenSessions = sessionCount;
-    ret.numCreatedSessions = _createdConnections.load();
-    ret.numAvailableSessions = _maxNumConnections - sessionCount;
-    return ret;
+    bob->append("current", static_cast<int>(sessionCount));
+    bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
+    bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
+
+    if (_adminInternalPool) {
+        BSONObjBuilder section(bob->subobjStart("adminConnections"));
+        _adminInternalPool->appendStats(&section);
+    }
}

}  // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 6f47ccedccb..6181885c953 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -34,8 +34,11 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_state_machine.h"
+#include "mongo/util/net/cidr.h"

namespace mongo {
class ServiceContext;
@@ -61,9 +64,10 @@ public:

    void endAllSessions(transport::Session::TagMask tags) final;

+    Status start() final;
    bool shutdown(Milliseconds timeout) final;

-    Stats sessionStats() const final;
+    void appendStats(BSONObjBuilder* bob) const final;

    size_t numOpenSessions() const final {
        return _currentConnections.load();
@@ -83,6 +87,14 @@ private:
    size_t _maxNumConnections{DEFAULT_MAX_CONN};
    AtomicWord<size_t> _currentConnections{0};
    AtomicWord<size_t> _createdConnections{0};
+
+    std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
};

+/*
+ * Returns true if a session with remote/local addresses should be exempted from maxConns
+ */
+bool shouldOverrideMaxConns(const transport::SessionHandle& session,
+                            const std::vector<stdx::variant<CIDR, std::string>>& exemptions);
+
}  // namespace mongo
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index d0014485cc4..5d928bd0520 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -664,20 +664,19 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString(
void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
    stdx::unique_lock<stdx::mutex> lk(_threadsMutex);

-    BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
-    section << kExecutorLabel << kExecutorName  //
-            << kTotalQueued << _totalQueued.load()  //
-            << kTotalExecuted << _totalExecuted.load()  //
-            << kThreadsInUse << _threadsInUse.load()  //
-            << kTotalTimeRunningUs  //
-            << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource)  //
-            << kTotalTimeExecutingUs  //
-            << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource)  //
-            << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource)  //
-            << kThreadsRunning << _threadsRunning.load()  //
-            << kThreadsPending << _threadsPending.load();
-
-    BSONObjBuilder threadStartReasons(section.subobjStart(kThreadReasons));
+    *bob << kExecutorLabel << kExecutorName  //
+         << kTotalQueued << _totalQueued.load()  //
+         << kTotalExecuted << _totalExecuted.load()  //
+         << kThreadsInUse << _threadsInUse.load()  //
+         << kTotalTimeRunningUs  //
+         << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource)  //
+         << kTotalTimeExecutingUs  //
+         << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource)  //
+         << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource)  //
+         << kThreadsRunning << _threadsRunning.load()  //
+         << kThreadsPending << _threadsPending.load();
+
+    BSONObjBuilder threadStartReasons(bob->subobjStart(kThreadReasons));
    for (size_t i = 0; i < _threadStartCounters.size(); i++) {
        threadStartReasons << _threadStartedByToString(static_cast<ThreadCreationReason>(i))
                           << _threadStartCounters[i];
@@ -685,7 +684,7 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {

    threadStartReasons.doneFast();

-    BSONObjBuilder metricsByTask(section.subobjStart("metricsByTask"));
+    BSONObjBuilder metricsByTask(bob->subobjStart("metricsByTask"));
    MetricsArray totalMetrics;
    _accumulateAllTaskMetrics(&totalMetrics, lk);
    lk.unlock();
@@ -703,7 +702,6 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
        subSection.doneFast();
    }
    metricsByTask.doneFast();
-    section.doneFast();
}

}  // namespace transport
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
new file mode 100644
index 00000000000..7d8e39c5a7d
--- /dev/null
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -0,0 +1,212 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor;
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_executor_reserved.h"
+
+#include "mongo/db/server_parameters.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/service_executor_task_names.h"
+#include "mongo/transport/thread_idle_callback.h"
+#include "mongo/util/log.h"
+#include "mongo/util/processinfo.h"
+
+namespace mongo {
+namespace transport {
+namespace {
+
+// Tasks scheduled with MayRecurse may be called recursively if the recursion depth is below this
+// value.
+MONGO_EXPORT_SERVER_PARAMETER(reservedServiceExecutorRecursionLimit, int, 8);
+
+constexpr auto kThreadsRunning = "threadsRunning"_sd;
+constexpr auto kExecutorLabel = "executor"_sd;
+constexpr auto kExecutorName = "reserved"_sd;
+constexpr auto kReadyThreads = "readyThreads"_sd;
+constexpr auto kStartingThreads = "startingThreads"_sd;
+}  // namespace
+
+thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
+thread_local int ServiceExecutorReserved::_localRecursionDepth = 0;
+thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0;
+
+ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx,
+                                                 std::string name,
+                                                 size_t reservedThreads)
+    : _name(std::move(name)), _reservedThreads(reservedThreads) {}
+
+Status ServiceExecutorReserved::start() {
+    {
+        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        _stillRunning.store(true);
+        _numStartingThreads = _reservedThreads;
+    }
+
+    for (size_t i = 0; i < _reservedThreads; i++) {
+        auto status = _startWorker();
+        if (!status.isOK()) {
+            return status;
+        }
+    }
+
+    return Status::OK();
+}
+
+Status ServiceExecutorReserved::_startWorker() {
+    log() << "Starting new worker thread for " << _name << " service executor";
+    return launchServiceWorkerThread([this] {
+        stdx::unique_lock<stdx::mutex> lk(_mutex);
+        _numRunningWorkerThreads.addAndFetch(1);
+        auto numRunningGuard = MakeGuard([&] {
+            _numRunningWorkerThreads.subtractAndFetch(1);
+            _shutdownCondition.notify_one();
+        });
+
+        _numStartingThreads--;
+        _numReadyThreads++;
+
+        while (_stillRunning.load()) {
+            _threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); });
+
+            if (!_stillRunning.loadRelaxed()) {
+                break;
+            }
+
+            if (_readyTasks.empty()) {
+                continue;
+            }
+
+            auto task = std::move(_readyTasks.front());
+            _readyTasks.pop_front();
+            _numReadyThreads -= 1;
+            bool launchReplacement = false;
+            if (_numReadyThreads + _numStartingThreads < _reservedThreads) {
+                _numStartingThreads++;
+                launchReplacement = true;
+            }
+
+            lk.unlock();
+
+            if (launchReplacement) {
+                auto threadStartStatus = _startWorker();
+                if (!threadStartStatus.isOK()) {
+                    warning() << "Could not start new reserve worker thread: " << threadStartStatus;
+                }
+            }
+
+            _localWorkQueue.emplace_back(std::move(task));
+            while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
+                _localRecursionDepth = 1;
+                _localWorkQueue.front()();
+                _localWorkQueue.pop_front();
+            }
+
+            lk.lock();
+            if (_numReadyThreads + 1 > _reservedThreads) {
+                break;
+            } else {
+                _numReadyThreads += 1;
+            }
+        }

+        LOG(3) << "Exiting worker thread in " << _name << " service executor";
+    });
+}
+
+
+Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
+    LOG(3) << "Shutting down reserved executor";
+
+    stdx::unique_lock<stdx::mutex> lock(_mutex);
+    _stillRunning.store(false);
+    _threadWakeup.notify_all();
+
+    bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() {
+        return _numRunningWorkerThreads.load() == 0;
+    });
+
+    return result
+        ? Status::OK()
+        : Status(ErrorCodes::Error::ExceededTimeLimit,
+                 "reserved executor couldn't shutdown all worker threads within time limit.");
+}
+
+Status ServiceExecutorReserved::schedule(Task task,
+                                         ScheduleFlags flags,
+                                         ServiceExecutorTaskName taskName) {
+    if (!_stillRunning.load()) {
+        return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
+    }
+
+    if (!_localWorkQueue.empty()) {
+        /*
+         * In perf testing we found that yielding after running each request produced
+         * a 5% performance boost in microbenchmarks if the number of worker threads
+         * was greater than the number of available cores.
+         */
+        if (flags & ScheduleFlags::kMayYieldBeforeSchedule) {
+            if ((_localThreadIdleCounter++ & 0xf) == 0) {
+                markThreadIdle();
+            }
+        }
+
+        // Execute task directly (recurse) if allowed by the caller as it produced better
+        // performance in testing. Try to limit the amount of recursion so we don't blow up the
+        // stack, even though this shouldn't happen with this executor that uses blocking network
+        // I/O.
+        if ((flags & ScheduleFlags::kMayRecurse) &&
+            (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
+            ++_localRecursionDepth;
+            task();
+        } else {
+            _localWorkQueue.emplace_back(std::move(task));
+        }
+        return Status::OK();
+    }
+
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    _readyTasks.push_back(std::move(task));
+    _threadWakeup.notify_one();
+
+    return Status::OK();
+}
+
+void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
+    stdx::lock_guard<stdx::mutex> lk(_mutex);
+    *bob << kExecutorLabel << kExecutorName << kThreadsRunning
+         << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads
+         << static_cast<int>(_numReadyThreads) << kStartingThreads
+         << static_cast<int>(_numStartingThreads);
+}
+
+}  // namespace transport
+}  // namespace mongo
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
new file mode 100644
index 00000000000..235e86e5930
--- /dev/null
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+
+#include "mongo/base/status.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/transport/service_executor_task_names.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * The reserved service executor emulates a thread per connection.
+ * Each connection has its own worker thread where jobs get scheduled.
+ *
+ * The executor will start reservedThreads on start, and launch a replacement thread every time
+ * one of them picks up work, ensuring there are always reservedThreads available for work - this
+ * means that even when you hit the NPROC ulimit, there will still be threads ready to
+ * accept work. When threads exit, they will go back to waiting for work if there are fewer
+ * than reservedThreads available.
+ */
+class ServiceExecutorReserved final : public ServiceExecutor {
+public:
+    explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
+
+    Status start() override;
+    Status shutdown(Milliseconds timeout) override;
+    Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) override;
+
+    Mode transportMode() const override {
+        return Mode::kSynchronous;
+    }
+
+    void appendStats(BSONObjBuilder* bob) const override;
+
+private:
+    Status _startWorker();
+
+    static thread_local std::deque<Task> _localWorkQueue;
+    static thread_local int _localRecursionDepth;
+    static thread_local int64_t _localThreadIdleCounter;
+
+    AtomicBool _stillRunning{false};
+
+    mutable stdx::mutex _mutex;
+    stdx::condition_variable _threadWakeup;
+    stdx::condition_variable _shutdownCondition;
+
+    std::deque<Task> _readyTasks;
+
+    AtomicUInt32 _numRunningWorkerThreads{0};
+    size_t _numReadyThreads{0};
+    size_t _numStartingThreads{0};
+
+    const std::string _name;
+    const size_t _reservedThreads;
+};
+
+}  // namespace transport
+}  // namespace mongo
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index a3bd83a0f06..7b1b6509537 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -142,9 +142,8 @@ Status ServiceExecutorSynchronous::schedule(Task task,
}

void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
-    BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
-    section << kExecutorLabel << kExecutorName << kThreadsRunning
-            << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
+    *bob << kExecutorLabel << kExecutorName << kThreadsRunning
+         << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
}

}  // namespace transport
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 0ecbfcca6d1..83eb411c7b6 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -226,6 +226,7 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
      _sep{svcContext->getServiceEntryPoint()},
      _transportMode(transportMode),
      _serviceContext(svcContext),
+      _serviceExecutor(_serviceContext->getServiceExecutor()),
      _sessionHandle(session),
      _threadName{str::stream() << "conn" << _session()->id()},
      _dbClient{svcContext->makeClient(_threadName, std::move(session))},
@@ -469,6 +470,10 @@ void ServiceStateMachine::start(Ownership ownershipModel) {
        ownershipModel);
}

+void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
+    _serviceExecutor = executor;
+}
+
void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
                                                 transport::ServiceExecutor::ScheduleFlags flags,
                                                 transport::ServiceExecutorTaskName taskName,
@@ -480,8 +485,7 @@ void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
        ssm->_runNextInGuard(std::move(guard));
    };
    guard.release();
-    Status status =
-        _serviceContext->getServiceExecutor()->schedule(std::move(func), flags, taskName);
+    Status status = _serviceExecutor->schedule(std::move(func), flags, taskName);
    if (status.isOK()) {
        return;
    }
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index bac5417dd67..ee54eb99cc8 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -126,6 +126,12 @@ public:
    void start(Ownership ownershipModel);

    /*
+     * Set the executor to be used for the next call to runNext(). This allows switching between
+     * thread models after the SSM has started.
+     */
+    void setServiceExecutor(transport::ServiceExecutor* executor);
+
+    /*
     * Gets the current state of connection for testing/diagnostic purposes.
     */
    State state();
@@ -219,6 +225,7 @@ private:
    transport::Mode _transportMode;

    ServiceContext* const _serviceContext;
+    transport::ServiceExecutor* _serviceExecutor;

    transport::SessionHandle _sessionHandle;
    const std::string _threadName;
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index e29c33627a6..6d37d63b165 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -84,13 +84,15 @@ public:

    void endAllSessions(transport::Session::TagMask tags) override {}

+    Status start() override {
+        return Status::OK();
+    }
+
    bool shutdown(Milliseconds timeout) override {
        return true;
    }

-    Stats sessionStats() const override {
-        return {};
-    }
+    void appendStats(BSONObjBuilder*) const override {}

    size_t numOpenSessions() const override {
        return 0ULL;
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index b9ff5580dc4..abf49b48690 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -63,13 +63,15 @@ public:
        old_sessions.clear();
    }

+    Status start() override {
+        return Status::OK();
+    }
+
    bool shutdown(Milliseconds timeout) override {
        return true;
    }

-    Stats sessionStats() const override {
-        return {};
-    }
+    void appendStats(BSONObjBuilder*) const override {}

    size_t numOpenSessions() const override {
        stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -169,10 +171,12 @@ public:
        return true;
    }

-    Stats sessionStats() const override {
-        return {};
+    Status start() override {
+        return Status::OK();
    }

+    void appendStats(BSONObjBuilder*) const override {}
+
    size_t numOpenSessions() const override {
        return 0;
    }
diff --git a/src/mongo/util/net/sockaddr.cpp b/src/mongo/util/net/sockaddr.cpp
index 0cf024b6f44..00470af3ea6 100644
--- a/src/mongo/util/net/sockaddr.cpp
+++ b/src/mongo/util/net/sockaddr.cpp
@@ -133,7 +133,7 @@ SockAddr::SockAddr(StringData target, int port, sa_family_t familyHint)
        _hostOrIp = "127.0.0.1";
    }

-    if (mongoutils::str::contains(_hostOrIp, '/')) {
+    if (mongoutils::str::contains(_hostOrIp, '/') || familyHint == AF_UNIX) {
        initUnixDomainSocket(_hostOrIp, port);
        return;
    }
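The exemption matching performed by shouldOverrideMaxConns() in service_entry_point_impl.cpp is the heart of this patch: entries of net.maxIncomingConnectionsOverride that parse as CIDR ranges are compared against the remote IP, and any other entry is treated as a UNIX domain socket path and compared against the local address. The following is a minimal standalone sketch of that rule, not MongoDB code: it uses plain C++17 with std::variant and POSIX inet_pton in place of mongo's stdx::variant and CIDR helpers, and the names Cidr, parseCidr, and isExempt are illustrative only.

// Standalone sketch: match a connection against a mixed list of CIDR ranges
// and UNIX socket paths, mirroring the idea of shouldOverrideMaxConns() above.
#include <arpa/inet.h>  // inet_pton, ntohl (POSIX)
#include <cstdint>
#include <exception>
#include <iostream>
#include <string>
#include <variant>
#include <vector>

struct Cidr {
    uint32_t network;  // network address in host byte order
    int prefixLen;     // number of leading bits that must match
};

// Parse "a.b.c.d" or "a.b.c.d/len"; returns false if the string is not an IPv4 CIDR.
bool parseCidr(const std::string& spec, Cidr* out) {
    const auto slash = spec.find('/');
    const std::string addr = spec.substr(0, slash);
    in_addr parsed{};
    if (inet_pton(AF_INET, addr.c_str(), &parsed) != 1) {
        return false;
    }
    int prefix = 32;
    if (slash != std::string::npos) {
        try {
            prefix = std::stoi(spec.substr(slash + 1));
        } catch (const std::exception&) {
            return false;
        }
        if (prefix < 0 || prefix > 32) {
            return false;
        }
    }
    *out = Cidr{ntohl(parsed.s_addr), prefix};
    return true;
}

bool cidrContains(const Cidr& range, uint32_t ipHostOrder) {
    if (range.prefixLen == 0) {
        return true;  // a 0-bit prefix matches everything (and avoids a 32-bit shift)
    }
    const uint32_t mask = ~uint32_t{0} << (32 - range.prefixLen);
    return (range.network & mask) == (ipHostOrder & mask);
}

using Exemption = std::variant<Cidr, std::string>;

// CIDR exemptions are checked against the remote IP; string exemptions are treated
// as UNIX socket paths and checked against the local address, as in the patch.
bool isExempt(const std::string& remoteIp,
              const std::string& localUnixPath,
              const std::vector<Exemption>& exemptions) {
    in_addr parsed{};
    const bool haveIp = inet_pton(AF_INET, remoteIp.c_str(), &parsed) == 1;
    const uint32_t ip = haveIp ? ntohl(parsed.s_addr) : 0;
    for (const auto& exemption : exemptions) {
        if (const auto* range = std::get_if<Cidr>(&exemption)) {
            if (haveIp && cidrContains(*range, ip)) {
                return true;
            }
        } else if (!localUnixPath.empty() &&
                   std::get<std::string>(exemption) == localUnixPath) {
            return true;
        }
    }
    return false;
}

int main() {
    // Entries that fail CIDR parsing fall back to plain strings, just as
    // storeServerOptions() does for net.maxIncomingConnectionsOverride.
    std::vector<Exemption> exemptions;
    for (const std::string& spec : {"127.0.0.1", "10.0.0.0/24", "/tmp/mongod.sock"}) {
        Cidr cidr{};
        exemptions.push_back(parseCidr(spec, &cidr) ? Exemption{cidr} : Exemption{spec});
    }
    std::cout << isExempt("10.0.0.35", "", exemptions) << "\n";          // 1
    std::cout << isExempt("192.168.0.53", "", exemptions) << "\n";       // 0
    std::cout << isExempt("", "/tmp/mongod.sock", exemptions) << "\n";   // 1
}

With the YAML shown at the top of the diff, a mongod carrying this patch admits connections from 127.0.0.1 even past maxIncomingConnections and schedules them on the reserved admin/internal executor, which is exactly what the new jstest asserts against serverStatus.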