summaryrefslogtreecommitdiff
path: root/src/mongo/transport
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-08-16 18:58:49 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-08-16 18:58:49 -0400
commit5e461534b339c957d2e6dcf16015949a496a1110 (patch)
treec688bcfa73c9b4eacbb42fa778df3ccf7e65c02c /src/mongo/transport
parent57be1f7097ceed41b156a826c541aa276f35dd61 (diff)
downloadmongo-5e461534b339c957d2e6dcf16015949a496a1110.tar.gz
Revert "SERVER-34986 Allow connections to override maxConns based on CIDR range"
This reverts commit 1a643ba65b070c167cdfdd5056c7d2ac79dd5371.
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--src/mongo/transport/SConscript14
-rw-r--r--src/mongo/transport/max_conns_override_test.cpp84
-rw-r--r--src/mongo/transport/service_entry_point.h35
-rw-r--r--src/mongo/transport/service_entry_point_impl.cpp71
-rw-r--r--src/mongo/transport/service_entry_point_impl.h14
-rw-r--r--src/mongo/transport/service_executor_adaptive.cpp30
-rw-r--r--src/mongo/transport/service_executor_reserved.cpp212
-rw-r--r--src/mongo/transport/service_executor_reserved.h91
-rw-r--r--src/mongo/transport/service_executor_synchronous.cpp5
-rw-r--r--src/mongo/transport/service_state_machine.cpp8
-rw-r--r--src/mongo/transport/service_state_machine.h7
-rw-r--r--src/mongo/transport/service_state_machine_test.cpp8
-rw-r--r--src/mongo/transport/transport_layer_asio_test.cpp14
13 files changed, 64 insertions, 529 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript
index 4312fc6f05a..2990aede9ea 100644
--- a/src/mongo/transport/SConscript
+++ b/src/mongo/transport/SConscript
@@ -113,7 +113,6 @@ tlEnv.Library(
target='service_executor',
source=[
'service_executor_adaptive.cpp',
- 'service_executor_reserved.cpp',
'service_executor_synchronous.cpp',
'thread_idle_callback.cpp',
],
@@ -174,19 +173,6 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/transport/message_compressor',
- '$BUILD_DIR/mongo/transport/service_executor',
- ],
-)
-
-env.CppUnitTest(
- target='max_conns_override_test',
- source=[
- 'max_conns_override_test.cpp',
- ],
- LIBDEPS=[
- 'service_entry_point',
- 'transport_layer_mock',
- '$BUILD_DIR/mongo/unittest/unittest'
],
)
diff --git a/src/mongo/transport/max_conns_override_test.cpp b/src/mongo/transport/max_conns_override_test.cpp
deleted file mode 100644
index 195a4c030c6..00000000000
--- a/src/mongo/transport/max_conns_override_test.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/* Copyright 2018 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/transport/mock_session.h"
-#include "mongo/transport/service_entry_point_impl.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-using ExemptionVector = std::vector<stdx::variant<CIDR, std::string>>;
-
-template <typename T>
-stdx::variant<CIDR, std::string> makeExemption(T exemption) {
- auto swCIDR = CIDR::parse(exemption);
- if (swCIDR.isOK()) {
- return swCIDR.getValue();
- } else {
- return std::string{exemption};
- }
-}
-
-transport::SessionHandle makeIPSession(StringData ip) {
- return transport::MockSession::create(
- HostAndPort(SockAddr(ip, 27017, AF_INET)), HostAndPort(), nullptr);
-}
-
-transport::SessionHandle makeUNIXSession(StringData path) {
- return transport::MockSession::create(HostAndPort(SockAddr(""_sd, -1, AF_UNIX)),
- HostAndPort(SockAddr(path, -1, AF_UNIX)),
- nullptr);
-}
-
-TEST(MaxConnsOverride, NormalCIDR) {
- ExemptionVector cidrOnly{makeExemption("127.0.0.1"), makeExemption("10.0.0.0/24")};
-
- ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), cidrOnly));
- ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), cidrOnly));
- ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), cidrOnly));
-}
-
-TEST(MaxConnsOverride, UNIXPaths) {
- ExemptionVector mixed{makeExemption("127.0.0.1"),
- makeExemption("10.0.0.0/24"),
- makeExemption("/tmp/mongod.sock")};
-
- ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), mixed));
- ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), mixed));
- ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), mixed));
-#ifdef _WIN32
- ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed));
-#else
- ASSERT_TRUE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed));
-#endif
- ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/other-mongod.sock"), mixed));
-}
-
-
-} // namespace
-} // namespace
diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h
index abb9c225843..e22135bdaf1 100644
--- a/src/mongo/transport/service_entry_point.h
+++ b/src/mongo/transport/service_entry_point.h
@@ -29,7 +29,6 @@
#pragma once
#include "mongo/base/disallow_copying.h"
-#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/dbmessage.h"
#include "mongo/transport/session.h"
@@ -46,6 +45,29 @@ class ServiceEntryPoint {
MONGO_DISALLOW_COPYING(ServiceEntryPoint);
public:
+ /**
+ * Stats for sessions open.
+ */
+ struct Stats {
+ /**
+ * Returns the number of sessions currently open.
+ */
+ size_t numOpenSessions = 0;
+
+ /**
+ * Returns the total number of sessions that have ever been created.
+ */
+ size_t numCreatedSessions = 0;
+
+ /**
+ * Returns the number of available sessions we could still open. Only relevant
+ * when we are operating under a transport::Session limit (for example, in the
+ * legacy implementation, we respect a maximum number of connections). If there
+ * is no session limit, returns std::numeric_limits<int>::max().
+ */
+ size_t numAvailableSessions = 0;
+ };
+
virtual ~ServiceEntryPoint() = default;
/**
@@ -59,19 +81,14 @@ public:
virtual void endAllSessions(transport::Session::TagMask tags) = 0;
/**
- * Starts the service entry point
- */
- virtual Status start() = 0;
-
- /**
* Shuts down the service entry point.
*/
virtual bool shutdown(Milliseconds timeout) = 0;
/**
- * Append high-level stats to a BSONObjBuilder for serverStatus
- */
- virtual void appendStats(BSONObjBuilder* bob) const = 0;
+ * Returns high-level stats about current sessions.
+ */
+ virtual Stats sessionStats() const = 0;
/**
* Returns the number of sessions currently open.
diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp
index 0113d1bdfc2..adbdc48a6ec 100644
--- a/src/mongo/transport/service_entry_point_impl.cpp
+++ b/src/mongo/transport/service_entry_point_impl.cpp
@@ -46,38 +46,6 @@
#endif
namespace mongo {
-
-bool shouldOverrideMaxConns(const transport::SessionHandle& session,
- const std::vector<stdx::variant<CIDR, std::string>>& exemptions) {
- const auto& remoteAddr = session->remote().sockAddr();
- const auto& localAddr = session->local().sockAddr();
- boost::optional<CIDR> remoteCIDR;
-
- if (remoteAddr && remoteAddr->isIP()) {
- remoteCIDR = uassertStatusOK(CIDR::parse(remoteAddr->getAddr()));
- }
- for (const auto& exemption : exemptions) {
- // If this exemption is a CIDR range, then we check that the remote IP is in the
- // CIDR range
- if ((stdx::holds_alternative<CIDR>(exemption)) && (remoteCIDR)) {
- if (stdx::get<CIDR>(exemption).contains(*remoteCIDR)) {
- return true;
- }
-// Otherwise the exemption is a UNIX path and we should check the local path
-// (the remoteAddr == "anonymous unix socket") against the exemption string
-//
-// On Windows we don't check this at all and only CIDR ranges are supported
-#ifndef _WIN32
- } else if ((stdx::holds_alternative<std::string>(exemption)) && (localAddr) &&
- (localAddr->getAddr() == stdx::get<std::string>(exemption))) {
- return true;
-#endif
- }
- }
-
- return false;
-}
-
ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) {
const auto supportedMax = [] {
@@ -103,18 +71,6 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s
}
_maxNumConnections = supportedMax;
-
- if (serverGlobalParams.reservedAdminThreads) {
- _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>(
- _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads);
- }
-}
-
-Status ServiceEntryPointImpl::start() {
- if (_adminInternalPool)
- return _adminInternalPool->start();
- else
- return Status::OK();
}
void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
@@ -133,16 +89,10 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
auto transportMode = _svcCtx->getServiceExecutor()->transportMode();
auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode);
- auto usingMaxConnOverride = false;
{
stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex);
connectionCount = _sessions.size() + 1;
- if (connectionCount > _maxNumConnections) {
- usingMaxConnOverride =
- shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride);
- }
-
- if (connectionCount <= _maxNumConnections || usingMaxConnOverride) {
+ if (connectionCount <= _maxNumConnections) {
ssmIt = _sessions.emplace(_sessions.begin(), ssm);
_currentConnections.store(connectionCount);
_createdConnections.addAndFetch(1);
@@ -151,13 +101,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) {
// Checking if we successfully added a connection above. Separated from the lock so we don't log
// while holding it.
- if (connectionCount > _maxNumConnections && !usingMaxConnOverride) {
+ if (connectionCount > _maxNumConnections) {
if (!quiet) {
log() << "connection refused because too many open connections: " << connectionCount;
}
return;
- } else if (usingMaxConnOverride && _adminInternalPool) {
- ssm->setServiceExecutor(_adminInternalPool.get());
}
if (!quiet) {
@@ -235,18 +183,15 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) {
return result;
}
-void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const {
+ServiceEntryPoint::Stats ServiceEntryPointImpl::sessionStats() const {
size_t sessionCount = _currentConnections.load();
- bob->append("current", static_cast<int>(sessionCount));
- bob->append("available", static_cast<int>(_maxNumConnections - sessionCount));
- bob->append("totalCreated", static_cast<int>(_createdConnections.load()));
-
- if (_adminInternalPool) {
- BSONObjBuilder section(bob->subobjStart("adminConnections"));
- _adminInternalPool->appendStats(&section);
- }
+ ServiceEntryPoint::Stats ret;
+ ret.numOpenSessions = sessionCount;
+ ret.numCreatedSessions = _createdConnections.load();
+ ret.numAvailableSessions = _maxNumConnections - sessionCount;
+ return ret;
}
} // namespace mongo
diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h
index 6181885c953..6f47ccedccb 100644
--- a/src/mongo/transport/service_entry_point_impl.h
+++ b/src/mongo/transport/service_entry_point_impl.h
@@ -34,11 +34,8 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/list.h"
#include "mongo/stdx/mutex.h"
-#include "mongo/stdx/variant.h"
#include "mongo/transport/service_entry_point.h"
-#include "mongo/transport/service_executor_reserved.h"
#include "mongo/transport/service_state_machine.h"
-#include "mongo/util/net/cidr.h"
namespace mongo {
class ServiceContext;
@@ -64,10 +61,9 @@ public:
void endAllSessions(transport::Session::TagMask tags) final;
- Status start() final;
bool shutdown(Milliseconds timeout) final;
- void appendStats(BSONObjBuilder* bob) const final;
+ Stats sessionStats() const final;
size_t numOpenSessions() const final {
return _currentConnections.load();
@@ -87,14 +83,6 @@ private:
size_t _maxNumConnections{DEFAULT_MAX_CONN};
AtomicWord<size_t> _currentConnections{0};
AtomicWord<size_t> _createdConnections{0};
-
- std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool;
};
-/*
- * Returns true if a session with remote/local addresses should be exempted from maxConns
- */
-bool shouldOverrideMaxConns(const transport::SessionHandle& session,
- const std::vector<stdx::variant<CIDR, std::string>>& exemptions);
-
} // namespace mongo
diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp
index 5d928bd0520..d0014485cc4 100644
--- a/src/mongo/transport/service_executor_adaptive.cpp
+++ b/src/mongo/transport/service_executor_adaptive.cpp
@@ -664,19 +664,20 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString(
void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
stdx::unique_lock<stdx::mutex> lk(_threadsMutex);
- *bob << kExecutorLabel << kExecutorName //
- << kTotalQueued << _totalQueued.load() //
- << kTotalExecuted << _totalExecuted.load() //
- << kThreadsInUse << _threadsInUse.load() //
- << kTotalTimeRunningUs //
- << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) //
- << kTotalTimeExecutingUs //
- << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) //
- << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) //
- << kThreadsRunning << _threadsRunning.load() //
- << kThreadsPending << _threadsPending.load();
-
- BSONObjBuilder threadStartReasons(bob->subobjStart(kThreadReasons));
+ BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
+ section << kExecutorLabel << kExecutorName //
+ << kTotalQueued << _totalQueued.load() //
+ << kTotalExecuted << _totalExecuted.load() //
+ << kThreadsInUse << _threadsInUse.load() //
+ << kTotalTimeRunningUs //
+ << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) //
+ << kTotalTimeExecutingUs //
+ << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) //
+ << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) //
+ << kThreadsRunning << _threadsRunning.load() //
+ << kThreadsPending << _threadsPending.load();
+
+ BSONObjBuilder threadStartReasons(section.subobjStart(kThreadReasons));
for (size_t i = 0; i < _threadStartCounters.size(); i++) {
threadStartReasons << _threadStartedByToString(static_cast<ThreadCreationReason>(i))
<< _threadStartCounters[i];
@@ -684,7 +685,7 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
threadStartReasons.doneFast();
- BSONObjBuilder metricsByTask(bob->subobjStart("metricsByTask"));
+ BSONObjBuilder metricsByTask(section.subobjStart("metricsByTask"));
MetricsArray totalMetrics;
_accumulateAllTaskMetrics(&totalMetrics, lk);
lk.unlock();
@@ -702,6 +703,7 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const {
subSection.doneFast();
}
metricsByTask.doneFast();
+ section.doneFast();
}
} // namespace transport
diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp
deleted file mode 100644
index 7d8e39c5a7d..00000000000
--- a/src/mongo/transport/service_executor_reserved.cpp
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Copyright (C) 2018 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor;
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/transport/service_executor_reserved.h"
-
-#include "mongo/db/server_parameters.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/transport/service_entry_point_utils.h"
-#include "mongo/transport/service_executor_task_names.h"
-#include "mongo/transport/thread_idle_callback.h"
-#include "mongo/util/log.h"
-#include "mongo/util/processinfo.h"
-
-namespace mongo {
-namespace transport {
-namespace {
-
-// Tasks scheduled with MayRecurse may be called recursively if the recursion depth is below this
-// value.
-MONGO_EXPORT_SERVER_PARAMETER(reservedServiceExecutorRecursionLimit, int, 8);
-
-constexpr auto kThreadsRunning = "threadsRunning"_sd;
-constexpr auto kExecutorLabel = "executor"_sd;
-constexpr auto kExecutorName = "reserved"_sd;
-constexpr auto kReadyThreads = "readyThreads"_sd;
-constexpr auto kStartingThreads = "startingThreads"_sd;
-} // namespace
-
-thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {};
-thread_local int ServiceExecutorReserved::_localRecursionDepth = 0;
-thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0;
-
-ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx,
- std::string name,
- size_t reservedThreads)
- : _name(std::move(name)), _reservedThreads(reservedThreads) {}
-
-Status ServiceExecutorReserved::start() {
- {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _stillRunning.store(true);
- _numStartingThreads = _reservedThreads;
- }
-
- for (size_t i = 0; i < _reservedThreads; i++) {
- auto status = _startWorker();
- if (!status.isOK()) {
- return status;
- }
- }
-
- return Status::OK();
-}
-
-Status ServiceExecutorReserved::_startWorker() {
- log() << "Starting new worker thread for " << _name << " service executor";
- return launchServiceWorkerThread([this] {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- _numRunningWorkerThreads.addAndFetch(1);
- auto numRunningGuard = MakeGuard([&] {
- _numRunningWorkerThreads.subtractAndFetch(1);
- _shutdownCondition.notify_one();
- });
-
- _numStartingThreads--;
- _numReadyThreads++;
-
- while (_stillRunning.load()) {
- _threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); });
-
- if (!_stillRunning.loadRelaxed()) {
- break;
- }
-
- if (_readyTasks.empty()) {
- continue;
- }
-
- auto task = std::move(_readyTasks.front());
- _readyTasks.pop_front();
- _numReadyThreads -= 1;
- bool launchReplacement = false;
- if (_numReadyThreads + _numStartingThreads < _reservedThreads) {
- _numStartingThreads++;
- launchReplacement = true;
- }
-
- lk.unlock();
-
- if (launchReplacement) {
- auto threadStartStatus = _startWorker();
- if (!threadStartStatus.isOK()) {
- warning() << "Could not start new reserve worker thread: " << threadStartStatus;
- }
- }
-
- _localWorkQueue.emplace_back(std::move(task));
- while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) {
- _localRecursionDepth = 1;
- _localWorkQueue.front()();
- _localWorkQueue.pop_front();
- }
-
- lk.lock();
- if (_numReadyThreads + 1 > _reservedThreads) {
- break;
- } else {
- _numReadyThreads += 1;
- }
- }
-
- LOG(3) << "Exiting worker thread in " << _name << " service executor";
- });
-}
-
-
-Status ServiceExecutorReserved::shutdown(Milliseconds timeout) {
- LOG(3) << "Shutting down reserved executor";
-
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _stillRunning.store(false);
- _threadWakeup.notify_all();
-
- bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() {
- return _numRunningWorkerThreads.load() == 0;
- });
-
- return result
- ? Status::OK()
- : Status(ErrorCodes::Error::ExceededTimeLimit,
- "reserved executor couldn't shutdown all worker threads within time limit.");
-}
-
-Status ServiceExecutorReserved::schedule(Task task,
- ScheduleFlags flags,
- ServiceExecutorTaskName taskName) {
- if (!_stillRunning.load()) {
- return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"};
- }
-
- if (!_localWorkQueue.empty()) {
- /*
- * In perf testing we found that yielding after running a each request produced
- * at 5% performance boost in microbenchmarks if the number of worker threads
- * was greater than the number of available cores.
- */
- if (flags & ScheduleFlags::kMayYieldBeforeSchedule) {
- if ((_localThreadIdleCounter++ & 0xf) == 0) {
- markThreadIdle();
- }
- }
-
- // Execute task directly (recurse) if allowed by the caller as it produced better
- // performance in testing. Try to limit the amount of recursion so we don't blow up the
- // stack, even though this shouldn't happen with this executor that uses blocking network
- // I/O.
- if ((flags & ScheduleFlags::kMayRecurse) &&
- (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) {
- ++_localRecursionDepth;
- task();
- } else {
- _localWorkQueue.emplace_back(std::move(task));
- }
- return Status::OK();
- }
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _readyTasks.push_back(std::move(task));
- _threadWakeup.notify_one();
-
- return Status::OK();
-}
-
-void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- *bob << kExecutorLabel << kExecutorName << kThreadsRunning
- << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads
- << static_cast<int>(_numReadyThreads) << kStartingThreads
- << static_cast<int>(_numStartingThreads);
-}
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h
deleted file mode 100644
index 235e86e5930..00000000000
--- a/src/mongo/transport/service_executor_reserved.h
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (C) 2018 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <deque>
-
-#include "mongo/base/status.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
-#include "mongo/transport/service_executor.h"
-#include "mongo/transport/service_executor_task_names.h"
-
-namespace mongo {
-namespace transport {
-
-/**
- * The reserved service executor emulates a thread per connection.
- * Each connection has its own worker thread where jobs get scheduled.
- *
- * The executor will start reservedThreads on start, and create a new thread every time it
- * starts a new thread, ensuring there are always reservedThreads available for work - this
- * means that even when you hit the NPROC ulimit, there will still be threads ready to
- * accept work. When threads exit, they will go back to waiting for work if there are fewer
- * than reservedThreads available.
- */
-class ServiceExecutorReserved final : public ServiceExecutor {
-public:
- explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads);
-
- Status start() override;
- Status shutdown(Milliseconds timeout) override;
- Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) override;
-
- Mode transportMode() const override {
- return Mode::kSynchronous;
- }
-
- void appendStats(BSONObjBuilder* bob) const override;
-
-private:
- Status _startWorker();
-
- static thread_local std::deque<Task> _localWorkQueue;
- static thread_local int _localRecursionDepth;
- static thread_local int64_t _localThreadIdleCounter;
-
- AtomicBool _stillRunning{false};
-
- mutable stdx::mutex _mutex;
- stdx::condition_variable _threadWakeup;
- stdx::condition_variable _shutdownCondition;
-
- std::deque<Task> _readyTasks;
-
- AtomicUInt32 _numRunningWorkerThreads{0};
- size_t _numReadyThreads{0};
- size_t _numStartingThreads{0};
-
- const std::string _name;
- const size_t _reservedThreads;
-};
-
-} // namespace transport
-} // namespace mongo
diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp
index 7b1b6509537..a3bd83a0f06 100644
--- a/src/mongo/transport/service_executor_synchronous.cpp
+++ b/src/mongo/transport/service_executor_synchronous.cpp
@@ -142,8 +142,9 @@ Status ServiceExecutorSynchronous::schedule(Task task,
}
void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const {
- *bob << kExecutorLabel << kExecutorName << kThreadsRunning
- << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
+ BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats"));
+ section << kExecutorLabel << kExecutorName << kThreadsRunning
+ << static_cast<int>(_numRunningWorkerThreads.loadRelaxed());
}
} // namespace transport
diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp
index 83eb411c7b6..0ecbfcca6d1 100644
--- a/src/mongo/transport/service_state_machine.cpp
+++ b/src/mongo/transport/service_state_machine.cpp
@@ -226,7 +226,6 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext,
_sep{svcContext->getServiceEntryPoint()},
_transportMode(transportMode),
_serviceContext(svcContext),
- _serviceExecutor(_serviceContext->getServiceExecutor()),
_sessionHandle(session),
_threadName{str::stream() << "conn" << _session()->id()},
_dbClient{svcContext->makeClient(_threadName, std::move(session))},
@@ -470,10 +469,6 @@ void ServiceStateMachine::start(Ownership ownershipModel) {
ownershipModel);
}
-void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) {
- _serviceExecutor = executor;
-}
-
void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
transport::ServiceExecutor::ScheduleFlags flags,
transport::ServiceExecutorTaskName taskName,
@@ -485,7 +480,8 @@ void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard,
ssm->_runNextInGuard(std::move(guard));
};
guard.release();
- Status status = _serviceExecutor->schedule(std::move(func), flags, taskName);
+ Status status =
+ _serviceContext->getServiceExecutor()->schedule(std::move(func), flags, taskName);
if (status.isOK()) {
return;
}
diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h
index ee54eb99cc8..bac5417dd67 100644
--- a/src/mongo/transport/service_state_machine.h
+++ b/src/mongo/transport/service_state_machine.h
@@ -126,12 +126,6 @@ public:
void start(Ownership ownershipModel);
/*
- * Set the executor to be used for the next call to runNext(). This allows switching between
- * thread models after the SSM has started.
- */
- void setServiceExecutor(transport::ServiceExecutor* executor);
-
- /*
* Gets the current state of connection for testing/diagnostic purposes.
*/
State state();
@@ -225,7 +219,6 @@ private:
transport::Mode _transportMode;
ServiceContext* const _serviceContext;
- transport::ServiceExecutor* _serviceExecutor;
transport::SessionHandle _sessionHandle;
const std::string _threadName;
diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp
index 6d37d63b165..e29c33627a6 100644
--- a/src/mongo/transport/service_state_machine_test.cpp
+++ b/src/mongo/transport/service_state_machine_test.cpp
@@ -84,15 +84,13 @@ public:
void endAllSessions(transport::Session::TagMask tags) override {}
- Status start() override {
- return Status::OK();
- }
-
bool shutdown(Milliseconds timeout) override {
return true;
}
- void appendStats(BSONObjBuilder*) const override {}
+ Stats sessionStats() const override {
+ return {};
+ }
size_t numOpenSessions() const override {
return 0ULL;
diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp
index abf49b48690..b9ff5580dc4 100644
--- a/src/mongo/transport/transport_layer_asio_test.cpp
+++ b/src/mongo/transport/transport_layer_asio_test.cpp
@@ -63,15 +63,13 @@ public:
old_sessions.clear();
}
- Status start() override {
- return Status::OK();
- }
-
bool shutdown(Milliseconds timeout) override {
return true;
}
- void appendStats(BSONObjBuilder*) const override {}
+ Stats sessionStats() const override {
+ return {};
+ }
size_t numOpenSessions() const override {
stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -171,12 +169,10 @@ public:
return true;
}
- Status start() override {
- return Status::OK();
+ Stats sessionStats() const override {
+ return {};
}
- void appendStats(BSONObjBuilder*) const override {}
-
size_t numOpenSessions() const override {
return 0;
}