diff options
author | Henrik Edin <henrik.edin@mongodb.com> | 2017-08-15 11:25:22 -0400 |
---|---|---|
committer | Henrik Edin <henrik.edin@mongodb.com> | 2017-09-22 16:38:54 -0400 |
commit | 6732fbb1fb749e9f22f0ed4633e24515f842dafc (patch) | |
tree | 46ecca0cb2d251ee428f51fcb32c975a184bce9f /src/mongo | |
parent | ab7ceed2108a7d19518490929b03fa6f4a13257c (diff) | |
download | mongo-6732fbb1fb749e9f22f0ed4633e24515f842dafc.tar.gz |
SERVER-30135 Added a synchronous executor to make the code path between the two modes similar while still allowing customization in the execution. Should fix some perf regressions that came with unifying the service state machine.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/db.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/server_options.h | 2 | ||||
-rw-r--r-- | src/mongo/platform/bitwise_enum_operators.h | 99 | ||||
-rw-r--r-- | src/mongo/s/server.cpp | 10 | ||||
-rw-r--r-- | src/mongo/stdx/condition_variable.h | 1 | ||||
-rw-r--r-- | src/mongo/transport/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/transport/service_entry_point_impl.cpp | 48 | ||||
-rw-r--r-- | src/mongo/transport/service_executor.h | 14 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_adaptive.h | 4 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_noop.h | 63 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.cpp | 152 | ||||
-rw-r--r-- | src/mongo/transport/service_executor_synchronous.h | 75 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.cpp | 35 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine.h | 51 | ||||
-rw-r--r-- | src/mongo/transport/service_state_machine_test.cpp | 6 | ||||
-rw-r--r-- | src/mongo/transport/session_asio.h | 2 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 9 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_manager.cpp | 12 | ||||
-rw-r--r-- | src/mongo/transport/transport_mode.h | 35 |
19 files changed, 530 insertions, 106 deletions
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index ca235602fa5..89d608f2778 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -814,12 +814,10 @@ ExitCode _initAndListen(int listenPort) { return EXIT_NET_ERROR; } - if (globalServiceContext->getServiceExecutor()) { - start = globalServiceContext->getServiceExecutor()->start(); - if (!start.isOK()) { - error() << "Failed to start the service executor: " << start; - return EXIT_NET_ERROR; - } + start = globalServiceContext->getServiceExecutor()->start(); + if (!start.isOK()) { + error() << "Failed to start the service executor: " << start; + return EXIT_NET_ERROR; } globalServiceContext->notifyStartupComplete(); @@ -1140,9 +1138,10 @@ void shutdownTask() { } // Shutdown and wait for the service executor to exit - auto svcExec = serviceContext->getServiceExecutor(); - if (svcExec) { - fassertStatusOK(40550, svcExec->shutdown()); + Status status = serviceContext->getServiceExecutor()->shutdown(); + if (!status.isOK()) { + log(LogComponent::kExecutor) << "shutdown: service executor failed with: " + << status.reason(); } } #endif diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h index 4a4871f5766..a82d20acc79 100644 --- a/src/mongo/db/server_options.h +++ b/src/mongo/db/server_options.h @@ -76,7 +76,7 @@ struct ServerGlobalParams { std::string socket = "/tmp"; // UNIX domain socket directory std::string transportLayer; // --transportLayer (must be either "asio" or "legacy") - // --serviceExecutor ("adaptive", "synchronous", or "fixedForTesting") + // --serviceExecutor ("adaptive", "synchronous") std::string serviceExecutor; size_t maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections. diff --git a/src/mongo/platform/bitwise_enum_operators.h b/src/mongo/platform/bitwise_enum_operators.h new file mode 100644 index 00000000000..8647f5e8a43 --- /dev/null +++ b/src/mongo/platform/bitwise_enum_operators.h @@ -0,0 +1,99 @@ +/** +* Copyright (C) 2017 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +#include <type_traits> + +namespace mongo { + +template <typename Enum> +struct EnableBitMaskOperators { + static const bool enable = false; +}; + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator&(Enum lhs, + Enum rhs) { + using underlying = typename std::underlying_type<Enum>::type; + return static_cast<Enum>(static_cast<underlying>(lhs) & static_cast<underlying>(rhs)); +} + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator|(Enum lhs, + Enum rhs) { + using underlying = typename std::underlying_type<Enum>::type; + return static_cast<Enum>(static_cast<underlying>(lhs) | static_cast<underlying>(rhs)); +} + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator^(Enum lhs, + Enum rhs) { + using underlying = typename std::underlying_type<Enum>::type; + return static_cast<Enum>(static_cast<underlying>(lhs) ^ static_cast<underlying>(rhs)); +} + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum>::type operator~(Enum rhs) { + return static_cast<Enum>(~static_cast<typename std::underlying_type<Enum>::type>(rhs)); +} + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum&>::type operator|=(Enum& lhs, + Enum rhs) { + using underlying = typename std::underlying_type<Enum>::type; + lhs = static_cast<Enum>(static_cast<underlying>(lhs) | static_cast<underlying>(rhs)); + return lhs; +} + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum&>::type operator&=(Enum& lhs, + Enum rhs) { + using underlying = typename std::underlying_type<Enum>::type; + lhs = static_cast<Enum>(static_cast<underlying>(lhs) & static_cast<underlying>(rhs)); + return lhs; +} + +template <typename Enum> +typename std::enable_if<EnableBitMaskOperators<Enum>::enable, Enum&>::type operator^=(Enum& lhs, + Enum rhs) { + using underlying = typename std::underlying_type<Enum>::type; + lhs = static_cast<Enum>(static_cast<underlying>(lhs) ^ static_cast<underlying>(rhs)); + return lhs; +} + +} // namespace mongo + +#define ENABLE_BITMASK_OPERATORS(x) \ + \ + template <> \ + \ + struct EnableBitMaskOperators<x> { \ + static_assert(std::is_enum<typename x>::value, "template parameter is not an enum type"); \ + static const bool enable = true; \ + }; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 8c51e2dfb77..4af8f9a1d8d 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -412,12 +412,10 @@ static ExitCode runMongosServer() { return EXIT_NET_ERROR; } - if (auto svcExec = getGlobalServiceContext()->getServiceExecutor()) { - start = svcExec->start(); - if (!start.isOK()) { - error() << "Failed to start the service executor: " << start; - return EXIT_NET_ERROR; - } + start = getGlobalServiceContext()->getServiceExecutor()->start(); + if (!start.isOK()) { + error() << "Failed to start the service executor: " << start; + return EXIT_NET_ERROR; } getGlobalServiceContext()->notifyStartupComplete(); diff --git a/src/mongo/stdx/condition_variable.h b/src/mongo/stdx/condition_variable.h index d2dcd0a9a57..40ca92ba8f6 100644 --- a/src/mongo/stdx/condition_variable.h +++ b/src/mongo/stdx/condition_variable.h @@ -36,6 +36,7 @@ namespace stdx { using condition_variable = ::std::condition_variable; // NOLINT using condition_variable_any = ::std::condition_variable_any; // NOLINT using cv_status = ::std::cv_status; // NOLINT +using ::std::notify_all_at_thread_exit; // NOLINT } // namespace stdx } // namespace mongo diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index 0496a931a48..857ca9d9dfb 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -79,6 +79,7 @@ tlEnv.Library( target='service_executor', source=[ 'service_executor_adaptive.cpp', + 'service_executor_synchronous.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/db/service_context', diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index 62f148f9ec1..2d4f2676cbc 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -35,7 +35,6 @@ #include <vector> #include "mongo/db/auth/restriction_environment.h" -#include "mongo/transport/service_entry_point_utils.h" #include "mongo/transport/service_state_machine.h" #include "mongo/transport/session.h" #include "mongo/util/log.h" @@ -85,11 +84,11 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { SSMListIterator ssmIt; - const auto sync = (_svcCtx->getServiceExecutor() == nullptr); const bool quiet = serverGlobalParams.quiet.load(); size_t connectionCount; - auto ssm = ServiceStateMachine::create(_svcCtx, session, sync); + auto ssm = ServiceStateMachine::create( + _svcCtx, session, _svcCtx->getServiceExecutor()->transportMode()); { stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); connectionCount = _sessions.size() + 1; @@ -129,48 +128,7 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { }); - if (!sync) { - dassert(_svcCtx->getServiceExecutor()); - ssm->scheduleNext(); - return; - } - - auto workerTask = [this, ssm]() mutable { - _nWorkers.addAndFetch(1); - const auto guard = MakeGuard([this, &ssm] { _nWorkers.subtractAndFetch(1); }); - - const auto numCores = [] { - ProcessInfo p; - if (auto availCores = p.getNumAvailableCores()) { - return static_cast<unsigned>(*availCores); - } - return static_cast<unsigned>(p.getNumCores()); - }(); - - while (ssm->state() != ServiceStateMachine::State::Ended) { - ssm->runNext(); - - /* - * In perf testing we found that yielding after running a each request produced - * at 5% performance boost in microbenchmarks if the number of worker threads - * was greater than the number of available cores. - */ - if (_nWorkers.load() > numCores) - stdx::this_thread::yield(); - } - }; - - const auto launchResult = launchServiceWorkerThread(std::move(workerTask)); - if (launchResult.isOK()) { - return; - } - - // We never got off the ground. Manually remove the new SSM from - // the list of sessions and close the associated socket. The SSM - // will be destroyed. - stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); - _sessions.erase(ssmIt); - ssm->terminateIfTagsDontMatch(0); + ssm->scheduleNext(); } void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h index 436c331882c..684e9d0e9d2 100644 --- a/src/mongo/transport/service_executor.h +++ b/src/mongo/transport/service_executor.h @@ -30,7 +30,9 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobjbuilder.h" +#include "mongo/platform/bitwise_enum_operators.h" #include "mongo/stdx/functional.h" +#include "mongo/transport/transport_mode.h" namespace mongo { // This needs to be forward declared here because the service_context.h is a circular dependency. @@ -55,6 +57,10 @@ public: // MayRecurse indicates that a task may be run recursively. kMayRecurse = 1 << 2, + + // MayYieldBeforeSchedule indicates that the executor may yield on the current thread before + // scheduling the task. + kMayYieldBeforeSchedule = 1 << 3, }; /* @@ -82,10 +88,18 @@ public: virtual Status shutdown() = 0; /* + * Returns if this service executor is using asynchronous or synchronous networking. + */ + virtual Mode transportMode() const = 0; + + /* * Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. */ virtual void appendStats(BSONObjBuilder* bob) const = 0; }; } // namespace transport + +ENABLE_BITMASK_OPERATORS(transport::ServiceExecutor::ScheduleFlags) + } // namespace mongo diff --git a/src/mongo/transport/service_executor_adaptive.h b/src/mongo/transport/service_executor_adaptive.h index 2163d97f3e5..bc1a1f31c5b 100644 --- a/src/mongo/transport/service_executor_adaptive.h +++ b/src/mongo/transport/service_executor_adaptive.h @@ -93,6 +93,10 @@ public: Status shutdown() final; Status schedule(Task task, ScheduleFlags flags) final; + Mode transportMode() const final { + return Mode::kAsynchronous; + } + void appendStats(BSONObjBuilder* bob) const final; int threadsRunning() { diff --git a/src/mongo/transport/service_executor_noop.h b/src/mongo/transport/service_executor_noop.h new file mode 100644 index 00000000000..a0cd262d29a --- /dev/null +++ b/src/mongo/transport/service_executor_noop.h @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/transport/service_executor.h" + +namespace mongo { +namespace transport { + +/** + * The noop service executor provides the necessary interface for some unittests. Doesn't actually + * execute any work + */ +class ServiceExecutorNoop final : public ServiceExecutor { +public: + explicit ServiceExecutorNoop(ServiceContext* ctx) {} + + Status start() override { + return Status::OK(); + } + Status shutdown() override { + return Status::OK(); + } + Status schedule(Task task, ScheduleFlags flags) override { + return Status::OK(); + } + + Mode transportMode() const override { + return Mode::kSynchronous; + } + + void appendStats(BSONObjBuilder* bob) const override {} +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_synchronous.cpp b/src/mongo/transport/service_executor_synchronous.cpp new file mode 100644 index 00000000000..30140b811f2 --- /dev/null +++ b/src/mongo/transport/service_executor_synchronous.cpp @@ -0,0 +1,152 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_executor_synchronous.h" + +#include "mongo/db/server_parameters.h" +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point_utils.h" +#include "mongo/util/log.h" +#include "mongo/util/net/thread_idle_callback.h" +#include "mongo/util/processinfo.h" + +namespace mongo { +namespace transport { +namespace { + +// Tasks scheduled with MayRecurse may be called recursively if the recursion depth is below this +// value. +MONGO_EXPORT_SERVER_PARAMETER(synchronousServiceExecutorRecursionLimit, int, 8); + +constexpr auto kThreadsRunning = "threadsRunning"_sd; +constexpr auto kExecutorLabel = "executor"_sd; +constexpr auto kExecutorName = "passthrough"_sd; +} // namespace + +thread_local std::deque<ServiceExecutor::Task> ServiceExecutorSynchronous::_localWorkQueue = {}; +thread_local int ServiceExecutorSynchronous::_localRecursionDepth = 0; +thread_local int64_t ServiceExecutorSynchronous::_localThreadIdleCounter = 0; + +ServiceExecutorSynchronous::ServiceExecutorSynchronous(ServiceContext* ctx) {} + +Status ServiceExecutorSynchronous::start() { + _numHardwareCores = [] { + ProcessInfo p; + if (auto availCores = p.getNumAvailableCores()) { + return static_cast<size_t>(*availCores); + } + return static_cast<size_t>(p.getNumCores()); + }(); + + _stillRunning.store(true); + + return Status::OK(); +} + +Status ServiceExecutorSynchronous::shutdown() { + LOG(3) << "Shutting down passthrough executor"; + + _stillRunning.store(false); + + // TODO pass a time into this function + stdx::unique_lock<stdx::mutex> lock(_shutdownMutex); + bool result = _shutdownCondition.wait_for(lock, Seconds(10).toSystemDuration(), [this]() { + return _numRunningWorkerThreads.load() == 0; + }); + + return result + ? Status::OK() + : Status(ErrorCodes::Error::ExceededTimeLimit, + "passthrough executor couldn't shutdown all worker threads within time limit."); +} + +Status ServiceExecutorSynchronous::schedule(Task task, ScheduleFlags flags) { + if (!_localWorkQueue.empty()) { + /* + * In perf testing we found that yielding after running a each request produced + * at 5% performance boost in microbenchmarks if the number of worker threads + * was greater than the number of available cores. + */ + if (flags & ScheduleFlags::kMayYieldBeforeSchedule) { + if ((_localThreadIdleCounter++ & 0xf) == 0) { + markThreadIdle(); + } + if (_numRunningWorkerThreads.loadRelaxed() > _numHardwareCores) { + stdx::this_thread::yield(); + } + } + + // Execute task directly (recurse) if allowed by the caller as it produced better + // performance in testing. Try to limit the amount of recursion so we don't blow up the + // stack, even though this shouldn't happen with this executor that uses blocking network + // I/O. + if ((flags & ScheduleFlags::kMayRecurse) && + (_localRecursionDepth < synchronousServiceExecutorRecursionLimit.loadRelaxed())) { + ++_localRecursionDepth; + task(); + } else { + _localWorkQueue.emplace_back(std::move(task)); + } + return Status::OK(); + } + + // First call to schedule() for this connection, spawn a worker thread that will push jobs + // into the thread local job queue. + LOG(3) << "Starting new executor thread in passthrough mode"; + + Status status = launchServiceWorkerThread([ this, task = std::move(task) ] { + _numRunningWorkerThreads.addAndFetch(1); + + _localWorkQueue.emplace_back(std::move(task)); + while (!_localWorkQueue.empty() && _stillRunning.loadRelaxed()) { + _localRecursionDepth = 1; + _localWorkQueue.front()(); + _localWorkQueue.pop_front(); + } + + if (_numRunningWorkerThreads.subtractAndFetch(1) == 0) { + stdx::unique_lock<stdx::mutex> lock(_shutdownMutex); + stdx::notify_all_at_thread_exit(_shutdownCondition, std::move(lock)); + } + }); + + return status; +} + +void ServiceExecutorSynchronous::appendStats(BSONObjBuilder* bob) const { + BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats")); + section << kExecutorLabel << kExecutorName << kThreadsRunning + << static_cast<int>(_numRunningWorkerThreads.loadRelaxed()); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_synchronous.h b/src/mongo/transport/service_executor_synchronous.h new file mode 100644 index 00000000000..77afb2d37e9 --- /dev/null +++ b/src/mongo/transport/service_executor_synchronous.h @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <deque> + +#include "mongo/base/status.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/transport/service_executor.h" + +namespace mongo { +namespace transport { + +/** + * The passthrough service executor emulates a thread per connection. + * Each connection has its own worker thread where jobs get scheduled. + */ +class ServiceExecutorSynchronous final : public ServiceExecutor { +public: + explicit ServiceExecutorSynchronous(ServiceContext* ctx); + + Status start() override; + Status shutdown() override; + Status schedule(Task task, ScheduleFlags flags) override; + + Mode transportMode() const override { + return Mode::kSynchronous; + } + + void appendStats(BSONObjBuilder* bob) const override; + +private: + static thread_local std::deque<Task> _localWorkQueue; + static thread_local int _localRecursionDepth; + static thread_local int64_t _localThreadIdleCounter; + + AtomicBool _stillRunning{false}; + + mutable stdx::mutex _shutdownMutex; + stdx::condition_variable _shutdownCondition; + + AtomicWord<size_t> _numRunningWorkerThreads{0}; + size_t _numHardwareCores{0}; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index 6c5f9ad3568..35694c390ba 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -178,16 +178,16 @@ private: std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext, transport::SessionHandle session, - bool sync) { - return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), sync); + transport::Mode transportMode) { + return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), transportMode); } ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, - bool sync) + transport::Mode transportMode) : _state{State::Created}, _sep{svcContext->getServiceEntryPoint()}, - _sync(sync), + _transportMode(transportMode), _serviceContext(svcContext), _sessionHandle(session), _dbClient{svcContext->makeClient("conn", std::move(session))}, @@ -275,7 +275,7 @@ void ServiceStateMachine::_sinkCallback(Status status) { _state.store(State::Source); } - return scheduleNext(ServiceExecutor::kDeferredTask); + return scheduleNext(ServiceExecutor::kDeferredTask | ServiceExecutor::kMayYieldBeforeSchedule); } void ServiceStateMachine::_processMessage(ThreadGuard& guard) { @@ -337,11 +337,13 @@ void ServiceStateMachine::_processMessage(ThreadGuard& guard) { auto ticket = _session()->sinkMessage(toSink); _state.store(State::SinkWait); - if (_sync) { + if (_transportMode == transport::Mode::kSynchronous) { _sinkCallback(_session()->getTransportLayer()->wait(std::move(ticket))); - } else { + } else if (_transportMode == transport::Mode::kAsynchronous) { _session()->getTransportLayer()->asyncWait( std::move(ticket), [this](Status status) { _sinkCallback(status); }); + } else { + MONGO_UNREACHABLE; } } else { _state.store(State::Source); @@ -382,14 +384,16 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard& guard) { auto ticket = _session()->sourceMessage(&_inMessage); _state.store(State::SourceWait); - if (_sync) { + if (_transportMode == transport::Mode::kSynchronous) { _sourceCallback([this](auto ticket) { MONGO_IDLE_THREAD_BLOCK; return _session()->getTransportLayer()->wait(std::move(ticket)); }(std::move(ticket))); - } else { + } else if (_transportMode == transport::Mode::kAsynchronous) { _session()->getTransportLayer()->asyncWait( std::move(ticket), [this](Status status) { _sourceCallback(status); }); + } else { + MONGO_UNREACHABLE; } break; } @@ -404,10 +408,6 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard& guard) { MONGO_UNREACHABLE; } - if ((_counter++ & 0xf) == 0) { - markThreadIdle(); - } - if (state() == State::EndSession) { _cleanupSession(guard); } @@ -426,7 +426,7 @@ void ServiceStateMachine::_runNextInGuard(ThreadGuard& guard) { } void ServiceStateMachine::scheduleNext(ServiceExecutor::ScheduleFlags flags) { - _maybeScheduleFunc(_serviceContext->getServiceExecutor(), [this] { runNext(); }, flags); + _scheduleFunc([this] { runNext(); }, flags); } void ServiceStateMachine::terminateIfTagsDontMatch(transport::Session::TagMask tags) { @@ -450,6 +450,13 @@ ServiceStateMachine::State ServiceStateMachine::state() { return _state.load(); } +void ServiceStateMachine::_terminateAndLogIfError(Status status) { + if (!status.isOK()) { + warning(logger::LogComponent::kExecutor) << "Terminating session due to error: " << status; + terminateIfTagsDontMatch(transport::Session::kEmptyTagMask); + } +} + void ServiceStateMachine::_cleanupSession(ThreadGuard& guard) { _state.store(State::Ended); diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index fab326a5a68..b89b3a34d24 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -41,6 +41,7 @@ #include "mongo/transport/service_entry_point.h" #include "mongo/transport/service_executor.h" #include "mongo/transport/session.h" +#include "mongo/transport/transport_mode.h" namespace mongo { @@ -65,17 +66,19 @@ public: */ static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext, transport::SessionHandle session, - bool sync); + transport::Mode transportMode); - ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync); + ServiceStateMachine(ServiceContext* svcContext, + transport::SessionHandle session, + transport::Mode transportMode); /* - * Any state may transition to EndSession in case of an error, otherwise the valid state - * transitions are: - * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC) - * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust) - * Source -> SourceWait -> Process -> Source (fire-and-forget) - */ + * Any state may transition to EndSession in case of an error, otherwise the valid state + * transitions are: + * Source -> SourceWait -> Process -> SinkWait -> Source (standard RPC) + * Source -> SourceWait -> Process -> SinkWait -> Process -> SinkWait ... (exhaust) + * Source -> SourceWait -> Process -> Source (fire-and-forget) + */ enum class State { Created, // The session has been created, but no operations have been performed yet Source, // Request a new Message from the network to handle @@ -135,25 +138,28 @@ private: friend class ThreadGuard; /* + * Terminates the associated transport Session if status indicate error. + * + * This will not block on the session terminating cleaning itself up, it returns immediately. + */ + void _terminateAndLogIfError(Status status); + + /* * This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor * while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for * callbacks to run. */ - template <typename Executor, typename Func> - void _maybeScheduleFunc(Executor* svcExec, - Func&& func, - transport::ServiceExecutor::ScheduleFlags flags) { - if (svcExec) { - uassertStatusOK(svcExec->schedule( - [ func = std::move(func), anchor = shared_from_this() ] { func(); }, flags)); - } - } - template <typename Func> void _scheduleFunc(Func&& func, transport::ServiceExecutor::ScheduleFlags flags) { - auto svcExec = _serviceContext->getServiceExecutor(); - invariant(svcExec); - _maybeScheduleFunc(svcExec, func, flags); + Status status = _serviceContext->getServiceExecutor()->schedule( + [ func = std::move(func), anchor = shared_from_this() ] { func(); }, flags); + if (!status.isOK()) { + // The service executor failed to schedule the task + // This could for example be that we failed to start + // a worker thread. Terminate this connection to + // leave the system in a valid state. + _terminateAndLogIfError(status); + } } /* @@ -189,7 +195,7 @@ private: AtomicWord<State> _state{State::Created}; ServiceEntryPoint* _sep; - bool _sync; + transport::Mode _transportMode; ServiceContext* const _serviceContext; @@ -202,7 +208,6 @@ private: bool _inExhaust = false; boost::optional<MessageCompressorId> _compressorId; Message _inMessage; - int64_t _counter = 0; AtomicWord<stdx::thread::id> _currentOwningThread; std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index b95cff83c0f..d3a7737ef3c 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -39,6 +39,7 @@ #include "mongo/transport/mock_session.h" #include "mongo/transport/mock_ticket.h" #include "mongo/transport/service_entry_point.h" +#include "mongo/transport/service_executor_noop.h" #include "mongo/transport/service_state_machine.h" #include "mongo/transport/transport_layer_mock.h" #include "mongo/unittest/unittest.h" @@ -218,12 +219,15 @@ protected: _sep = sep.get(); sc->setServiceEntryPoint(std::move(sep)); + sc->setServiceExecutor(stdx::make_unique<ServiceExecutorNoop>(sc)); + auto tl = stdx::make_unique<MockTL>(); _tl = tl.get(); sc->setTransportLayer(std::move(tl)); _tl->start().transitional_ignore(); - _ssm = ServiceStateMachine::create(getGlobalServiceContext(), _tl->createSession(), true); + _ssm = ServiceStateMachine::create( + getGlobalServiceContext(), _tl->createSession(), transport::Mode::kSynchronous); _tl->setSSM(_ssm.get()); } diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 2a0a7427eab..13122b6a2c4 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -58,7 +58,7 @@ public: : _socket(std::move(socket)), _tl(tl) { std::error_code ec; - _socket.non_blocking(_tl->_listenerOptions.async, ec); + _socket.non_blocking(_tl->_listenerOptions.transportMode == Mode::kAsynchronous, ec); fassert(40490, ec.value() == 0); auto family = endpointToSockAddr(_socket.local_endpoint()).getType(); diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index c29bc63e56d..11cc43e489f 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -39,6 +39,7 @@ #include "mongo/stdx/thread.h" #include "mongo/transport/ticket_impl.h" #include "mongo/transport/transport_layer.h" +#include "mongo/transport/transport_mode.h" #include "mongo/util/net/hostandport.h" #include "mongo/util/net/ssl_options.h" #include "mongo/util/net/ssl_types.h" @@ -80,10 +81,10 @@ public: #ifndef _WIN32 bool useUnixSockets = true; // whether to allow UNIX sockets in ipList #endif - bool enableIPv6 = false; // whether to allow IPv6 sockets in ipList - bool async = false; // whether accepted sockets should be put into - // non-blocking mode after they're accepted - size_t maxConns = DEFAULT_MAX_CONN; // maximum number of active connections + bool enableIPv6 = false; // whether to allow IPv6 sockets in ipList + Mode transportMode = Mode::kSynchronous; // whether accepted sockets should be put into + // non-blocking mode after they're accepted + size_t maxConns = DEFAULT_MAX_CONN; // maximum number of active connections }; TransportLayerASIO(const Options& opts, ServiceEntryPoint* sep); diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index 0bcd5b3a76e..82e23e1a049 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -35,6 +35,7 @@ #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" #include "mongo/transport/service_executor_adaptive.h" +#include "mongo/transport/service_executor_synchronous.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer_asio.h" #include "mongo/transport/transport_layer_legacy.h" @@ -129,8 +130,12 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( auto sep = ctx->getServiceEntryPoint(); if (config->transportLayer == "asio") { transport::TransportLayerASIO::Options opts(config); - if (config->serviceExecutor != "synchronous") { - opts.async = true; + if (config->serviceExecutor == "adaptive") { + opts.transportMode = transport::Mode::kAsynchronous; + } else if (config->serviceExecutor == "synchronous") { + opts.transportMode = transport::Mode::kSynchronous; + } else { + MONGO_UNREACHABLE; } auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep); @@ -138,11 +143,14 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( if (config->serviceExecutor == "adaptive") { ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorAdaptive>( ctx, transportLayerASIO->getIOContext())); + } else if (config->serviceExecutor == "synchronous") { + ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx)); } transportLayer = std::move(transportLayerASIO); } else if (serverGlobalParams.transportLayer == "legacy") { transport::TransportLayerLegacy::Options opts(config); transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(opts, sep); + ctx->setServiceExecutor(stdx::make_unique<ServiceExecutorSynchronous>(ctx)); } std::vector<std::unique_ptr<TransportLayer>> retVector; diff --git a/src/mongo/transport/transport_mode.h b/src/mongo/transport/transport_mode.h new file mode 100644 index 00000000000..2cbefef6699 --- /dev/null +++ b/src/mongo/transport/transport_mode.h @@ -0,0 +1,35 @@ +/** +* Copyright (C) 2017 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects for +* all of the code used other than as permitted herein. If you modify file(s) +* with this exception, you may extend this exception to your version of the +* file(s), but you are not obligated to do so. If you do not wish to do so, +* delete this exception statement from your version. If you delete this +* exception statement from all source files in the program, then also delete +* it in the license file. +*/ + +#pragma once + +namespace mongo { +namespace transport { +enum class Mode { kAsynchronous = 0, kSynchronous = 1 }; +} // namespace transport +} // namespace mongo |