summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/noPassthrough/libs/max_conns_override_config.yaml6
-rw-r--r--jstests/noPassthrough/max_conns_override.js45
-rw-r--r--src/mongo/db/commands/server_status_servers.cpp11
-rw-r--r--src/mongo/db/db.cpp6
-rw-r--r--src/mongo/db/server_options.h4
-rw-r--r--src/mongo/db/server_options_server_helpers.cpp34
-rw-r--r--src/mongo/embedded/service_entry_point_embedded.cpp6
-rw-r--r--src/mongo/embedded/service_entry_point_embedded.h3
-rw-r--r--src/mongo/s/server.cpp6
-rw-r--r--src/mongo/transport/SConscript14
-rw-r--r--src/mongo/transport/max_conns_override_test.cpp86
-rw-r--r--src/mongo/transport/service_entry_point.h35
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp71
-rw-r--r--src/mongo/transport/service_entry_point_impl.h14
-rw-r--r--src/mongo/transport/service_executor_adaptive.cpp30
-rw-r--r--src/mongo/transport/service_executor_reserved.cpp212
-rw-r--r--src/mongo/transport/service_executor_reserved.h91
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp5
-rw-r--r--src/mongo/transport/service_state_machine.cpp8
-rw-r--r--src/mongo/transport/service_state_machine.h7
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp8
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp14
-rw-r--r--src/mongo/util/net/sockaddr.cpp2
23 files changed, 645 insertions, 73 deletions
diff --git a/jstests/noPassthrough/libs/max_conns_override_config.yaml b/jstests/noPassthrough/libs/max_conns_override_config.yaml
new file mode 100644
index 00000000000..c769d89d74c
--- /dev/null
+++ b/jstests/noPassthrough/libs/max_conns_override_config.yaml
@@ -0,0 +1,6 @@
+# Used in the max_conns_override.js test to set the maxConnsOverride settings
+net:
+ maxIncomingConnectionsOverride: [ "127.0.0.1" ]
+ reservedAdminThreads: 3
+ maxIncomingConnections: 5
+
diff --git a/jstests/noPassthrough/max_conns_override.js b/jstests/noPassthrough/max_conns_override.js
new file mode 100644
index 00000000000..5fa28804856
--- /dev/null
+++ b/jstests/noPassthrough/max_conns_override.js
@@ -0,0 +1,45 @@
+(function() {
+ 'use strict';
+ const configuredMaxConns = 5;
+ const configuredReadyAdminThreads = 3;
+ let conn = MongoRunner.runMongod({
+ config: "jstests/noPassthrough/libs/max_conns_override_config.yaml",
+ // We check a specific field in this executor's serverStatus section
+ serviceExecutor: "synchronous",
+ });
+
+ // Use up all the maxConns with junk connections, all of these should succeed
+ let maxConns = [];
+ for (let i = 0; i < 5; i++) {
+ maxConns.push(new Mongo(`127.0.0.1:${conn.port}`));
+ let tmpDb = maxConns[maxConns.length - 1].getDB("admin");
+ assert.commandWorked(tmpDb.runCommand({isMaster: 1}));
+ }
+
+ // Get serverStatus to check that we have the right number of threads in the right places
+ let status = conn.getDB("admin").runCommand({serverStatus: 1});
+ const connectionsStatus = status["connections"];
+ const reservedExecutorStatus = connectionsStatus["adminConnections"];
+ const normalExecutorStatus = status["network"]["serviceExecutorTaskStats"];
+
+ // Log these serverStatus sections so we can debug this easily
+ print("connections status section: ", tojson(connectionsStatus));
+ print("normal executor status section: ", tojson(normalExecutorStatus));
+
+ // The number of "available" connections should be less than zero, because we've used
+ // all of maxConns. We're over the limit!
+ assert.lt(connectionsStatus["available"], 0);
+ // The number of "current" connections should be greater than maxConns
+ assert.gt(connectionsStatus["current"], configuredMaxConns);
+ // The number of ready threads should be the number of readyThreads we configured, since
+ // every thread spawns a new thread on startup
+ assert.eq(reservedExecutorStatus["readyThreads"] + reservedExecutorStatus["startingThreads"],
+ configuredReadyAdminThreads);
+ // The number of running admin threads should be greater than the readyThreads, because
+ // one is being used right now
+ assert.gt(reservedExecutorStatus["threadsRunning"], reservedExecutorStatus["readyThreads"]);
+ // The normal serviceExecutor should only be running maxConns number of threads
+ assert.eq(normalExecutorStatus["threadsRunning"], configuredMaxConns);
+
+ MongoRunner.stopMongod(conn);
+})();
diff --git a/src/mongo/db/commands/server_status_servers.cpp b/src/mongo/db/commands/server_status_servers.cpp
index 96007301142..b7a0eca6895 100644
--- a/src/mongo/db/commands/server_status_servers.cpp
+++ b/src/mongo/db/commands/server_status_servers.cpp
@@ -62,10 +62,7 @@ public:
auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint();
invariant(serviceEntryPoint);
- auto stats = serviceEntryPoint->sessionStats();
- bb.append("current", static_cast<int>(stats.numOpenSessions));
- bb.append("available", static_cast<int>(stats.numAvailableSessions));
- bb.append("totalCreated", static_cast<int>(stats.numCreatedSessions));
+ serviceEntryPoint->appendStats(&bb);
return bb.obj();
}
@@ -83,8 +80,10 @@ public:
networkCounter.append(b);
appendMessageCompressionStats(&b);
auto executor = opCtx->getServiceContext()->getServiceExecutor();
- if (executor)
- executor->appendStats(&b);
+ if (executor) {
+ BSONObjBuilder section(b.subobjStart("serviceExecutorTaskStats"));
+ executor->appendStats(&section);
+ }
return b.obj();
}
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index d89eac1424e..78ffeb1a511 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -645,6 +645,12 @@ ExitCode _initAndListen(int listenPort) {
return EXIT_NET_ERROR;
}
+ start = serviceContext->getServiceEntryPoint()->start();
+ if (!start.isOK()) {
+ error() << "Failed to start the service entry point: " << start;
+ return EXIT_NET_ERROR;
+ }
+
start = serviceContext->getTransportLayer()->start();
if (!start.isOK()) {
error() << "Failed to start the listener: " << start.toString();
diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h
index 25f7a7ccc42..1a3f3a96c3e 100644
--- a/src/mongo/db/server_options.h
+++ b/src/mongo/db/server_options.h
@@ -30,6 +30,8 @@
#include "mongo/db/jsobj.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/platform/process_id.h"
+#include "mongo/stdx/variant.h"
+#include "mongo/util/net/cidr.h"
namespace mongo {
@@ -79,6 +81,8 @@ struct ServerGlobalParams {
std::string serviceExecutor;
size_t maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections.
+ std::vector<stdx::variant<CIDR, std::string>> maxConnsOverride;
+ int reservedAdminThreads = 0;
int unixSocketPermissions = DEFAULT_UNIX_PERMS; // permissions for the UNIX domain socket
diff --git a/src/mongo/db/server_options_server_helpers.cpp b/src/mongo/db/server_options_server_helpers.cpp
index 9342ae91302..bae75613bdd 100644
--- a/src/mongo/db/server_options_server_helpers.cpp
+++ b/src/mongo/db/server_options_server_helpers.cpp
@@ -120,6 +120,24 @@ Status addGeneralServerOptions(moe::OptionSection* options) {
"net.maxIncomingConnections", "maxConns", moe::Int, maxConnInfoBuilder.str().c_str());
options
+ ->addOptionChaining(
+ "net.maxIncomingConnectionsOverride",
+ "",
+ moe::StringVector,
+ "CIDR ranges that do not count towards the maxIncomingConnections limit")
+ .hidden()
+ .setSources(moe::SourceYAMLConfig);
+
+ options
+ ->addOptionChaining(
+ "net.reservedAdminThreads",
+ "",
+ moe::Int,
+ "number of worker threads to reserve for admin and internal connections")
+ .hidden()
+ .setSources(moe::SourceYAMLConfig);
+
+ options
->addOptionChaining("net.transportLayer",
"transportLayer",
moe::String,
@@ -583,6 +601,22 @@ Status storeServerOptions(const moe::Environment& params) {
}
}
+ if (params.count("net.maxIncomingConnectionsOverride")) {
+ auto ranges = params["net.maxIncomingConnectionsOverride"].as<std::vector<std::string>>();
+ for (const auto& range : ranges) {
+ auto swr = CIDR::parse(range);
+ if (!swr.isOK()) {
+ serverGlobalParams.maxConnsOverride.push_back(range);
+ } else {
+ serverGlobalParams.maxConnsOverride.push_back(std::move(swr.getValue()));
+ }
+ }
+ }
+
+ if (params.count("net.reservedAdminThreads")) {
+ serverGlobalParams.reservedAdminThreads = params["net.reservedAdminThreads"].as<int>();
+ }
+
if (params.count("net.wireObjectCheck")) {
serverGlobalParams.objcheck = params["net.wireObjectCheck"].as<bool>();
}
diff --git a/src/mongo/embedded/service_entry_point_embedded.cpp b/src/mongo/embedded/service_entry_point_embedded.cpp
index 3efc2848b80..a1f6acc7382 100644
--- a/src/mongo/embedded/service_entry_point_embedded.cpp
+++ b/src/mongo/embedded/service_entry_point_embedded.cpp
@@ -75,11 +75,15 @@ void ServiceEntryPointEmbedded::startSession(transport::SessionHandle session) {
void ServiceEntryPointEmbedded::endAllSessions(transport::Session::TagMask tags) {}
+Status ServiceEntryPointEmbedded::start() {
+ UASSERT_NOT_IMPLEMENTED;
+}
+
bool ServiceEntryPointEmbedded::shutdown(Milliseconds timeout) {
UASSERT_NOT_IMPLEMENTED;
}
-ServiceEntryPoint::Stats ServiceEntryPointEmbedded::sessionStats() const {
+void ServiceEntryPointEmbedded::appendStats(BSONObjBuilder*) const {
UASSERT_NOT_IMPLEMENTED;
}
diff --git a/src/mongo/embedded/service_entry_point_embedded.h b/src/mongo/embedded/service_entry_point_embedded.h
index ba6ed39ce6d..eea4e48b86e 100644
--- a/src/mongo/embedded/service_entry_point_embedded.h
+++ b/src/mongo/embedded/service_entry_point_embedded.h
@@ -45,8 +45,9 @@ public:
void startSession(transport::SessionHandle session) override;
void endAllSessions(transport::Session::TagMask tags) override;
+ Status start() override;
bool shutdown(Milliseconds timeout) override;
- Stats sessionStats() const override;
+ void appendStats(BSONObjBuilder* bob) const override;
size_t numOpenSessions() const override;
private:
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 0dc64211d36..0ac5e2c17e1 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -446,6 +446,12 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
return EXIT_NET_ERROR;
}
+ status = serviceContext->getServiceEntryPoint()->start();
+ if (!status.isOK()) {
+ error() << "Failed to start the service entry point: " << redact(status);
+ return EXIT_NET_ERROR;
+ }
+
status = serviceContext->getTransportLayer()->start();
if (!status.isOK()) {
error() << "Failed to start the transport layer: " << redact(status);
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 2990aede9ea..87b3463ec9c 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -113,6 +113,7 @@ tlEnv.Library(
target='service_executor',
source=[
'service_executor_adaptive.cpp',
+ 'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
'thread_idle_callback.cpp',
],
@@ -169,6 +170,7 @@ env.Library(
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/rpc/protocol',
'$BUILD_DIR/mongo/util/processinfo',
+ 'service_executor',
'transport_layer_common',
],
LIBDEPS_PRIVATE=[
@@ -177,6 +179,18 @@ env.Library(
)
env.CppUnitTest(
+ target='max_conns_override_test',
+ source=[
+ 'max_conns_override_test.cpp',
+ ],
+ LIBDEPS=[
+ 'service_entry_point',
+ 'transport_layer_mock',
+ '$BUILD_DIR/mongo/unittest/unittest'
+ ],
+)
+
+env.CppUnitTest(
target='service_state_machine_test',
source=[
'service_state_machine_test.cpp',
diff --git a/src/mongo/transport/max_conns_override_test.cpp b/src/mongo/transport/max_conns_override_test.cpp
new file mode 100644
index 00000000000..ea8ef0efdbf
--- /dev/null
+++ b/src/mongo/transport/max_conns_override_test.cpp
@@ -0,0 +1,86 @@
+/* Copyright 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/mock_session.h"
+#include "mongo/transport/service_entry_point_impl.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+using ExemptionVector = std::vector<stdx::variant<CIDR, std::string>>;
+
+template <typename T>
+stdx::variant<CIDR, std::string> makeExemption(T exemption) {
+ auto swCIDR = CIDR::parse(exemption);
+ if (swCIDR.isOK()) {
+ return swCIDR.getValue();
+ } else {
+ return std::string{exemption};
+ }
+}
+
+transport::SessionHandle makeIPSession(StringData ip) {
+ return transport::MockSession::create(
+ HostAndPort(SockAddr(ip, 27017, AF_INET)), HostAndPort(), nullptr);
+}
+
+#ifndef _WIN32
+transport::SessionHandle makeUNIXSession(StringData path) {
+ return transport::MockSession::create(HostAndPort(SockAddr(""_sd, -1, AF_UNIX)),
+ HostAndPort(SockAddr(path, -1, AF_UNIX)),
+ nullptr);
+}
+#endif
+
+TEST(MaxConnsOverride, NormalCIDR) {
+ ExemptionVector cidrOnly{makeExemption("127.0.0.1"), makeExemption("10.0.0.0/24")};
+
+ ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), cidrOnly));
+ ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), cidrOnly));
+ ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), cidrOnly));
+}
+
+#ifndef _WIN32
+TEST(MaxConnsOverride, UNIXPaths) {
+ ExemptionVector mixed{makeExemption("127.0.0.1"),
+ makeExemption("10.0.0.0/24"),
+ makeExemption("/tmp/mongod.sock")};
+
+ ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), mixed));
+ ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), mixed));
+ ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), mixed));
+ ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed));
+ ASSERT_TRUE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed));
+ ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/other-mongod.sock"), mixed));
+}
+#endif
+
+} // namespace
+} // namespace
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index e22135bdaf1..abb9c225843 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
@@ -45,29 +46,6 @@ class ServiceEntryPoint {
MONGO_DISALLOW_COPYING(ServiceEntryPoint);
public:
- /**
- * Stats for sessions open.
- */
- struct Stats {
- /**
- * Returns the number of sessions currently open.
- */
- size_t numOpenSessions = 0;
-
- /**
- * Returns the total number of sessions that have ever been created.
- */
- size_t numCreatedSessions = 0;
-
- /**
- * Returns the number of available sessions we could still open. Only relevant
- * when we are operating under a transport::Session limit (for example, in the
- * legacy implementation, we respect a maximum number of connections). If there
- * is no session limit, returns std::numeric_limits<int>::max().
- */
- size_t numAvailableSessions = 0;
- };
-
virtual ~ServiceEntryPoint() = default;
/**
@@ -81,14 +59,19 @@ public:
virtual void endAllSessions(transport::Session::TagMask tags) = 0;
/**
+ * Starts the service entry point
+ */
+ virtual Status start() = 0;
+
+ /**
* Shuts down the service entry point.
*/
virtual bool shutdown(Milliseconds timeout) = 0;
/**
- * Returns high-level stats about current sessions.
- */
- virtual Stats sessionStats() const = 0;
+ * Append high-level stats to a BSONObjBuilder for serverStatus
+ */
+ virtual void appendStats(BSONObjBuilder* bob) const = 0;
/**
* Returns the number of sessions currently open.
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index adbdc48a6ec..0113d1bdfc2 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -46,6 +46,38 @@
#endif
namespace mongo {
+
+bool shouldOverrideMaxConns(const transport::SessionHandle& session,
+ const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
+ const auto& remoteAddr = session->remote().sockAddr();
+ const auto& localAddr = session->local().sockAddr();
+ boost::optional<CIDR> remoteCIDR;
+
+ if (remoteAddr && remoteAddr->isIP()) {
+ remoteCIDR = uassertStatusOK(CIDR::parse(remoteAddr->getAddr()));
+ }
+ for (const auto& exemption : exemptions) {
+ // If this exemption is a CIDR range, then we check that the remote IP is in the
+ // CIDR range
+ if ((stdx::holds_alternative<CIDR>(exemption)) && (remoteCIDR)) {
+ if (stdx::get<CIDR>(exemption).contains(*remoteCIDR)) {
+ return true;
+ }
+// Otherwise the exemption is a UNIX path and we should check the local path
+// (the remoteAddr == "anonymous unix socket") against the exemption string
+//
+// On Windows we don't check this at all and only CIDR ranges are supported
+#ifndef _WIN32
+ } else if ((stdx::holds_alternative<std::string>(exemption)) && (localAddr) &&
+ (localAddr->getAddr() == stdx::get<std::string>(exemption))) {
+ return true;
+#endif
+ }
+ }
+
+ return false;
+}
+
ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
const auto supportedMax = [] {
@@ -71,6 +103,18 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
}
_maxNumConnections = supportedMax;
+
+ if (serverGlobalParams.reservedAdminThreads) {
+ _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
+ _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
+ }
+}
+
+Status ServiceEntryPointImpl::start() {
+ if (_adminInternalPool)
+ return _adminInternalPool->start();
+ else
+ return Status::OK();
}
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
@@ -89,10 +133,16 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
+ auto usingMaxConnOverride = false;
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
connectionCount = _sessions.size() + 1;
- if (connectionCount <= _maxNumConnections) {
+ if (connectionCount > _maxNumConnections) {
+ usingMaxConnOverride =
+ shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
+ }
+
+ if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
ssmIt = _sessions.emplace(_sessions.begin(), ssm);
_currentConnections.store(connectionCount);
_createdConnections.addAndFetch(1);
@@ -101,11 +151,13 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Checking if we successfully added a connection above. Separated from the lock so we don't log
// while holding it.
- if (connectionCount > _maxNumConnections) {
+ if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
if (!quiet) {
log() << "connection refused because too many open connections: " << connectionCount;
}
return;
+ } else if (usingMaxConnOverride && _adminInternalPool) {
+ ssm->setServiceExecutor(_adminInternalPool.get());
}
if (!quiet) {
@@ -183,15 +235,18 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
return result;
}
-ServiceEntryPoint::Stats ServiceEntryPointImpl::sessionStats() const {
+void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
size_t sessionCount = _currentConnections.load();
- ServiceEntryPoint::Stats ret;
- ret.numOpenSessions = sessionCount;
- ret.numCreatedSessions = _createdConnections.load();
- ret.numAvailableSessions = _maxNumConnections - sessionCount;
- return ret;
+ bob->append("current", static_cast<int>(sessionCount));
+ bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
+ bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
+
+ if (_adminInternalPool) {
+ BSONObjBuilder section(bob->subobjStart("adminConnections"));
+ _adminInternalPool->appendStats(&section);
+ }
}
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 6f47ccedccb..6181885c953 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -34,8 +34,11 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
+#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_state_machine.h"
+#include "mongo/util/net/cidr.h"
namespace mongo {
class ServiceContext;
@@ -61,9 +64,10 @@ public:
void endAllSessions(transport::Session::TagMask tags) final;
+ Status start() final;
bool shutdown(Milliseconds timeout) final;
- Stats sessionStats() const final;
+ void appendStats(BSONObjBuilder* bob) const final;
size_t numOpenSessions() const final {
return _currentConnections.load();
@@ -83,6 +87,14 @@ private:
size_t _maxNumConnections{DEFAULT_MAX_CONN};
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};
+
+ std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
};
+/*
+ * Returns true if a session with remote/local addresses should be exempted from maxConns
+ */
+bool shouldOverrideMaxConns(const transport::SessionHandle& session,
+ const std::vector<stdx::variant<CIDR, std::string>>& exemptions);
+
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index d0014485cc4..5d928bd0520 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -664,20 +664,19 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString(
void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
- BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
- section << kExecutorLabel << kExecutorName //
- << kTotalQueued << _totalQueued.load() //
- << kTotalExecuted << _totalExecuted.load() //
- << kThreadsInUse << _threadsInUse.load() //
- << kTotalTimeRunningUs //
- << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) //
- << kTotalTimeExecutingUs //
- << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) //
- << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) //
- << kThreadsRunning << _threadsRunning.load() //
- << kThreadsPending << _threadsPending.load();
-
- BSONObjBuilder threadStartReasons(section.subobjStart(kThreadReasons));
+ *bob << kExecutorLabel << kExecutorName //
+ << kTotalQueued << _totalQueued.load() //
+ << kTotalExecuted << _totalExecuted.load() //
+ << kThreadsInUse << _threadsInUse.load() //
+ << kTotalTimeRunningUs //
+ << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) //
+ << kTotalTimeExecutingUs //
+ << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) //
+ << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) //
+ << kThreadsRunning << _threadsRunning.load() //
+ << kThreadsPending << _threadsPending.load();
+
+ BSONObjBuilder threadStartReasons(bob->subobjStart(kThreadReasons));
for (size_t i = 0; i < _threadStartCounters.size(); i++) {
threadStartReasons << _threadStartedByToString(static_cast<ThreadCreationReason>(i))
<< _threadStartCounters[i];
@@ -685,7 +684,7 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
threadStartReasons.doneFast();
- BSONObjBuilder metricsByTask(section.subobjStart("metricsByTask"));
+ BSONObjBuilder metricsByTask(bob->subobjStart("metricsByTask"));
MetricsArray totalMetrics;
_accumulateAllTaskMetrics(&totalMetrics, lk);
lk.unlock();
@@ -703,7 +702,6 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
subSection.doneFast();
}
metricsByTask.doneFast();
- section.doneFast();
}
} // namespace transport
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
new file mode 100644
index 00000000000..7d8e39c5a7d
--- /dev/null
+++ b/src/mongo/transport/service_executor_reserved.cpp
@@ -0,0 +1,212 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor;
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/transport/service_executor_reserved.h"
+
+#include "mongo/db/server_parameters.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/transport/service_entry_point_utils.h"
+#include "mongo/transport/service_executor_task_names.h"
+#include "mongo/transport/thread_idle_callback.h"
+#include "mongo/util/log.h"
+#include "mongo/util/processinfo.h"
+
+namespace mongo {
+namespace transport {
+namespace {
+
+// Tasks scheduled with MayRecurse may be called recursively if the recursion depth is below this
+// value.
+MONGO_EXPORT_SERVER_PARAMETER(reservedServiceExecutorRecursionLimit, int, 8);
+
+constexpr auto kThreadsRunning = "threadsRunning"_sd;
+constexpr auto kExecutorLabel = "executor"_sd;
+constexpr auto kExecutorName = "reserved"_sd;
+constexpr auto kReadyThreads = "readyThreads"_sd;
+constexpr auto kStartingThreads = "startingThreads"_sd;
+} // namespace
+
+thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
+thread_local int ServiceExecutorReserved::_localRecursionDepth = 0;
+thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0;
+
+ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx,
+ std::string name,
+ size_t reservedThreads)
+ : _name(std::move(name)), _reservedThreads(reservedThreads) {}
+
+Status ServiceExecutorReserved::start() {
+ {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _stillRunning.store(true);
+ _numStartingThreads = _reservedThreads;
+ }
+
+ for (size_t i = 0; i < _reservedThreads; i++) {
+ auto status = _startWorker();
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ return Status::OK();
+}
+
+Status ServiceExecutorReserved::_startWorker() {
+ log() << "Starting new worker thread for " << _name << " service executor";
+ return launchServiceWorkerThread([this] {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _numRunningWorkerThreads.addAndFetch(1);
+ auto numRunningGuard = MakeGuard([&] {
+ _numRunningWorkerThreads.subtractAndFetch(1);
+ _shutdownCondition.notify_one();
+ });
+
+ _numStartingThreads--;
+ _numReadyThreads++;
+
+ while (_stillRunning.load()) {
+ _threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); });
+
+ if (!_stillRunning.loadRelaxed()) {
+ break;
+ }
+
+ if (_readyTasks.empty()) {
+ continue;
+ }
+
+ auto task = std::move(_readyTasks.front());
+ _readyTasks.pop_front();
+ _numReadyThreads -= 1;
+ bool launchReplacement = false;
+ if (_numReadyThreads + _numStartingThreads < _reservedThreads) {
+ _numStartingThreads++;
+ launchReplacement = true;
+ }
+
+ lk.unlock();
+
+ if (launchReplacement) {
+ auto threadStartStatus = _startWorker();
+ if (!threadStartStatus.isOK()) {
+ warning() << "Could not start new reserve worker thread: " << threadStartStatus;
+ }
+ }
+
+ _localWorkQueue.emplace_back(std::move(task));
+ while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
+ _localRecursionDepth = 1;
+ _localWorkQueue.front()();
+ _localWorkQueue.pop_front();
+ }
+
+ lk.lock();
+ if (_numReadyThreads + 1 > _reservedThreads) {
+ break;
+ } else {
+ _numReadyThreads += 1;
+ }
+ }
+
+ LOG(3) << "Exiting worker thread in " << _name << " service executor";
+ });
+}
+
+
+Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
+ LOG(3) << "Shutting down reserved executor";
+
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _stillRunning.store(false);
+ _threadWakeup.notify_all();
+
+ bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() {
+ return _numRunningWorkerThreads.load() == 0;
+ });
+
+ return result
+ ? Status::OK()
+ : Status(ErrorCodes::Error::ExceededTimeLimit,
+ "reserved executor couldn't shutdown all worker threads within time limit.");
+}
+
+Status ServiceExecutorReserved::schedule(Task task,
+ ScheduleFlags flags,
+ ServiceExecutorTaskName taskName) {
+ if (!_stillRunning.load()) {
+ return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
+ }
+
+ if (!_localWorkQueue.empty()) {
+ /*
+ * In perf testing we found that yielding after running a each request produced
+ * at 5% performance boost in microbenchmarks if the number of worker threads
+ * was greater than the number of available cores.
+ */
+ if (flags & ScheduleFlags::kMayYieldBeforeSchedule) {
+ if ((_localThreadIdleCounter++ & 0xf) == 0) {
+ markThreadIdle();
+ }
+ }
+
+ // Execute task directly (recurse) if allowed by the caller as it produced better
+ // performance in testing. Try to limit the amount of recursion so we don't blow up the
+ // stack, even though this shouldn't happen with this executor that uses blocking network
+ // I/O.
+ if ((flags & ScheduleFlags::kMayRecurse) &&
+ (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
+ ++_localRecursionDepth;
+ task();
+ } else {
+ _localWorkQueue.emplace_back(std::move(task));
+ }
+ return Status::OK();
+ }
+
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _readyTasks.push_back(std::move(task));
+ _threadWakeup.notify_one();
+
+ return Status::OK();
+}
+
+void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ *bob << kExecutorLabel << kExecutorName << kThreadsRunning
+ << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads
+ << static_cast<int>(_numReadyThreads) << kStartingThreads
+ << static_cast<int>(_numStartingThreads);
+}
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
new file mode 100644
index 00000000000..235e86e5930
--- /dev/null
+++ b/src/mongo/transport/service_executor_reserved.h
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <deque>
+
+#include "mongo/base/status.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/transport/service_executor.h"
+#include "mongo/transport/service_executor_task_names.h"
+
+namespace mongo {
+namespace transport {
+
+/**
+ * The reserved service executor emulates a thread per connection.
+ * Each connection has its own worker thread where jobs get scheduled.
+ *
+ * The executor will start reservedThreads on start, and create a new thread every time it
+ * starts a new thread, ensuring there are always reservedThreads available for work - this
+ * means that even when you hit the NPROC ulimit, there will still be threads ready to
+ * accept work. When threads exit, they will go back to waiting for work if there are fewer
+ * than reservedThreads available.
+ */
+class ServiceExecutorReserved final : public ServiceExecutor {
+public:
+ explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
+
+ Status start() override;
+ Status shutdown(Milliseconds timeout) override;
+ Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) override;
+
+ Mode transportMode() const override {
+ return Mode::kSynchronous;
+ }
+
+ void appendStats(BSONObjBuilder* bob) const override;
+
+private:
+ Status _startWorker();
+
+ static thread_local std::deque<Task> _localWorkQueue;
+ static thread_local int _localRecursionDepth;
+ static thread_local int64_t _localThreadIdleCounter;
+
+ AtomicBool _stillRunning{false};
+
+ mutable stdx::mutex _mutex;
+ stdx::condition_variable _threadWakeup;
+ stdx::condition_variable _shutdownCondition;
+
+ std::deque<Task> _readyTasks;
+
+ AtomicUInt32 _numRunningWorkerThreads{0};
+ size_t _numReadyThreads{0};
+ size_t _numStartingThreads{0};
+
+ const std::string _name;
+ const size_t _reservedThreads;
+};
+
+} // namespace transport
+} // namespace mongo
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index a3bd83a0f06..7b1b6509537 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -142,9 +142,8 @@ Status ServiceExecutorSynchronous::schedule(Task task,
}
void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
- BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
- section << kExecutorLabel << kExecutorName << kThreadsRunning
- << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
+ *bob << kExecutorLabel << kExecutorName << kThreadsRunning
+ << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
}
} // namespace transport
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 0ecbfcca6d1..83eb411c7b6 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -226,6 +226,7 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
_sep{svcContext->getServiceEntryPoint()},
_transportMode(transportMode),
_serviceContext(svcContext),
+ _serviceExecutor(_serviceContext->getServiceExecutor()),
_sessionHandle(session),
_threadName{str::stream() << "conn" << _session()->id()},
_dbClient{svcContext->makeClient(_threadName, std::move(session))},
@@ -469,6 +470,10 @@ void ServiceStateMachine::start(Ownership ownershipModel) {
ownershipModel);
}
+void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
+ _serviceExecutor = executor;
+}
+
void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
transport::ServiceExecutor::ScheduleFlags flags,
transport::ServiceExecutorTaskName taskName,
@@ -480,8 +485,7 @@ void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
ssm->_runNextInGuard(std::move(guard));
};
guard.release();
- Status status =
- _serviceContext->getServiceExecutor()->schedule(std::move(func), flags, taskName);
+ Status status = _serviceExecutor->schedule(std::move(func), flags, taskName);
if (status.isOK()) {
return;
}
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index bac5417dd67..ee54eb99cc8 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -126,6 +126,12 @@ public:
void start(Ownership ownershipModel);
/*
+ * Set the executor to be used for the next call to runNext(). This allows switching between
+ * thread models after the SSM has started.
+ */
+ void setServiceExecutor(transport::ServiceExecutor* executor);
+
+ /*
* Gets the current state of connection for testing/diagnostic purposes.
*/
State state();
@@ -219,6 +225,7 @@ private:
transport::Mode _transportMode;
ServiceContext* const _serviceContext;
+ transport::ServiceExecutor* _serviceExecutor;
transport::SessionHandle _sessionHandle;
const std::string _threadName;
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index e29c33627a6..6d37d63b165 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -84,13 +84,15 @@ public:
void endAllSessions(transport::Session::TagMask tags) override {}
+ Status start() override {
+ return Status::OK();
+ }
+
bool shutdown(Milliseconds timeout) override {
return true;
}
- Stats sessionStats() const override {
- return {};
- }
+ void appendStats(BSONObjBuilder*) const override {}
size_t numOpenSessions() const override {
return 0ULL;
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index b9ff5580dc4..abf49b48690 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -63,13 +63,15 @@ public:
old_sessions.clear();
}
+ Status start() override {
+ return Status::OK();
+ }
+
bool shutdown(Milliseconds timeout) override {
return true;
}
- Stats sessionStats() const override {
- return {};
- }
+ void appendStats(BSONObjBuilder*) const override {}
size_t numOpenSessions() const override {
stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -169,10 +171,12 @@ public:
return true;
}
- Stats sessionStats() const override {
- return {};
+ Status start() override {
+ return Status::OK();
}
+ void appendStats(BSONObjBuilder*) const override {}
+
size_t numOpenSessions() const override {
return 0;
}
diff --git a/src/mongo/util/net/sockaddr.cpp b/src/mongo/util/net/sockaddr.cpp
index 0cf024b6f44..00470af3ea6 100644
--- a/src/mongo/util/net/sockaddr.cpp
+++ b/src/mongo/util/net/sockaddr.cpp
@@ -133,7 +133,7 @@ SockAddr::SockAddr(StringData target, int port, sa_family_t familyHint)
_hostOrIp = "127.0.0.1";
}
- if (mongoutils::str::contains(_hostOrIp, '/')) {
+ if (mongoutils::str::contains(_hostOrIp, '/') || familyHint == AF_UNIX) {
initUnixDomainSocket(_hostOrIp, port);
return;
}