diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-08-16 18:58:49 -0400 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-08-16 18:58:49 -0400 |
commit | 5e461534b339c957d2e6dcf16015949a496a1110 (patch) | |
tree | c688bcfa73c9b4eacbb42fa778df3ccf7e65c02c /src/mongo/transport | |
parent | 57be1f7097ceed41b156a826c541aa276f35dd61 (diff) | |
download | mongo-5e461534b339c957d2e6dcf16015949a496a1110.tar.gz |
Revert "SERVER-34986 Allow connections to override maxConns based on CIDR range"
This reverts commit 1a643ba65b070c167cdfdd5056c7d2ac79dd5371.
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/transport/max_conns_override_test.cpp | 84 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point.h | 35 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 71 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.h | 14 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.cpp | 30 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.cpp | 212 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_reserved.h | 91 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 5 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 8 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.h | 7 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio_test.cpp | 14 |
13 files changed, 64 insertions, 529 deletions
diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 4312fc6f05a..2990aede9ea 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -113,7 +113,6 @@ tlEnv.Library( target='service_executor', source=[ 'service_executor_adaptive.cpp', - 'service_executor_reserved.cpp', 'service_executor_synchronous.cpp', 'thread_idle_callback.cpp', ], @@ -174,19 +173,6 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/transport/message_compressor', - '$BUILD_DIR/mongo/transport/service_executor', - ], -) - -env.CppUnitTest( - target='max_conns_override_test', - source=[ - 'max_conns_override_test.cpp', - ], - LIBDEPS=[ - 'service_entry_point', - 'transport_layer_mock', - '$BUILD_DIR/mongo/unittest/unittest' ], ) diff --git a/src/mongo/transport/max_conns_override_test.cpp b/src/mongo/transport/max_conns_override_test.cpp deleted file mode 100644 index 195a4c030c6..00000000000 --- a/src/mongo/transport/max_conns_override_test.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* Copyright 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/transport/mock_session.h" -#include "mongo/transport/service_entry_point_impl.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -using ExemptionVector = std::vector<stdx::variant<CIDR, std::string>>; - -template <typename T> -stdx::variant<CIDR, std::string> makeExemption(T exemption) { - auto swCIDR = CIDR::parse(exemption); - if (swCIDR.isOK()) { - return swCIDR.getValue(); - } else { - return std::string{exemption}; - } -} - -transport::SessionHandle makeIPSession(StringData ip) { - return transport::MockSession::create( - HostAndPort(SockAddr(ip, 27017, AF_INET)), HostAndPort(), nullptr); -} - -transport::SessionHandle makeUNIXSession(StringData path) { - return transport::MockSession::create(HostAndPort(SockAddr(""_sd, -1, AF_UNIX)), - HostAndPort(SockAddr(path, -1, AF_UNIX)), - nullptr); -} - -TEST(MaxConnsOverride, NormalCIDR) { - ExemptionVector cidrOnly{makeExemption("127.0.0.1"), makeExemption("10.0.0.0/24")}; - - ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), cidrOnly)); - ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), cidrOnly)); - ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), cidrOnly)); -} - -TEST(MaxConnsOverride, UNIXPaths) { - ExemptionVector mixed{makeExemption("127.0.0.1"), - makeExemption("10.0.0.0/24"), - makeExemption("/tmp/mongod.sock")}; - - ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("127.0.0.1"), mixed)); - ASSERT_TRUE(shouldOverrideMaxConns(makeIPSession("10.0.0.35"), mixed)); - ASSERT_FALSE(shouldOverrideMaxConns(makeIPSession("192.168.0.53"), mixed)); -#ifdef _WIN32 - ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed)); -#else - ASSERT_TRUE(shouldOverrideMaxConns(makeUNIXSession("/tmp/mongod.sock"), mixed)); -#endif - ASSERT_FALSE(shouldOverrideMaxConns(makeUNIXSession("/tmp/other-mongod.sock"), mixed)); -} - - -} // namespace -} // namespace diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index abb9c225843..e22135bdaf1 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -29,7 +29,6 @@ #pragma once #include "mongo/base/disallow_copying.h" -#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/dbmessage.h" #include "mongo/transport/session.h" @@ -46,6 +45,29 @@ class ServiceEntryPoint { MONGO_DISALLOW_COPYING(ServiceEntryPoint); public: + /** + * Stats for sessions open. + */ + struct Stats { + /** + * Returns the number of sessions currently open. + */ + size_t numOpenSessions = 0; + + /** + * Returns the total number of sessions that have ever been created. + */ + size_t numCreatedSessions = 0; + + /** + * Returns the number of available sessions we could still open. Only relevant + * when we are operating under a transport::Session limit (for example, in the + * legacy implementation, we respect a maximum number of connections). If there + * is no session limit, returns std::numeric_limits<int>::max(). + */ + size_t numAvailableSessions = 0; + }; + virtual ~ServiceEntryPoint() = default; /** @@ -59,19 +81,14 @@ public: virtual void endAllSessions(transport::Session::TagMask tags) = 0; /** - * Starts the service entry point - */ - virtual Status start() = 0; - - /** * Shuts down the service entry point. */ virtual bool shutdown(Milliseconds timeout) = 0; /** - * Append high-level stats to a BSONObjBuilder for serverStatus - */ - virtual void appendStats(BSONObjBuilder* bob) const = 0; + * Returns high-level stats about current sessions. + */ + virtual Stats sessionStats() const = 0; /** * Returns the number of sessions currently open. diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index 0113d1bdfc2..adbdc48a6ec 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -46,38 +46,6 @@ #endif namespace mongo { - -bool shouldOverrideMaxConns(const transport::SessionHandle& session, - const std::vector<stdx::variant<CIDR, std::string>>& exemptions) { - const auto& remoteAddr = session->remote().sockAddr(); - const auto& localAddr = session->local().sockAddr(); - boost::optional<CIDR> remoteCIDR; - - if (remoteAddr && remoteAddr->isIP()) { - remoteCIDR = uassertStatusOK(CIDR::parse(remoteAddr->getAddr())); - } - for (const auto& exemption : exemptions) { - // If this exemption is a CIDR range, then we check that the remote IP is in the - // CIDR range - if ((stdx::holds_alternative<CIDR>(exemption)) && (remoteCIDR)) { - if (stdx::get<CIDR>(exemption).contains(*remoteCIDR)) { - return true; - } -// Otherwise the exemption is a UNIX path and we should check the local path -// (the remoteAddr == "anonymous unix socket") against the exemption string -// -// On Windows we don't check this at all and only CIDR ranges are supported -#ifndef _WIN32 - } else if ((stdx::holds_alternative<std::string>(exemption)) && (localAddr) && - (localAddr->getAddr() == stdx::get<std::string>(exemption))) { - return true; -#endif - } - } - - return false; -} - ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(svcCtx) { const auto supportedMax = [] { @@ -103,18 +71,6 @@ ServiceEntryPointImpl::ServiceEntryPointImpl(ServiceContext* svcCtx) : _svcCtx(s } _maxNumConnections = supportedMax; - - if (serverGlobalParams.reservedAdminThreads) { - _adminInternalPool = std::make_unique<transport::ServiceExecutorReserved>( - _svcCtx, "admin/internal connections", serverGlobalParams.reservedAdminThreads); - } -} - -Status ServiceEntryPointImpl::start() { - if (_adminInternalPool) - return _adminInternalPool->start(); - else - return Status::OK(); } void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { @@ -133,16 +89,10 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { auto transportMode = _svcCtx->getServiceExecutor()->transportMode(); auto ssm = ServiceStateMachine::create(_svcCtx, session, transportMode); - auto usingMaxConnOverride = false; { stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); connectionCount = _sessions.size() + 1; - if (connectionCount > _maxNumConnections) { - usingMaxConnOverride = - shouldOverrideMaxConns(session, serverGlobalParams.maxConnsOverride); - } - - if (connectionCount <= _maxNumConnections || usingMaxConnOverride) { + if (connectionCount <= _maxNumConnections) { ssmIt = _sessions.emplace(_sessions.begin(), ssm); _currentConnections.store(connectionCount); _createdConnections.addAndFetch(1); @@ -151,13 +101,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { // Checking if we successfully added a connection above. Separated from the lock so we don't log // while holding it. - if (connectionCount > _maxNumConnections && !usingMaxConnOverride) { + if (connectionCount > _maxNumConnections) { if (!quiet) { log() << "connection refused because too many open connections: " << connectionCount; } return; - } else if (usingMaxConnOverride && _adminInternalPool) { - ssm->setServiceExecutor(_adminInternalPool.get()); } if (!quiet) { @@ -235,18 +183,15 @@ bool ServiceEntryPointImpl::shutdown(Milliseconds timeout) { return result; } -void ServiceEntryPointImpl::appendStats(BSONObjBuilder* bob) const { +ServiceEntryPoint::Stats ServiceEntryPointImpl::sessionStats() const { size_t sessionCount = _currentConnections.load(); - bob->append("current", static_cast<int>(sessionCount)); - bob->append("available", static_cast<int>(_maxNumConnections - sessionCount)); - bob->append("totalCreated", static_cast<int>(_createdConnections.load())); - - if (_adminInternalPool) { - BSONObjBuilder section(bob->subobjStart("adminConnections")); - _adminInternalPool->appendStats(§ion); - } + ServiceEntryPoint::Stats ret; + ret.numOpenSessions = sessionCount; + ret.numCreatedSessions = _createdConnections.load(); + ret.numAvailableSessions = _maxNumConnections - sessionCount; + return ret; } } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h index 6181885c953..6f47ccedccb 100644 --- a/src/mongo/transport/service_entry_point_impl.h +++ b/src/mongo/transport/service_entry_point_impl.h @@ -34,11 +34,8 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/list.h" #include "mongo/stdx/mutex.h" -#include "mongo/stdx/variant.h" #include "mongo/transport/service_entry_point.h" -#include "mongo/transport/service_executor_reserved.h" #include "mongo/transport/service_state_machine.h" -#include "mongo/util/net/cidr.h" namespace mongo { class ServiceContext; @@ -64,10 +61,9 @@ public: void endAllSessions(transport::Session::TagMask tags) final; - Status start() final; bool shutdown(Milliseconds timeout) final; - void appendStats(BSONObjBuilder* bob) const final; + Stats sessionStats() const final; size_t numOpenSessions() const final { return _currentConnections.load(); @@ -87,14 +83,6 @@ private: size_t _maxNumConnections{DEFAULT_MAX_CONN}; AtomicWord<size_t> _currentConnections{0}; AtomicWord<size_t> _createdConnections{0}; - - std::unique_ptr<transport::ServiceExecutorReserved> _adminInternalPool; }; -/* - * Returns true if a session with remote/local addresses should be exempted from maxConns - */ -bool shouldOverrideMaxConns(const transport::SessionHandle& session, - const std::vector<stdx::variant<CIDR, std::string>>& exemptions); - } // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive.cpp b/src/mongo/transport/service_executor_adaptive.cpp index 5d928bd0520..d0014485cc4 100644 --- a/src/mongo/transport/service_executor_adaptive.cpp +++ b/src/mongo/transport/service_executor_adaptive.cpp @@ -664,19 +664,20 @@ StringData ServiceExecutorAdaptive::_threadStartedByToString( void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const { stdx::unique_lock<stdx::mutex> lk(_threadsMutex); - *bob << kExecutorLabel << kExecutorName // - << kTotalQueued << _totalQueued.load() // - << kTotalExecuted << _totalExecuted.load() // - << kThreadsInUse << _threadsInUse.load() // - << kTotalTimeRunningUs // - << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) // - << kTotalTimeExecutingUs // - << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) // - << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) // - << kThreadsRunning << _threadsRunning.load() // - << kThreadsPending << _threadsPending.load(); - - BSONObjBuilder threadStartReasons(bob->subobjStart(kThreadReasons)); + BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats")); + section << kExecutorLabel << kExecutorName // + << kTotalQueued << _totalQueued.load() // + << kTotalExecuted << _totalExecuted.load() // + << kThreadsInUse << _threadsInUse.load() // + << kTotalTimeRunningUs // + << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kRunning, lk), _tickSource) // + << kTotalTimeExecutingUs // + << ticksToMicros(_getThreadTimerTotal(ThreadTimer::kExecuting, lk), _tickSource) // + << kTotalTimeQueuedUs << ticksToMicros(_totalSpentQueued.load(), _tickSource) // + << kThreadsRunning << _threadsRunning.load() // + << kThreadsPending << _threadsPending.load(); + + BSONObjBuilder threadStartReasons(section.subobjStart(kThreadReasons)); for (size_t i = 0; i < _threadStartCounters.size(); i++) { threadStartReasons << _threadStartedByToString(static_cast<ThreadCreationReason>(i)) << _threadStartCounters[i]; @@ -684,7 +685,7 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const { threadStartReasons.doneFast(); - BSONObjBuilder metricsByTask(bob->subobjStart("metricsByTask")); + BSONObjBuilder metricsByTask(section.subobjStart("metricsByTask")); MetricsArray totalMetrics; _accumulateAllTaskMetrics(&totalMetrics, lk); lk.unlock(); @@ -702,6 +703,7 @@ void ServiceExecutorAdaptive::appendStats(BSONObjBuilder* bob) const { subSection.doneFast(); } metricsByTask.doneFast(); + section.doneFast(); } } // namespace transport diff --git a/src/mongo/transport/service_executor_reserved.cpp b/src/mongo/transport/service_executor_reserved.cpp deleted file mode 100644 index 7d8e39c5a7d..00000000000 --- a/src/mongo/transport/service_executor_reserved.cpp +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Copyright (C) 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; - -#include "mongo/platform/basic.h" - -#include "mongo/transport/service_executor_reserved.h" - -#include "mongo/db/server_parameters.h" -#include "mongo/stdx/thread.h" -#include "mongo/transport/service_entry_point_utils.h" -#include "mongo/transport/service_executor_task_names.h" -#include "mongo/transport/thread_idle_callback.h" -#include "mongo/util/log.h" -#include "mongo/util/processinfo.h" - -namespace mongo { -namespace transport { -namespace { - -// Tasks scheduled with MayRecurse may be called recursively if the recursion depth is below this -// value. -MONGO_EXPORT_SERVER_PARAMETER(reservedServiceExecutorRecursionLimit, int, 8); - -constexpr auto kThreadsRunning = "threadsRunning"_sd; -constexpr auto kExecutorLabel = "executor"_sd; -constexpr auto kExecutorName = "reserved"_sd; -constexpr auto kReadyThreads = "readyThreads"_sd; -constexpr auto kStartingThreads = "startingThreads"_sd; -} // namespace - -thread_local std::deque<ServiceExecutor::Task> ServiceExecutorReserved::_localWorkQueue = {}; -thread_local int ServiceExecutorReserved::_localRecursionDepth = 0; -thread_local int64_t ServiceExecutorReserved::_localThreadIdleCounter = 0; - -ServiceExecutorReserved::ServiceExecutorReserved(ServiceContext* ctx, - std::string name, - size_t reservedThreads) - : _name(std::move(name)), _reservedThreads(reservedThreads) {} - -Status ServiceExecutorReserved::start() { - { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _stillRunning.store(true); - _numStartingThreads = _reservedThreads; - } - - for (size_t i = 0; i < _reservedThreads; i++) { - auto status = _startWorker(); - if (!status.isOK()) { - return status; - } - } - - return Status::OK(); -} - -Status ServiceExecutorReserved::_startWorker() { - log() << "Starting new worker thread for " << _name << " service executor"; - return launchServiceWorkerThread([this] { - stdx::unique_lock<stdx::mutex> lk(_mutex); - _numRunningWorkerThreads.addAndFetch(1); - auto numRunningGuard = MakeGuard([&] { - _numRunningWorkerThreads.subtractAndFetch(1); - _shutdownCondition.notify_one(); - }); - - _numStartingThreads--; - _numReadyThreads++; - - while (_stillRunning.load()) { - _threadWakeup.wait(lk, [&] { return (!_stillRunning.load() || !_readyTasks.empty()); }); - - if (!_stillRunning.loadRelaxed()) { - break; - } - - if (_readyTasks.empty()) { - continue; - } - - auto task = std::move(_readyTasks.front()); - _readyTasks.pop_front(); - _numReadyThreads -= 1; - bool launchReplacement = false; - if (_numReadyThreads + _numStartingThreads < _reservedThreads) { - _numStartingThreads++; - launchReplacement = true; - } - - lk.unlock(); - - if (launchReplacement) { - auto threadStartStatus = _startWorker(); - if (!threadStartStatus.isOK()) { - warning() << "Could not start new reserve worker thread: " << threadStartStatus; - } - } - - _localWorkQueue.emplace_back(std::move(task)); - while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { - _localRecursionDepth = 1; - _localWorkQueue.front()(); - _localWorkQueue.pop_front(); - } - - lk.lock(); - if (_numReadyThreads + 1 > _reservedThreads) { - break; - } else { - _numReadyThreads += 1; - } - } - - LOG(3) << "Exiting worker thread in " << _name << " service executor"; - }); -} - - -Status ServiceExecutorReserved::shutdown(Milliseconds timeout) { - LOG(3) << "Shutting down reserved executor"; - - stdx::unique_lock<stdx::mutex> lock(_mutex); - _stillRunning.store(false); - _threadWakeup.notify_all(); - - bool result = _shutdownCondition.wait_for(lock, timeout.toSystemDuration(), [this]() { - return _numRunningWorkerThreads.load() == 0; - }); - - return result - ? Status::OK() - : Status(ErrorCodes::Error::ExceededTimeLimit, - "reserved executor couldn't shutdown all worker threads within time limit."); -} - -Status ServiceExecutorReserved::schedule(Task task, - ScheduleFlags flags, - ServiceExecutorTaskName taskName) { - if (!_stillRunning.load()) { - return Status{ErrorCodes::ShutdownInProgress, "Executor is not running"}; - } - - if (!_localWorkQueue.empty()) { - /* - * In perf testing we found that yielding after running a each request produced - * at 5% performance boost in microbenchmarks if the number of worker threads - * was greater than the number of available cores. - */ - if (flags & ScheduleFlags::kMayYieldBeforeSchedule) { - if ((_localThreadIdleCounter++ & 0xf) == 0) { - markThreadIdle(); - } - } - - // Execute task directly (recurse) if allowed by the caller as it produced better - // performance in testing. Try to limit the amount of recursion so we don't blow up the - // stack, even though this shouldn't happen with this executor that uses blocking network - // I/O. - if ((flags & ScheduleFlags::kMayRecurse) && - (_localRecursionDepth < reservedServiceExecutorRecursionLimit.loadRelaxed())) { - ++_localRecursionDepth; - task(); - } else { - _localWorkQueue.emplace_back(std::move(task)); - } - return Status::OK(); - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - _readyTasks.push_back(std::move(task)); - _threadWakeup.notify_one(); - - return Status::OK(); -} - -void ServiceExecutorReserved::appendStats(BSONObjBuilder* bob) const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - *bob << kExecutorLabel << kExecutorName << kThreadsRunning - << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()) << kReadyThreads - << static_cast<int>(_numReadyThreads) << kStartingThreads - << static_cast<int>(_numStartingThreads); -} - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_reserved.h b/src/mongo/transport/service_executor_reserved.h deleted file mode 100644 index 235e86e5930..00000000000 --- a/src/mongo/transport/service_executor_reserved.h +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Copyright (C) 2018 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <deque> - -#include "mongo/base/status.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" -#include "mongo/transport/service_executor.h" -#include "mongo/transport/service_executor_task_names.h" - -namespace mongo { -namespace transport { - -/** - * The reserved service executor emulates a thread per connection. - * Each connection has its own worker thread where jobs get scheduled. - * - * The executor will start reservedThreads on start, and create a new thread every time it - * starts a new thread, ensuring there are always reservedThreads available for work - this - * means that even when you hit the NPROC ulimit, there will still be threads ready to - * accept work. When threads exit, they will go back to waiting for work if there are fewer - * than reservedThreads available. - */ -class ServiceExecutorReserved final : public ServiceExecutor { -public: - explicit ServiceExecutorReserved(ServiceContext* ctx, std::string name, size_t reservedThreads); - - Status start() override; - Status shutdown(Milliseconds timeout) override; - Status schedule(Task task, ScheduleFlags flags, ServiceExecutorTaskName taskName) override; - - Mode transportMode() const override { - return Mode::kSynchronous; - } - - void appendStats(BSONObjBuilder* bob) const override; - -private: - Status _startWorker(); - - static thread_local std::deque<Task> _localWorkQueue; - static thread_local int _localRecursionDepth; - static thread_local int64_t _localThreadIdleCounter; - - AtomicBool _stillRunning{false}; - - mutable stdx::mutex _mutex; - stdx::condition_variable _threadWakeup; - stdx::condition_variable _shutdownCondition; - - std::deque<Task> _readyTasks; - - AtomicUInt32 _numRunningWorkerThreads{0}; - size_t _numReadyThreads{0}; - size_t _numStartingThreads{0}; - - const std::string _name; - const size_t _reservedThreads; -}; - -} // namespace transport -} // namespace mongo diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp index 7b1b6509537..a3bd83a0f06 100644 --- a/src/mongo/transport/service_executor_synchronous.cpp +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -142,8 +142,9 @@ Status ServiceExecutorSynchronous::schedule(Task task, } void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { - *bob << kExecutorLabel << kExecutorName << kThreadsRunning - << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); + BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats")); + section << kExecutorLabel << kExecutorName << kThreadsRunning + << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); } } // namespace transport diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 83eb411c7b6..0ecbfcca6d1 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -226,7 +226,6 @@ ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext, _sep{svcContext->getServiceEntryPoint()}, _transportMode(transportMode), _serviceContext(svcContext), - _serviceExecutor(_serviceContext->getServiceExecutor()), _sessionHandle(session), _threadName{str::stream() << "conn" << _session()->id()}, _dbClient{svcContext->makeClient(_threadName, std::move(session))}, @@ -470,10 +469,6 @@ void ServiceStateMachine::start(Ownership ownershipModel) { ownershipModel); } -void ServiceStateMachine::setServiceExecutor(ServiceExecutor* executor) { - _serviceExecutor = executor; -} - void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard, transport::ServiceExecutor::ScheduleFlags flags, transport::ServiceExecutorTaskName taskName, @@ -485,7 +480,8 @@ void ServiceStateMachine::_scheduleNextWithGuard(ThreadGuard guard, ssm->_runNextInGuard(std::move(guard)); }; guard.release(); - Status status = _serviceExecutor->schedule(std::move(func), flags, taskName); + Status status = + _serviceContext->getServiceExecutor()->schedule(std::move(func), flags, taskName); if (status.isOK()) { return; } diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index ee54eb99cc8..bac5417dd67 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -126,12 +126,6 @@ public: void start(Ownership ownershipModel); /* - * Set the executor to be used for the next call to runNext(). This allows switching between - * thread models after the SSM has started. - */ - void setServiceExecutor(transport::ServiceExecutor* executor); - - /* * Gets the current state of connection for testing/diagnostic purposes. */ State state(); @@ -225,7 +219,6 @@ private: transport::Mode _transportMode; ServiceContext* const _serviceContext; - transport::ServiceExecutor* _serviceExecutor; transport::SessionHandle _sessionHandle; const std::string _threadName; diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index 6d37d63b165..e29c33627a6 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -84,15 +84,13 @@ public: void endAllSessions(transport::Session::TagMask tags) override {} - Status start() override { - return Status::OK(); - } - bool shutdown(Milliseconds timeout) override { return true; } - void appendStats(BSONObjBuilder*) const override {} + Stats sessionStats() const override { + return {}; + } size_t numOpenSessions() const override { return 0ULL; diff --git a/src/mongo/transport/transport_layer_asio_test.cpp b/src/mongo/transport/transport_layer_asio_test.cpp index abf49b48690..b9ff5580dc4 100644 --- a/src/mongo/transport/transport_layer_asio_test.cpp +++ b/src/mongo/transport/transport_layer_asio_test.cpp @@ -63,15 +63,13 @@ public: old_sessions.clear(); } - Status start() override { - return Status::OK(); - } - bool shutdown(Milliseconds timeout) override { return true; } - void appendStats(BSONObjBuilder*) const override {} + Stats sessionStats() const override { + return {}; + } size_t numOpenSessions() const override { stdx::unique_lock<stdx::mutex> lock(_mutex); @@ -171,12 +169,10 @@ public: return true; } - Status start() override { - return Status::OK(); + Stats sessionStats() const override { + return {}; } - void appendStats(BSONObjBuilder*) const override {} - size_t numOpenSessions() const override { return 0; } |