diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2017-06-27 14:04:53 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2017-07-14 16:19:40 -0400 |
commit | 1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69 (patch) | |
tree | 4cad8d64f1ace9bc13aea786b460872b1ce466c3 /src/mongo | |
parent | e0b06a9da3c0c6071f4e636f3c3ba3e8851c5db0 (diff) | |
download | mongo-1622c6b7a7971ea7fbdd4b3d5b10455e48e5cf69.tar.gz |
SERVER-29402 Implement ServiceExecutor and fixed-size test executor
Diffstat (limited to 'src/mongo')
41 files changed, 1027 insertions, 383 deletions
diff --git a/src/mongo/client/scoped_db_conn_test.cpp b/src/mongo/client/scoped_db_conn_test.cpp index 1a9d7581424..c26428c7539 100644 --- a/src/mongo/client/scoped_db_conn_test.cpp +++ b/src/mongo/client/scoped_db_conn_test.cpp @@ -90,6 +90,12 @@ public: _threads.emplace_back(&DummyServiceEntryPoint::run, this, std::move(session)); } + // This is not used in this test, so it is only here to complete the interface of + // ServiceEntryPoint + void endAllSessions(transport::Session::TagMask tags) override { + MONGO_UNREACHABLE; + } + void setReplyDelay(Milliseconds delay) { _replyDelay = delay; } diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index ed571c55ca3..9f16eda1e46 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -31,7 +31,10 @@ env.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands', - ] + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/transport/service_executor', + ], ) env.Library( diff --git a/src/mongo/db/commands/feature_compatibility_version.cpp b/src/mongo/db/commands/feature_compatibility_version.cpp index 9184c309049..b8a54bead96 100644 --- a/src/mongo/db/commands/feature_compatibility_version.cpp +++ b/src/mongo/db/commands/feature_compatibility_version.cpp @@ -41,7 +41,7 @@ #include "mongo/db/storage/storage_engine.h" #include "mongo/db/write_concern_options.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/transport/transport_layer.h" +#include "mongo/transport/service_entry_point.h" #include "mongo/util/log.h" namespace mongo { @@ -185,7 +185,7 @@ void FeatureCompatibilityVersion::set(OperationContext* opCtx, StringData versio // Close all internal connections to versions lower than 3.6. if (version == FeatureCompatibilityVersionCommandParser::kVersion36) { - opCtx->getServiceContext()->getTransportLayer()->endAllSessions( + opCtx->getServiceContext()->getServiceEntryPoint()->endAllSessions( transport::Session::kLatestVersionInternalClientKeepOpen | transport::Session::kExternalClientKeepOpen); } diff --git a/src/mongo/db/commands/server_status.cpp b/src/mongo/db/commands/server_status.cpp index 1d559708d73..4464df34a33 100644 --- a/src/mongo/db/commands/server_status.cpp +++ b/src/mongo/db/commands/server_status.cpp @@ -295,6 +295,10 @@ public: BSONObjBuilder b; networkCounter.append(b); appendMessageCompressionStats(&b); + auto executor = opCtx->getServiceContext()->getServiceExecutor(); + if (executor) + executor->appendStats(&b); + return b.obj(); } diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 569958ccbad..7ead195653e 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -737,6 +737,14 @@ ExitCode _initAndListen(int listenPort) { return EXIT_NET_ERROR; } + if (globalServiceContext->getServiceExecutor()) { + start = globalServiceContext->getServiceExecutor()->start(); + if (!start.isOK()) { + error() << "Failed to start the service executor: " << start; + return EXIT_NET_ERROR; + } + } + globalServiceContext->notifyStartupComplete(); #ifndef _WIN32 mongo::signalForkSuccess(); @@ -997,18 +1005,22 @@ static void shutdownTask() { log(LogComponent::kNetwork) << "shutdown: going to close all sockets because ASAN is active..."; + + // Shutdown the TransportLayer so that new connections aren't accepted tl->shutdown(); + // Request that all sessions end. + sep->endAllSessions(transport::Session::kEmptyTagMask); + // Close all sockets in a detached thread, and then wait for the number of active // connections to reach zero. Give the detached background thread a 10 second deadline. If // we haven't closed drained all active operations within that deadline, just keep going // with shutdown: the OS will do it for us when the process terminates. - stdx::packaged_task<void()> dryOutTask([sep] { // There isn't currently a way to wait on the TicketHolder to have all its tickets back, // unfortunately. So, busy wait in this detached thread. while (true) { - const auto runningWorkers = sep->getNumberOfActiveWorkerThreads(); + const auto runningWorkers = sep->getNumberOfConnections(); if (runningWorkers == 0) { log(LogComponent::kNetwork) << "shutdown: no running workers found..."; @@ -1027,6 +1039,12 @@ static void shutdownTask() { log(LogComponent::kNetwork) << "shutdown: exhausted grace period for" << " active workers to drain; continuing with shutdown... "; } + + // Shutdown and wait for the service executor to exit + auto svcExec = serviceContext->getServiceExecutor(); + if (svcExec) { + fassertStatusOK(40550, svcExec->shutdown()); + } } #endif diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index e14878e88f4..bff3aa40f5e 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -90,8 +90,8 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/thread.h" +#include "mongo/transport/service_entry_point.h" #include "mongo/transport/session.h" -#include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -698,7 +698,7 @@ HostAndPort ReplicationCoordinatorExternalStateImpl::getClientHostAndPort( } void ReplicationCoordinatorExternalStateImpl::closeConnections() { - _service->getTransportLayer()->endAllSessions(transport::Session::kKeepOpen); + _service->getServiceEntryPoint()->endAllSessions(transport::Session::kKeepOpen); } void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationContext* opCtx) { diff --git a/src/mongo/db/server_options.h b/src/mongo/db/server_options.h index 502785e1982..c8775b6a56e 100644 --- a/src/mongo/db/server_options.h +++ b/src/mongo/db/server_options.h @@ -73,10 +73,14 @@ struct ServerGlobalParams { int defaultLocalThresholdMillis = 15; // --localThreshold in ms to consider a node local bool moveParanoia = false; // for move chunk paranoia - bool noUnixSocket = false; // --nounixsocket - bool doFork = false; // --fork - std::string socket = "/tmp"; // UNIX domain socket directory - std::string transportLayer; // --transportLayer (must be either "asio" or "legacy") + bool noUnixSocket = false; // --nounixsocket + bool doFork = false; // --fork + std::string socket = "/tmp"; // UNIX domain socket directory + std::string transportLayer; // --transportLayer (must be either "asio" or "legacy") + + // --serviceExecutor ("adaptive", "synchronous", or "fixedForTesting") + std::string serviceExecutor; + int maxConns = DEFAULT_MAX_CONN; // Maximum number of simultaneous open connections. int unixSocketPermissions = DEFAULT_UNIX_PERMS; // permissions for the UNIX domain socket diff --git a/src/mongo/db/server_options_helpers.cpp b/src/mongo/db/server_options_helpers.cpp index 3d968ebabd1..52bf5e2022c 100644 --- a/src/mongo/db/server_options_helpers.cpp +++ b/src/mongo/db/server_options_helpers.cpp @@ -211,6 +211,14 @@ Status addGeneralServerOptions(moe::OptionSection* options) { .setDefault(moe::Value("asio")); options + ->addOptionChaining("net.serviceExecutor", + "serviceExecutor", + moe::String, + "sets the service executor implementation") + .hidden() + .setDefault(moe::Value("synchronous")); + + options ->addOptionChaining( "logpath", "logpath", @@ -808,6 +816,21 @@ Status storeServerOptions(const moe::Environment& params) { } } + if (params.count("net.serviceExecutor")) { + if (serverGlobalParams.transportLayer == "legacy") { + return {ErrorCodes::BadValue, + "Cannot specify a serviceExecutor with the legacy transportLayer"}; + } + const auto valid = {"synchronous"_sd, "fixedForTesting"_sd}; + auto value = params["net.serviceExecutor"].as<std::string>(); + if (std::find(valid.begin(), valid.end(), value) == valid.end()) { + return {ErrorCodes::BadValue, "Unsupported value for serviceExecutor"}; + } + serverGlobalParams.serviceExecutor = value; + } else { + serverGlobalParams.serviceExecutor = "synchronous"; + } + if (params.count("security.transitionToAuth")) { serverGlobalParams.transitionToAuth = params["security.transitionToAuth"].as<bool>(); } diff --git a/src/mongo/db/service_context.cpp b/src/mongo/db/service_context.cpp index 805a6f0802e..2b4eb81c354 100644 --- a/src/mongo/db/service_context.cpp +++ b/src/mongo/db/service_context.cpp @@ -184,6 +184,11 @@ ServiceEntryPoint* ServiceContext::getServiceEntryPoint() const { return _serviceEntryPoint.get(); } +transport::ServiceExecutor* ServiceContext::getServiceExecutor() const { + return _serviceExecutor.get(); +} + + TickSource* ServiceContext::getTickSource() const { return _tickSource.get(); } @@ -216,6 +221,10 @@ void ServiceContext::setTransportLayer(std::unique_ptr<transport::TransportLayer _transportLayer = std::move(tl); } +void ServiceContext::setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec) { + _serviceExecutor = std::move(exec); +} + void ServiceContext::ClientDeleter::operator()(Client* client) const { ServiceContext* const service = client->getServiceContext(); { diff --git a/src/mongo/db/service_context.h b/src/mongo/db/service_context.h index 2386e3065a7..db486a72857 100644 --- a/src/mongo/db/service_context.h +++ b/src/mongo/db/service_context.h @@ -40,6 +40,7 @@ #include "mongo/stdx/functional.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" +#include "mongo/transport/service_executor_base.h" #include "mongo/transport/session.h" #include "mongo/util/clock_source.h" #include "mongo/util/decorable.h" @@ -371,6 +372,14 @@ public: ServiceEntryPoint* getServiceEntryPoint() const; /** + * Get the service executor for the service context. + * + * See ServiceStateMachine for how this is used. Some configurations may not have a service + * executor registered and this will return a nullptr. + */ + transport::ServiceExecutor* getServiceExecutor() const; + + /** * Waits for the ServiceContext to be fully initialized and for all TransportLayers to have been * added/started. * @@ -444,6 +453,11 @@ public: */ void setTransportLayer(std::unique_ptr<transport::TransportLayer> tl); + /** + * Binds the service executor to the service context + */ + void setServiceExecutor(std::unique_ptr<transport::ServiceExecutor> exec); + protected: ServiceContext(); @@ -487,6 +501,11 @@ private: std::unique_ptr<ServiceEntryPoint> _serviceEntryPoint; /** + * The ServiceExecutor + */ + std::unique_ptr<transport::ServiceExecutor> _serviceExecutor; + + /** * Vector of registered observers. */ std::vector<std::unique_ptr<ClientObserver>> _clientObservers; diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 4558e93701a..a5c8746e759 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -353,6 +353,14 @@ static ExitCode runMongosServer() { return EXIT_NET_ERROR; } + if (auto svcExec = getGlobalServiceContext()->getServiceExecutor()) { + start = svcExec->start(); + if (!start.isOK()) { + error() << "Failed to start the service executor: " << start; + return EXIT_NET_ERROR; + } + } + getGlobalServiceContext()->notifyStartupComplete(); #if !defined(_WIN32) mongo::signalForkSuccess(); diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index cd2ebd5d34f..995dfd993cf 100644 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -1054,6 +1054,15 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro if (jsTest.options().auth) { argArray.push(...['--setParameter', "enableLocalhostAuthBypass=false"]); } + + if (jsTest.options().serviceExecutor && + (!programVersion || (parseInt(programVersion.split(".")[0]) >= 3 && + parseInt(programVersion.split(".")[1]) >= 5))) { + if (!argArrayContains("serviceExecutor")) { + argArray.push(...["--serviceExecutor", jsTest.options().serviceExecutor]); + } + } + // Since options may not be backward compatible, mongos options are not // set on older versions, e.g., mongos-3.0. if (programName.endsWith('mongos')) { diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index 2cb79d743ab..eff1c4aea03 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -198,6 +198,7 @@ var _jsTestOptions = {enableTestCommands: true}; // Test commands should be ena jsTestOptions = function() { if (TestData) { return Object.merge(_jsTestOptions, { + serviceExecutor: TestData.serviceExecutor, setParameters: TestData.setParameters, setParametersMongos: TestData.setParametersMongos, storageEngine: TestData.storageEngine, diff --git a/src/mongo/transport/SConscript b/src/mongo/transport/SConscript index ce0679290c9..5fe57bfa9e5 100644 --- a/src/mongo/transport/SConscript +++ b/src/mongo/transport/SConscript @@ -36,6 +36,9 @@ env.Library( LIBDEPS=[ 'transport_layer', ], + LIBDEPS_PRIVATE=[ + 'service_executor', + ], ) env.Library( @@ -59,12 +62,27 @@ tlEnv.Library( ], LIBDEPS=[ 'transport_layer_common', - '$BUILD_DIR/mongo/db/auth/authentication_restriction', - '$BUILD_DIR/third_party/shim_asio', '$BUILD_DIR/mongo/base/system_error', + '$BUILD_DIR/mongo/db/auth/authentication_restriction', '$BUILD_DIR/mongo/db/server_options_core', '$BUILD_DIR/mongo/db/service_context', '$BUILD_DIR/mongo/db/stats/counters', + '$BUILD_DIR/third_party/shim_asio', + ], +) + +tlEnv.Library( + target='service_executor', + source=[ + 'service_executor_base.cpp', + 'service_executor_fixed.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/service_context', + ], + LIBDEPS_PRIVATE=[ + "$BUILD_DIR/mongo/util/processinfo", + '$BUILD_DIR/third_party/shim_asio', ], ) diff --git a/src/mongo/transport/service_entry_point.h b/src/mongo/transport/service_entry_point.h index b03bb65825a..834e76ab16b 100644 --- a/src/mongo/transport/service_entry_point.h +++ b/src/mongo/transport/service_entry_point.h @@ -53,6 +53,11 @@ public: virtual void startSession(transport::SessionHandle session) = 0; /** + * End all sessions that do not match the mask in tags. + */ + virtual void endAllSessions(transport::Session::TagMask tags) = 0; + + /** * Processes a request and fills out a DbResponse. */ virtual DbResponse handleRequest(OperationContext* opCtx, const Message& request) = 0; diff --git a/src/mongo/transport/service_entry_point_impl.cpp b/src/mongo/transport/service_entry_point_impl.cpp index a4f5c6a665d..cd2fa26ea1e 100644 --- a/src/mongo/transport/service_entry_point_impl.cpp +++ b/src/mongo/transport/service_entry_point_impl.cpp @@ -38,6 +38,7 @@ #include "mongo/transport/service_entry_point_utils.h" #include "mongo/transport/service_state_machine.h" #include "mongo/transport/session.h" +#include "mongo/util/log.h" #include "mongo/util/processinfo.h" #include "mongo/util/scopeguard.h" @@ -52,14 +53,30 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { stdx::make_unique<RestrictionEnvironment>(*remoteAddr, *localAddr); RestrictionEnvironment::set(session, std::move(restrictionEnvironment)); - // Pass ownership of the transport::SessionHandle into our worker thread. When this - // thread exits, the session will end. - // - launchServiceWorkerThread([ this, session = std::move(session) ]() mutable { + SSMListIterator ssmIt; + + const auto sync = (_svcCtx->getServiceExecutor() == nullptr); + auto ssm = ServiceStateMachine::create(_svcCtx, std::move(session), sync); + { + stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); + ssmIt = _sessions.emplace(_sessions.begin(), ssm); + } + + ssm->setCleanupHook([this, ssmIt] { + stdx::lock_guard<decltype(_sessionsMutex)> lk(_sessionsMutex); + _sessions.erase(ssmIt); + }); + + if (!sync) { + dassert(_svcCtx->getServiceExecutor()); + ssm->scheduleNext(); + return; + } + + launchServiceWorkerThread([ this, ssm = std::move(ssm) ]() mutable { _nWorkers.addAndFetch(1); - const auto guard = MakeGuard([this] { _nWorkers.subtractAndFetch(1); }); + const auto guard = MakeGuard([this, &ssm] { _nWorkers.subtractAndFetch(1); }); - ServiceStateMachine ssm(_svcCtx, std::move(session), true); const auto numCores = [] { ProcessInfo p; if (auto availCores = p.getNumAvailableCores()) { @@ -68,12 +85,53 @@ void ServiceEntryPointImpl::startSession(transport::SessionHandle session) { return static_cast<unsigned>(p.getNumCores()); }(); - while (ssm.state() != ServiceStateMachine::State::Ended) { - ssm.runNext(); + while (ssm->state() != ServiceStateMachine::State::Ended) { + ssm->runNext(); + + /* + * In perf testing we found that yielding after running a each request produced + * at 5% performance boost in microbenchmarks if the number of worker threads + * was greater than the number of available cores. + */ if (_nWorkers.load() > numCores) stdx::this_thread::yield(); } }); } +void ServiceEntryPointImpl::endAllSessions(transport::Session::TagMask tags) { + SSMList connsToEnd; + + // While holding the _sesionsMutex, loop over all the current connections, and if their tags + // do not match the requested tags to skip, create a copy of their shared_ptr and place it in + // connsToEnd. + // + // This will ensure that sessions to be ended will live at least long enough for us to call + // their terminate() function, even if they've already ended because of an i/o error. + { + stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex); + for (auto& ssm : _sessions) { + if (ssm->session()->getTags() & tags) { + log() << "Skip closing connection for connection # " << ssm->session()->id(); + } else { + connsToEnd.emplace_back(ssm); + } + } + } + + // Loop through all the connections we marked for ending and call terminate on them. They will + // then remove themselves from _sessions whenever they transition to the next state. + // + // If they've already ended, then this is a noop, and the SSM will be destroyed when connsToEnd + // goes out of scope. + for (auto& ssm : connsToEnd) { + ssm->terminate(); + } +} + +std::size_t ServiceEntryPointImpl::getNumberOfConnections() const { + stdx::unique_lock<decltype(_sessionsMutex)> lk(_sessionsMutex); + return _sessions.size(); +} + } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_impl.h b/src/mongo/transport/service_entry_point_impl.h index 6dd6047d9d2..fa1f92eb545 100644 --- a/src/mongo/transport/service_entry_point_impl.h +++ b/src/mongo/transport/service_entry_point_impl.h @@ -28,12 +28,13 @@ #pragma once -#include <vector> #include "mongo/base/disallow_copying.h" #include "mongo/platform/atomic_word.h" +#include "mongo/stdx/list.h" #include "mongo/stdx/mutex.h" #include "mongo/transport/service_entry_point.h" +#include "mongo/transport/service_state_machine.h" namespace mongo { class ServiceContext; @@ -57,13 +58,19 @@ public: void startSession(transport::SessionHandle session) final; - std::size_t getNumberOfActiveWorkerThreads() const { - return _nWorkers.load(); - } + void endAllSessions(transport::Session::TagMask tags) final; + + std::size_t getNumberOfConnections() const; private: - ServiceContext* _svcCtx; + using SSMList = stdx::list<std::shared_ptr<ServiceStateMachine>>; + using SSMListIterator = SSMList::iterator; + + ServiceContext* const _svcCtx; AtomicWord<std::size_t> _nWorkers; + + mutable stdx::mutex _sessionsMutex; + SSMList _sessions; }; } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.cpp b/src/mongo/transport/service_entry_point_mock.cpp index 0cc465aff92..0f1fd2f07bd 100644 --- a/src/mongo/transport/service_entry_point_mock.cpp +++ b/src/mongo/transport/service_entry_point_mock.cpp @@ -46,14 +46,7 @@ ServiceEntryPointMock::ServiceEntryPointMock(transport::TransportLayer* tl) : _tl(tl), _inShutdown(false) {} ServiceEntryPointMock::~ServiceEntryPointMock() { - { - stdx::lock_guard<stdx::mutex> lk(_shutdownLock); - _inShutdown = true; - } - - for (auto& t : _threads) { - t.join(); - } + endAllSessions(transport::Session::kEmptyTagMask); } void ServiceEntryPointMock::startSession(transport::SessionHandle session) { @@ -106,4 +99,15 @@ DbResponse ServiceEntryPointMock::handleRequest(OperationContext* opCtx, const M return {Message(b.release()), ""}; } +void ServiceEntryPointMock::endAllSessions(transport::Session::TagMask) { + { + stdx::lock_guard<stdx::mutex> lk(_shutdownLock); + _inShutdown = true; + } + + for (auto& t : _threads) { + t.join(); + } +} + } // namespace mongo diff --git a/src/mongo/transport/service_entry_point_mock.h b/src/mongo/transport/service_entry_point_mock.h index 58c6149bf7d..719454f16dd 100644 --- a/src/mongo/transport/service_entry_point_mock.h +++ b/src/mongo/transport/service_entry_point_mock.h @@ -65,6 +65,8 @@ public: */ void startSession(transport::SessionHandle session) override; + void endAllSessions(transport::Session::TagMask tags) override; + DbResponse handleRequest(OperationContext* opCtx, const Message& request) override; private: diff --git a/src/mongo/transport/service_entry_point_test_suite.cpp b/src/mongo/transport/service_entry_point_test_suite.cpp index a774285a133..f1efcbf8580 100644 --- a/src/mongo/transport/service_entry_point_test_suite.cpp +++ b/src/mongo/transport/service_entry_point_test_suite.cpp @@ -139,10 +139,6 @@ void ServiceEntryPointTestSuite::MockTLHarness::end(const SessionHandle& session return _end(session); } -void ServiceEntryPointTestSuite::MockTLHarness::endAllSessions(Session::TagMask tags) { - return _endAllSessions(tags); -} - Status ServiceEntryPointTestSuite::MockTLHarness::setup() { return Status::OK(); } diff --git a/src/mongo/transport/service_entry_point_test_suite.h b/src/mongo/transport/service_entry_point_test_suite.h index 45ebc589978..7c31e38f5b0 100644 --- a/src/mongo/transport/service_entry_point_test_suite.h +++ b/src/mongo/transport/service_entry_point_test_suite.h @@ -112,7 +112,6 @@ public: Stats sessionStats() override; void end(const transport::SessionHandle& session) override; - void endAllSessions(transport::Session::TagMask tags) override; Status setup() override; Status start() override; void shutdown() override; @@ -128,8 +127,6 @@ public: stdx::function<void(transport::Ticket, TicketCallback)> _asyncWait; stdx::function<void(const transport::SessionHandle&)> _end; stdx::function<void(SEPTestSession& session)> _destroy_hook; - stdx::function<void(transport::Session::TagMask tags)> _endAllSessions = - [](transport::Session::TagMask tags) {}; stdx::function<Status(void)> _start = [] { return Status::OK(); }; stdx::function<void(void)> _shutdown = [] {}; diff --git a/src/mongo/transport/service_executor.h b/src/mongo/transport/service_executor.h new file mode 100644 index 00000000000..fc2d442eed5 --- /dev/null +++ b/src/mongo/transport/service_executor.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/stdx/functional.h" + +namespace mongo { +// This needs to be forward declared here because the service_context.h is a circular dependency. +class ServiceContext; + +namespace transport { + +/* + * This is the interface for all ServiceExecutors. + */ +class ServiceExecutor { +public: + virtual ~ServiceExecutor() = default; + using Task = stdx::function<void()>; + + /* + * Starts the ServiceExecutor. This may create threads even if no tasks are scheduled. + */ + virtual Status start() = 0; + + /* + * Schedules a task with the ServiceExecutor and returns immediately. + * + * This is guaranteed to unwind the stack before running the task, although the task may be + * run later in the same thread. + */ + virtual Status schedule(Task task) = 0; + + /* + * Stops and joins the ServiceExecutor. Any outstanding tasks will not be executed, and any + * associated callbacks waiting on I/O may get called with an error code. + * + * This should only be called during server shutdown to gracefully destroy the ServiceExecutor + */ + virtual Status shutdown() = 0; + + /* + * Appends statistics about task scheduling to a BSONObjBuilder for serverStatus output. + */ + virtual void appendStats(BSONObjBuilder* bob) const = 0; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_base.cpp b/src/mongo/transport/service_executor_base.cpp new file mode 100644 index 00000000000..b2284c13f23 --- /dev/null +++ b/src/mongo/transport/service_executor_base.cpp @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_executor_base.h" + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/service_context.h" + +namespace mongo { +namespace transport { +namespace { + +constexpr auto kTotalScheduled = "totalScheduled"_sd; +constexpr auto kTotalExecuted = "totalExecuted"_sd; +constexpr auto kQueueDepth = "queueDepth"_sd; +constexpr auto kTotalTimeRunningUs = "totalTimeRunningMicros"_sd; +constexpr auto kTotalTimeQueuedUs = "totalTimeQueuedMicros"_sd; + +int64_t ticksToMicros(TickSource::Tick ticks, TickSource* tickSource) { + invariant(tickSource->getTicksPerSecond() > 1000000); + static const auto ticksPerMicro = tickSource->getTicksPerSecond() / 1000000; + return ticks / ticksPerMicro; +} + +} // namespace + +ServiceExecutorBase::ServiceExecutorBase(ServiceContext* ctx) : _tickSource{ctx->getTickSource()} {} + +Status ServiceExecutorBase::schedule(ServiceExecutorBase::Task task) { + + const auto scheduledTime = _tickSource->getTicks(); + auto wrapped = [ this, task = std::move(task), scheduledTime ] { + auto start = _tickSource->getTicks(); + task(); + auto end = _tickSource->getTicks(); + _outstandingTasks.subtractAndFetch(1); + _tasksExecuted.addAndFetch(1); + _ticksRunning.addAndFetch(end - start); + _ticksQueued.addAndFetch(start - scheduledTime); + }; + + auto ret = _schedule(std::move(wrapped)); + if (ret.isOK()) { + _tasksScheduled.addAndFetch(1); + _outstandingTasks.addAndFetch(1); + } + + return ret; +} + +ServiceExecutorBase::Stats ServiceExecutorBase::getStats() const { + return {_ticksRunning.load(), + _ticksQueued.load(), + _tasksExecuted.load(), + _tasksScheduled.load(), + _outstandingTasks.load()}; +} + +void ServiceExecutorBase::appendStats(BSONObjBuilder* bob) const { + const auto stats = getStats(); + + BSONObjBuilder section(bob->subobjStart("serviceExecutorTaskStats")); + section << kTotalScheduled << static_cast<int64_t>(stats.tasksScheduled) << kTotalExecuted + << static_cast<int64_t>(stats.tasksExecuted) << kQueueDepth << stats.outstandingTasks + << kTotalTimeRunningUs << ticksToMicros(stats.ticksRunning, _tickSource) + << kTotalTimeQueuedUs << ticksToMicros(stats.ticksQueued, _tickSource); + section.doneFast(); +} + +TickSource* ServiceExecutorBase::tickSource() const { + return _tickSource; +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_base.h b/src/mongo/transport/service_executor_base.h new file mode 100644 index 00000000000..0cc024efaa8 --- /dev/null +++ b/src/mongo/transport/service_executor_base.h @@ -0,0 +1,77 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/atomic_word.h" +#include "mongo/transport/service_executor.h" +#include "mongo/util/tick_source.h" + +namespace mongo { +namespace transport { +/* + * This is the base class of ServiceExecutors. + * + * Service executors should derive from this class and implement scheduleImpl(). They may + * get timing/counter statistics by calling getStats(). + */ +class ServiceExecutorBase : public ServiceExecutor { +public: + Status schedule(Task task) final; + + struct Stats { + TickSource::Tick ticksRunning; // Total number of ticks spent running tasks + TickSource::Tick ticksQueued; // Total number of ticks tasks have spent waiting to run + int64_t tasksExecuted; // Total number of tasks executed + int64_t tasksScheduled; // Total number of tasks scheduled + int64_t outstandingTasks; // Current number of tasks waiting to be run + }; + + Stats getStats() const; + void appendStats(BSONObjBuilder* bob) const final; + +protected: + explicit ServiceExecutorBase(ServiceContext* ctx); + + TickSource* tickSource() const; + +private: + // Sub-classes should implement this function to actually schedule the task. It will be called + // by schedule() with a wrapped task that does all the necessary stats/timing tracking. + virtual Status _schedule(Task task) = 0; + + TickSource* _tickSource; + AtomicWord<TickSource::Tick> _ticksRunning{0}; + AtomicWord<TickSource::Tick> _ticksQueued{0}; + AtomicWord<int64_t> _tasksExecuted{0}; + AtomicWord<int64_t> _tasksScheduled{0}; + AtomicWord<int64_t> _outstandingTasks{0}; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_fixed.cpp b/src/mongo/transport/service_executor_fixed.cpp new file mode 100644 index 00000000000..cd299a75ac4 --- /dev/null +++ b/src/mongo/transport/service_executor_fixed.cpp @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kExecutor; + +#include "mongo/platform/basic.h" + +#include "mongo/transport/service_executor_fixed.h" + +#include "mongo/db/server_parameters.h" +#include "mongo/util/log.h" +#include "mongo/util/processinfo.h" + +#include <asio.hpp> + +namespace mongo { +namespace transport { +namespace { +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(fixedServiceExecutorNumThreads, int, -1); + +} // namespace + +ServiceExecutorFixed::ServiceExecutorFixed(ServiceContext* ctx, + std::shared_ptr<asio::io_context> ioCtx) + : ServiceExecutorBase(ctx), _ioContext(std::move(ioCtx)) {} + +ServiceExecutorFixed::~ServiceExecutorFixed() { + invariant(!_isRunning.load()); +} + +Status ServiceExecutorFixed::start() { + invariant(!_isRunning.load()); + + auto threadCount = fixedServiceExecutorNumThreads; + if (threadCount == -1) { + ProcessInfo pi; + threadCount = pi.getNumAvailableCores().value_or(pi.getNumCores()); + log() << "No thread count configured for fixed executor. Using number of cores: " + << threadCount; + } + + _isRunning.store(true); + for (auto i = 0; i < threadCount; i++) { + _threads.push_back(stdx::thread([this, i] { + auto threadId = i + 1; + LOG(3) << "Starting worker thread " << threadId; + asio::io_context::work work(*_ioContext); + while (_isRunning.load()) { + _ioContext->run(); + } + LOG(3) << "Exiting worker thread " << threadId; + })); + } + + return Status::OK(); +} + +Status ServiceExecutorFixed::shutdown() { + invariant(_isRunning.load()); + + _isRunning.store(false); + _ioContext->stop(); + for (auto& thread : _threads) { + thread.join(); + } + _threads.clear(); + + return Status::OK(); +} + +Status ServiceExecutorFixed::_schedule(ServiceExecutorFixed::Task task) { + _ioContext->post(std::move(task)); + return Status::OK(); +} + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_executor_fixed.h b/src/mongo/transport/service_executor_fixed.h new file mode 100644 index 00000000000..14203cd794f --- /dev/null +++ b/src/mongo/transport/service_executor_fixed.h @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/stdx/thread.h" +#include "mongo/transport/service_executor_base.h" +#include "mongo/transport/transport_layer_asio.h" + +namespace mongo { +namespace transport { + +/** + * This is an ASIO-based fixed-size ServiceExecutor. It should only be used for testing because + * it won't add any threads if all threads become blocked. + */ +class ServiceExecutorFixed : public ServiceExecutorBase { +public: + explicit ServiceExecutorFixed(ServiceContext* ctx, std::shared_ptr<asio::io_context> ioCtx); + virtual ~ServiceExecutorFixed(); + + Status start() final; + Status shutdown() final; + +private: + Status _schedule(Task task) final; + + std::shared_ptr<asio::io_context> _ioContext; + std::vector<stdx::thread> _threads; + AtomicWord<bool> _isRunning{false}; +}; + +} // namespace transport +} // namespace mongo diff --git a/src/mongo/transport/service_state_machine.cpp b/src/mongo/transport/service_state_machine.cpp index d755a5f4c61..624b22dd1d8 100644 --- a/src/mongo/transport/service_state_machine.cpp +++ b/src/mongo/transport/service_state_machine.cpp @@ -51,7 +51,6 @@ #include "mongo/util/net/socket_exception.h" #include "mongo/util/net/thread_idle_callback.h" #include "mongo/util/quick_exit.h" -#include "mongo/util/scopeguard.h" namespace mongo { namespace { @@ -91,12 +90,98 @@ bool setExhaustMessage(Message* m, const DbResponse& dbresponse) { } // namespace using transport::TransportLayer; + +/* + * This class wraps up the logic for swapping/unswapping the Client during runNext(). + */ +class ServiceStateMachine::ThreadGuard { + ThreadGuard(ThreadGuard&) = delete; + ThreadGuard& operator=(ThreadGuard&) = delete; + +public: + explicit ThreadGuard(ServiceStateMachine* ssm) + : _ssm{ssm}, + _haveTakenOwnership{!_ssm->_isOwned.test_and_set()}, + _oldThreadName{getThreadName().toString()} { + const auto currentOwningThread = _ssm->_currentOwningThread.load(); + const auto currentThreadId = stdx::this_thread::get_id(); + + // If this is true, then we are the "owner" of the Client and we should swap the + // client/thread name before doing any work. + if (_haveTakenOwnership) { + _ssm->_currentOwningThread.store(currentThreadId); + + // Set up the thread name + setThreadName(_ssm->_threadName); + + // These are sanity checks to make sure that the Client is what we expect it to be + invariant(!haveClient()); + invariant(_ssm->_dbClient.get() == _ssm->_dbClientPtr); + + // Swap the current Client so calls to cc() work as expected + Client::setCurrent(std::move(_ssm->_dbClient)); + } else if (currentOwningThread != currentThreadId) { + // If the currentOwningThread does not equal the currentThreadId, then another thread + // currently "owns" the Client and we should reschedule ourself. + _okayToRunNext = false; + } + } + + ~ThreadGuard() { + // If we are not the owner of the SSM, then do nothing. Something higher up the call stack + // will have to clean up. + if (!_haveTakenOwnership) + return; + + // If the session has ended, then assume that it's unsafe to do anything but call the + // cleanup hook. + if (_ssm->state() == State::Ended) { + // The cleanup hook may change as soon as we unlock the mutex, so move it out of the + // ssm before unlocking the lock. + auto cleanupHook = std::move(_ssm->_cleanupHook); + if (cleanupHook) + cleanupHook(); + + return; + } + + // Otherwise swap thread locals and thread names back into the SSM so its ready for the + // next run. + if (haveClient()) { + _ssm->_dbClient = Client::releaseCurrent(); + } + setThreadName(_oldThreadName); + _ssm->_isOwned.clear(); + } + + // This bool operator reflects whether the ThreadGuard was able to take ownership of the thread + // either higher up the call chain, or in this call. If this returns false, then it is not safe + // to assume the thread has been setup correctly, or that any mutable state of the SSM is safe + // to access except for the current _state value. + explicit operator bool() const { + return _okayToRunNext; + } + +private: + ServiceStateMachine* _ssm; + bool _haveTakenOwnership; + const std::string _oldThreadName; + bool _okayToRunNext = true; +}; + +std::shared_ptr<ServiceStateMachine> ServiceStateMachine::create(ServiceContext* svcContext, + transport::SessionHandle session, + bool sync) { + return std::make_shared<ServiceStateMachine>(svcContext, std::move(session), sync); +} + ServiceStateMachine::ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync) - : _state{State::Source}, + : _state{State::Created}, _sep{svcContext->getServiceEntryPoint()}, _sync(sync), + _serviceContext(svcContext), _dbClient{svcContext->makeClient("conn", std::move(session))}, _dbClientPtr{_dbClient.get()}, _threadName{str::stream() << "conn" << _dbClient->session()->id()}, @@ -108,55 +193,74 @@ const transport::SessionHandle& ServiceStateMachine::session() const { } void ServiceStateMachine::sourceCallback(Status status) { + // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this + // thread. + ThreadGuard guard(this); + // If the guard wasn't able to take ownership of the thread, then reschedule this call to + // runNext() so that this thread can do other useful work with its timeslice instead of going + // to sleep while waiting for the SSM to be released. + if (!guard) { + return scheduleFunc([this, status] { sourceCallback(status); }); + } + // Make sure we just called sourceMessage(); - invariant(_state == State::SourceWait); + invariant(state() == State::SourceWait); auto remote = session()->remote(); if (status.isOK()) { - _state = State::Process; + _state.store(State::Process); } else if (ErrorCodes::isInterruption(status.code()) || ErrorCodes::isNetworkError(status.code())) { LOG(2) << "Session from " << remote << " encountered a network error during SourceMessage"; - _state = State::EndSession; + _state.store(State::EndSession); } else if (status == TransportLayer::TicketSessionClosedStatus) { // Our session may have been closed internally. LOG(2) << "Session from " << remote << " was closed internally during SourceMessage"; - _state = State::EndSession; + _state.store(State::EndSession); } else { log() << "Error receiving request from client: " << status << ". Ending connection from " << remote << " (connection id: " << session()->id() << ")"; - _state = State::EndSession; + _state.store(State::EndSession); } - // In asyncronous mode this is the entrypoint back into the database from the network layer - // after a message has been received, so we want to call runNext() to process the message. - // - // In synchronous mode, runNext() will fall through to call processMessage() so we avoid - // the recursive call. - if (!_sync) - return runNext(); + runNextInGuard(guard); } void ServiceStateMachine::sinkCallback(Status status) { - invariant(_state == State::SinkWait); + // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this + // thread. + ThreadGuard guard(this); + // If the guard wasn't able to take ownership of the thread, then reschedule this call to + // runNext() so that this thread can do other useful work with its timeslice instead of going + // to sleep while waiting for the SSM to be released. + if (!guard) { + return scheduleFunc([this, status] { sinkCallback(status); }); + } + + invariant(state() == State::SinkWait); if (!status.isOK()) { log() << "Error sending response to client: " << status << ". Ending connection from " << session()->remote() << " (connection id: " << session()->id() << ")"; - _state = State::EndSession; + _state.store(State::EndSession); } else if (inExhaust) { - _state = State::Process; + _state.store(State::Process); } else { - _state = State::Source; + _state.store(State::Source); } - return scheduleNext(); + // If the session ended, then runNext to clean it up + if (state() == State::EndSession) { + runNextInGuard(guard); + } else { // Otherwise scheduleNext to unwind the stack and run the next step later + scheduleNext(); + } } void ServiceStateMachine::processMessage() { // This may have been called just after a failure to source a message, in which case this // should return early so the session can be cleaned up. - if (_state != State::Process) { + if (state() != State::Process) { return; } invariant(!_inMessage.empty()); @@ -174,7 +278,7 @@ void ServiceStateMachine::processMessage() { networkCounter.hitLogicalIn(_inMessage.size()); - // 2. Pass sourced Message to handler to generate response. + // Pass sourced Message to handler to generate response. auto opCtx = cc().makeOperationContext(); // The handleRequest is implemented in a subclass for mongod/mongos and actually all the @@ -185,7 +289,7 @@ void ServiceStateMachine::processMessage() { // up in currentOp results after the response reaches the client opCtx.reset(); - // 3. Format our response, if we have one + // Format our response, if we have one Message& toSink = dbresponse.response; if (!toSink.empty()) { toSink.header().setId(nextMessageId()); @@ -207,9 +311,10 @@ void ServiceStateMachine::processMessage() { toSink = swm.getValue(); } - // 4. Sink our response to the client + // Sink our response to the client auto ticket = session()->sinkMessage(toSink); - _state = State::SinkWait; + + _state.store(State::SinkWait); if (_sync) { sinkCallback(session()->getTransportLayer()->wait(std::move(ticket))); } else { @@ -217,88 +322,44 @@ void ServiceStateMachine::processMessage() { std::move(ticket), [this](Status status) { sinkCallback(status); }); } } else { - _state = State::Source; + _state.store(State::Source); _inMessage.reset(); return scheduleNext(); } } -/* - * This class wraps up the logic for swapping/unswapping the Client during runNext(). - */ -class ServiceStateMachine::ThreadGuard { - ThreadGuard(ThreadGuard&) = delete; - ThreadGuard& operator=(ThreadGuard&) = delete; - -public: - explicit ThreadGuard(ServiceStateMachine* ssm) - : _ssm{ssm}, - _haveTakenOwnership{!_ssm->_isOwned.test_and_set()}, - _oldThreadName{getThreadName().toString()} { - const auto currentOwningThread = _ssm->_currentOwningThread.load(); - const auto currentThreadId = stdx::this_thread::get_id(); - - // If this is true, then we are the "owner" of the Client and we should swap the - // client/thread name before doing any work. - if (_haveTakenOwnership) { - _ssm->_currentOwningThread.store(currentThreadId); - - // Set up the thread name - setThreadName(_ssm->_threadName); - - // These are sanity checks to make sure that the Client is what we expect it to be - invariant(!haveClient()); - invariant(_ssm->_dbClient.get() == _ssm->_dbClientPtr); - - // Swap the current Client so calls to cc() work as expected - Client::setCurrent(std::move(_ssm->_dbClient)); - } else if (currentOwningThread != currentThreadId) { - // If the currentOwningThread does not equal the currentThreadId, then another thread - // currently "owns" the Client and we should reschedule ourself. - _okayToRunNext = false; - } - } - - ~ThreadGuard() { - if (!_haveTakenOwnership) - return; - - if (haveClient()) { - _ssm->_dbClient = Client::releaseCurrent(); - } - setThreadName(_oldThreadName); - _ssm->_isOwned.clear(); +void ServiceStateMachine::runNext() { + // The first thing to do is create a ThreadGuard which will take ownership of the SSM in this + // thread. + ThreadGuard guard(this); + // If the guard wasn't able to take ownership of the thread, then reschedule this call to + // runNext() so that this thread can do other useful work with its timeslice instead of going + // to sleep while waiting for the SSM to be released. + if (!guard) { + return scheduleNext(); } + return runNextInGuard(guard); +} - void dismiss() { - _haveTakenOwnership = false; - } +void ServiceStateMachine::runNextInGuard(ThreadGuard& guard) { + auto curState = state(); + invariant(curState != State::Ended); - explicit operator bool() const { - return _okayToRunNext; + // If this is the first run of the SSM, then update its state to Source + if (curState == State::Created) { + curState = State::Source; + _state.store(curState); } -private: - ServiceStateMachine* _ssm; - bool _haveTakenOwnership; - const std::string _oldThreadName; - bool _okayToRunNext = true; -}; - -void ServiceStateMachine::runNext() { - ThreadGuard guard(this); - if (!guard) - return scheduleNext(); - // Make sure the current Client got set correctly invariant(Client::getCurrent() == _dbClientPtr); try { - switch (_state) { + switch (curState) { case State::Source: { invariant(_inMessage.empty()); auto ticket = session()->sourceMessage(&_inMessage); - _state = State::SourceWait; + _state.store(State::SourceWait); if (_sync) { MONGO_IDLE_THREAD_BLOCK; sourceCallback(session()->getTransportLayer()->wait(std::move(ticket))); @@ -319,14 +380,14 @@ void ServiceStateMachine::runNext() { MONGO_UNREACHABLE; } - if (_state == State::EndSession) { - guard.dismiss(); - endSession(); - } - if ((_counter++ & 0xf) == 0) { markThreadIdle(); - }; + } + + if (state() == State::EndSession) { + cleanupSession(); + } + return; } catch (const AssertionException& e) { log() << "AssertionException handling request, closing client connection: " << e; @@ -340,16 +401,33 @@ void ServiceStateMachine::runNext() { quickExit(EXIT_UNCAUGHT); } - _state = State::EndSession; - guard.dismiss(); - endSession(); + _state.store(State::EndSession); + cleanupSession(); } -// TODO: Right now this is a noop because we only run in synchronous mode. When an async -// TransportLayer is written, this will call the serviceexecutor to schedule calls to runNext(). -void ServiceStateMachine::scheduleNext() {} +void ServiceStateMachine::scheduleNext() { + maybeScheduleFunc(_serviceContext->getServiceExecutor(), [this] { runNext(); }); +} + +void ServiceStateMachine::terminate() { + if (state() == State::Ended) + return; + auto tl = session()->getTransportLayer(); + tl->end(session()); +} + +void ServiceStateMachine::setCleanupHook(stdx::function<void()> hook) { + invariant(state() == State::Created); + _cleanupHook = std::move(hook); +} + +ServiceStateMachine::State ServiceStateMachine::state() { + return _state.load(); +} + +void ServiceStateMachine::cleanupSession() { + _state.store(State::Ended); -void ServiceStateMachine::endSession() { auto tl = session()->getTransportLayer(); _inMessage.reset(); @@ -362,32 +440,6 @@ void ServiceStateMachine::endSession() { const char* word = (conns == 1 ? " connection" : " connections"); log() << "end connection " << remote << " (" << conns << word << " now open)"; } - - _state = State::Ended; -} - -std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state) { - switch (state) { - case ServiceStateMachine::State::Source: - stream << "source"; - break; - case ServiceStateMachine::State::SourceWait: - stream << "sourceWait"; - break; - case ServiceStateMachine::State::Process: - stream << "process"; - break; - case ServiceStateMachine::State::SinkWait: - stream << "sinkWait"; - break; - case ServiceStateMachine::State::EndSession: - stream << "endSession"; - break; - case ServiceStateMachine::State::Ended: - stream << "ended"; - break; - } - return stream; } } // namespace mongo diff --git a/src/mongo/transport/service_state_machine.h b/src/mongo/transport/service_state_machine.h index d1750a6eb98..e3212ad21b0 100644 --- a/src/mongo/transport/service_state_machine.h +++ b/src/mongo/transport/service_state_machine.h @@ -33,6 +33,8 @@ #include "mongo/base/status.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/transport/session.h" @@ -40,25 +42,29 @@ namespace mongo { class ServiceEntryPoint; -namespace transport { -class ServiceExecutorBase; -} // namespace transport - /* * The ServiceStateMachine holds the state of a single client connection and represents the * lifecycle of each user request as a state machine. It is the glue between the stateless * ServiceEntryPoint and TransportLayer that ties network and database logic together for a * user. */ -class ServiceStateMachine { +class ServiceStateMachine : public std::enable_shared_from_this<ServiceStateMachine> { ServiceStateMachine(ServiceStateMachine&) = delete; ServiceStateMachine& operator=(ServiceStateMachine&) = delete; public: - ServiceStateMachine() = default; ServiceStateMachine(ServiceStateMachine&&) = default; ServiceStateMachine& operator=(ServiceStateMachine&&) = default; + /* + * Creates a new ServiceStateMachine for a given session/service context. If sync is true, + * then calls into the transport layer will block while they complete, otherwise they will + * be handled asynchronously. + */ + static std::shared_ptr<ServiceStateMachine> create(ServiceContext* svcContext, + transport::SessionHandle session, + bool sync); + ServiceStateMachine(ServiceContext* svcContext, transport::SessionHandle session, bool sync); /* @@ -69,6 +75,7 @@ public: * Source -> SourceWait -> Process -> Source (fire-and-forget) */ enum class State { + Created, // The session has been created, but no operations have been performed yet Source, // Request a new Message from the network to handle SourceWait, // Wait for the new Message to arrive from the network Process, // Run the Message through the database @@ -85,6 +92,9 @@ public: * Each state function (processMessage(), sinkCallback(), etc) should always unwind the stack * if they have just completed a database operation to make sure that this doesn't infinitely * recurse. + * + * runNext() will attempt to create a ThreadGuard when it first runs. If it's unable to take + * ownership of the SSM, it will call scheduleNext() and return immediately. */ void runNext(); @@ -100,17 +110,60 @@ public: /* * Gets the current state of connection for testing/diagnostic purposes. */ - State state() const { - return _state; - } + State state(); + + /* + * Terminates the associated transport Session, and requests that the next call to runNext + * should end the session. If the session has already ended, this does nothing. + */ + void terminate(); /* - * Explicitly ends the session. + * Sets a function to be called after the session is ended */ - void endSession(); + void setCleanupHook(stdx::function<void()> hook); + + /* + * Gets the transport::Session associated with this connection + */ + const transport::SessionHandle& session() const; private: /* + * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + */ + class ThreadGuard; + friend class ThreadGuard; + + /* + * This and scheduleFunc() are helper functions to schedule tasks on the serviceExecutor + * while maintaining a shared_ptr copy to anchor the lifetime of the SSM while waiting for + * callbacks to run. + */ + template <typename Executor, typename Func> + void maybeScheduleFunc(Executor* svcExec, Func&& func) { + if (svcExec) { + uassertStatusOK(svcExec->schedule( + [ func = std::move(func), anchor = shared_from_this() ] { func(); })); + } + } + + template <typename Func> + void scheduleFunc(Func&& func) { + auto svcExec = _serviceContext->getServiceExecutor(); + invariant(svcExec); + maybeScheduleFunc(svcExec, func); + } + + /* + * This is the actual implementation of runNext() that gets called after the ThreadGuard + * has been successfully created. If any callbacks (like sourceCallback()) need to call + * runNext() and already own a ThreadGuard, they should call this with that guard as the + * argument. + */ + void runNextInGuard(ThreadGuard& guard); + + /* * This function actually calls into the database and processes a request. It's broken out * into its own inline function for better readability. */ @@ -123,21 +176,20 @@ private: void sinkCallback(Status status); /* - * A class that wraps up lifetime management of the _dbClient and _threadName for runNext(); + * Releases all the resources associated with the session and call the cleanupHook. */ - class ThreadGuard; - friend class ThreadGuard; - - const transport::SessionHandle& session() const; + void cleanupSession(); - State _state{State::Source}; + AtomicWord<State> _state{State::Created}; ServiceEntryPoint* _sep; bool _sync; + ServiceContext* const _serviceContext; ServiceContext::UniqueClient _dbClient; const Client* _dbClientPtr; const std::string _threadName; + stdx::function<void()> _cleanupHook; bool inExhaust = false; bool wasCompressed = false; @@ -148,6 +200,32 @@ private: std::atomic_flag _isOwned = ATOMIC_FLAG_INIT; // NOLINT }; -std::ostream& operator<<(std::ostream& stream, const ServiceStateMachine::State& state); +template <typename T> +T& operator<<(T& stream, const ServiceStateMachine::State& state) { + switch (state) { + case ServiceStateMachine::State::Created: + stream << "created"; + break; + case ServiceStateMachine::State::Source: + stream << "source"; + break; + case ServiceStateMachine::State::SourceWait: + stream << "sourceWait"; + break; + case ServiceStateMachine::State::Process: + stream << "process"; + break; + case ServiceStateMachine::State::SinkWait: + stream << "sinkWait"; + break; + case ServiceStateMachine::State::EndSession: + stream << "endSession"; + break; + case ServiceStateMachine::State::Ended: + stream << "ended"; + break; + } + return stream; +} } // namespace mongo diff --git a/src/mongo/transport/service_state_machine_test.cpp b/src/mongo/transport/service_state_machine_test.cpp index 1153b554a27..fd65cb0cf6a 100644 --- a/src/mongo/transport/service_state_machine_test.cpp +++ b/src/mongo/transport/service_state_machine_test.cpp @@ -74,6 +74,8 @@ public: return DbResponse{builder.finish()}; } + void endAllSessions(transport::Session::TagMask tags) override {} + void setUassertInHandler() { _uassertInHandler = true; } @@ -213,8 +215,7 @@ protected: sc->setTransportLayer(std::move(tl)); _tl->start().transitional_ignore(); - _ssm = stdx::make_unique<ServiceStateMachine>( - getGlobalServiceContext(), _tl->createSession(), true); + _ssm = ServiceStateMachine::create(getGlobalServiceContext(), _tl->createSession(), true); _tl->setSSM(_ssm.get()); } @@ -228,7 +229,7 @@ protected: MockTL* _tl; MockSEP* _sep; SessionHandle _session; - std::unique_ptr<ServiceStateMachine> _ssm; + std::shared_ptr<ServiceStateMachine> _ssm; bool _ranHandler; }; @@ -236,7 +237,7 @@ ServiceStateMachine::State ServiceStateMachineFixture::runPingTest() { _tl->setNextMessage(buildRequest(BSON("ping" << 1))); ASSERT_FALSE(haveClient()); - ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Source); + ASSERT_EQ(_ssm->state(), ServiceStateMachine::State::Created); log() << "run next"; _ssm->runNext(); auto ret = _ssm->state(); diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 9586118093c..df0eb67b9eb 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -37,8 +37,6 @@ #include "mongo/util/net/ssl_types.h" #endif -#include "boost/optional.hpp" - #include "asio.hpp" #ifdef MONGO_CONFIG_SSL #include "asio/ssl.hpp" @@ -57,8 +55,13 @@ public: : _socket(std::move(socket)), _tl(tl) {} virtual ~ASIOSession() { - if (_sessionsListIterator) { - _tl->eraseSession(*_sessionsListIterator); + if (_didPostAcceptSetup) { + // This is incremented in TransportLayerASIO::_acceptConnection if there are less than + // maxConns connections already established. A call to postAcceptSetup means that the + // session is valid and will be handed off to the ServiceEntryPoint. + // + // We decrement this here to keep the counters in the TL accurate. + _tl->_currentConnections.subtractAndFetch(1); } } @@ -85,6 +88,7 @@ public: void shutdown() { std::error_code ec; + getSocket().cancel(); getSocket().shutdown(GenericSocket::shutdown_both, ec); if (ec) { error() << "Error closing socket: " << ec.message(); @@ -99,7 +103,7 @@ public: #endif } - void postAcceptSetup(bool async, TransportLayerASIO::SessionsListIterator listIt) { + void postAcceptSetup(bool async) { std::error_code ec; _socket.non_blocking(async, ec); fassert(40490, ec.value() == 0); @@ -143,7 +147,8 @@ public: if (ec) { LOG(3) << "Unable to get remote endpoint address: " << ec.message(); } - _sessionsListIterator.emplace(std::move(listIt)); + + _didPostAcceptSetup = true; } template <typename MutableBufferSequence, typename CompleteHandler> @@ -153,7 +158,7 @@ public: return opportunisticRead(sync, *_sslSocket, buffers, std::move(handler)); } else if (!_ranHandshake) { invariant(asio::buffer_size(buffers) >= sizeof(MSGHEADER::Value)); - auto postHandshakeCb = [this, sync, &buffers, handler](Status status, bool needsRead) { + auto postHandshakeCb = [this, sync, buffers, handler](Status status, bool needsRead) { if (status.isOK()) { if (needsRead) { read(sync, buffers, handler); @@ -167,7 +172,7 @@ public: }; auto handshakeRecvCb = - [ this, postHandshakeCb = std::move(postHandshakeCb), sync, &buffers, handler ]( + [ this, postHandshakeCb = std::move(postHandshakeCb), sync, buffers, handler ]( const std::error_code& ec, size_t size) { _ranHandshake = true; if (ec) { @@ -210,7 +215,14 @@ private: std::error_code ec; auto size = asio::read(stream, buffers, ec); if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) { - asio::async_read(stream, buffers, std::move(handler)); + // asio::read is a loop internally, so some of buffers may have been read into already. + // So we need to adjust the buffers passed into async_read to be offset by size, if + // size is > 0. + MutableBufferSequence asyncBuffers(buffers); + if (size > 0) { + asyncBuffers += size; + } + asio::async_read(stream, asyncBuffers, std::move(handler)); } else { handler(ec, size); } @@ -224,7 +236,14 @@ private: std::error_code ec; auto size = asio::write(stream, buffers, ec); if ((ec == asio::error::would_block || ec == asio::error::try_again) && !sync) { - asio::async_write(stream, buffers, std::move(handler)); + // asio::write is a loop internally, so some of buffers may have been read into already. + // So we need to adjust the buffers passed into async_write to be offset by size, if + // size is > 0. + ConstBufferSequence asyncBuffers(buffers); + if (size > 0) { + asyncBuffers += size; + } + asio::async_write(stream, asyncBuffers, std::move(handler)); } else { handler(ec, size); } @@ -318,7 +337,7 @@ private: #endif TransportLayerASIO* const _tl; - boost::optional<TransportLayerASIO::SessionsListIterator> _sessionsListIterator; + bool _didPostAcceptSetup = false; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer.h b/src/mongo/transport/transport_layer.h index 186452c0098..47b0e379992 100644 --- a/src/mongo/transport/transport_layer.h +++ b/src/mongo/transport/transport_layer.h @@ -166,16 +166,6 @@ public: virtual void end(const SessionHandle& session) = 0; /** - * End all active sessions in the TransportLayer. Tickets that have already been started via - * wait() or asyncWait() will complete, but may return a failed Status. This method is - * asynchronous and will return after all sessions have been notified to end. - * - * If a non-empty TagMask is provided, endAllSessions() will skip over sessions with matching - * tags and leave them open. - */ - virtual void endAllSessions(Session::TagMask tags) = 0; - - /** * Start the TransportLayer. After this point, the TransportLayer will begin accepting active * sessions from new transport::Endpoints. */ diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 25a3e9c7832..9d0b894071d 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -81,7 +81,7 @@ std::shared_ptr<TransportLayerASIO::ASIOSession> TransportLayerASIO::createSessi TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, ServiceEntryPoint* sep) - : _ioContext(stdx::make_unique<asio::io_context>()), + : _ioContext(std::make_shared<asio::io_context>()), #ifdef MONGO_CONFIG_SSL _sslContext(nullptr), #endif @@ -129,10 +129,7 @@ void TransportLayerASIO::asyncWait(Ticket&& ticket, TicketCallback callback) { TransportLayer::Stats TransportLayerASIO::sessionStats() { TransportLayer::Stats ret; - auto sessionCount = [this] { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _sessions.size(); - }(); + auto sessionCount = _currentConnections.load(); ret.numOpenSessions = sessionCount; ret.numCreatedSessions = _createdConnections.load(); ret.numAvailableSessions = static_cast<size_t>(_listenerOptions.maxConns) - sessionCount; @@ -145,39 +142,6 @@ void TransportLayerASIO::end(const SessionHandle& session) { asioSession->shutdown(); } -void TransportLayerASIO::endAllSessions(Session::TagMask tags) { - log() << "ASIO transport layer closing all connections"; - std::vector<ASIOSessionHandle> sessions; - // This is more complicated than it seems. We need to lock() all the weak_ptrs in _sessions - // and then end them if their tags don't match the tags passed into the function. - // - // When you lock the session, the lifetime of the session is extended by creating a new - // shared_ptr, but the session could end before this lock is released, which means that we - // must extend the lifetime of the session past the scope of the lock_guard, or else the - // session's destructor will acquire a lock already held here and deadlock/crash. - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - sessions.reserve(_sessions.size()); - for (auto&& weakSession : _sessions) { - if (auto session = weakSession.lock()) { - sessions.emplace_back(std::move(session)); - } - } - } - - // Outside the lock we kill any sessions that don't match our tags. - for (auto&& session : sessions) { - if (session->getTags() & tags) { - log() << "Skip closing connection for connection # " << session->id(); - } else { - end(session); - } - } - - // Any other sessions that may have ended while this was running will get cleaned up by - // sessions being destructed at the end of this function. -} - Status TransportLayerASIO::setup() { std::vector<std::string> listenAddrs; if (_listenerOptions.ipList.empty()) { @@ -293,18 +257,29 @@ Status TransportLayerASIO::start() { void TransportLayerASIO::shutdown() { stdx::lock_guard<stdx::mutex> lk(_mutex); _running.store(false); - _ioContext->stop(); + // Loop through the acceptors and cancel their calls to async_accept. This will prevent new + // connections from being opened. + for (auto& acceptor : _acceptors) { + acceptor.cancel(); + } + + // If the listener thread is joinable (that is, we created/started a listener thread), then + // the io_context is owned exclusively by the TransportLayer and we should stop it and join + // the listener thread. + // + // Otherwise the ServiceExecutor may need to continue running the io_context to drain running + // connections, so we just cancel the acceptors and return. if (_listenerThread.joinable()) { + // We should only have started a listener if the TransportLayer is in sync mode. + dassert(!_listenerOptions.async); + _ioContext->stop(); _listenerThread.join(); } } -void TransportLayerASIO::eraseSession(TransportLayerASIO::SessionsListIterator it) { - if (it != _sessions.end()) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _sessions.erase(it); - } +const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() { + return _ioContext; } void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { @@ -316,6 +291,9 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { auto& socket = session->getSocket(); auto acceptCb = [ this, session = std::move(session), &acceptor ](std::error_code ec) mutable { + if (!_running.load()) + return; + if (ec) { log() << "Error accepting new connection on " << endpointToHostAndPort(acceptor.local_endpoint()) << ": " << ec.message(); @@ -323,20 +301,15 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { return; } - size_t connCount = 0; - SessionsListIterator listIt; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_sessions.size() + 1 > _listenerOptions.maxConns) { - log() << "connection refused because too many open connections: " - << _sessions.size(); - _acceptConnection(acceptor); - return; - } - listIt = _sessions.emplace(_sessions.end(), session); + size_t connCount = _currentConnections.addAndFetch(1); + if (connCount > _listenerOptions.maxConns) { + log() << "connection refused because too many open connections: " << connCount; + _currentConnections.subtractAndFetch(1); + _acceptConnection(acceptor); + return; } - session->postAcceptSetup(_listenerOptions.async, listIt); + session->postAcceptSetup(_listenerOptions.async); _createdConnections.addAndFetch(1); if (!serverGlobalParams.quiet.load()) { diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index ede44211c22..a9182ea9b8b 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -34,7 +34,6 @@ #include "mongo/config.h" #include "mongo/db/server_options.h" #include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/list.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" @@ -107,37 +106,33 @@ public: void end(const SessionHandle& session) final; - void endAllSessions(transport::Session::TagMask tags) final; - Status setup() final; Status start() final; void shutdown() final; + const std::shared_ptr<asio::io_context>& getIOContext(); + private: class ASIOSession; class ASIOTicket; class ASIOSourceTicket; class ASIOSinkTicket; - class SessionListGuard; using ASIOSessionHandle = std::shared_ptr<ASIOSession>; using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>; using GenericAcceptor = asio::basic_socket_acceptor<asio::generic::stream_protocol>; - using SessionsListIterator = stdx::list<std::weak_ptr<ASIOSession>>::iterator; ASIOSessionHandle createSession(); - void eraseSession(SessionsListIterator it); void _acceptConnection(GenericAcceptor& acceptor); stdx::mutex _mutex; std::vector<GenericAcceptor> _acceptors; - stdx::list<std::weak_ptr<ASIOSession>> _sessions; // Only used if _listenerOptions.async is false. stdx::thread _listenerThread; - std::unique_ptr<asio::io_context> _ioContext; + std::shared_ptr<asio::io_context> _ioContext; #ifdef MONGO_CONFIG_SSL std::unique_ptr<asio::ssl::context> _sslContext; SSLParams::SSLModes _sslMode; @@ -148,6 +143,7 @@ private: Options _listenerOptions; AtomicWord<size_t> _createdConnections{0}; + AtomicWord<size_t> _currentConnections{0}; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_legacy.cpp b/src/mongo/transport/transport_layer_legacy.cpp index 94db97ee8ce..920baa5e5c4 100644 --- a/src/mongo/transport/transport_layer_legacy.cpp +++ b/src/mongo/transport/transport_layer_legacy.cpp @@ -50,14 +50,6 @@ namespace mongo { namespace transport { -namespace { -struct lock_weak { - template <typename T> - std::shared_ptr<T> operator()(const std::weak_ptr<T>& p) const { - return p.lock(); - } -}; -} // namespace TransportLayerLegacy::Options::Options(const ServerGlobalParams* params) : port(params->port), ipList(params->bind_ip) {} @@ -163,11 +155,7 @@ Ticket TransportLayerLegacy::sourceMessage(const SessionHandle& session, TransportLayer::Stats TransportLayerLegacy::sessionStats() { Stats stats; - { - stdx::lock_guard<stdx::mutex> lk(_sessionsMutex); - stats.numOpenSessions = _sessions.size(); - } - + stats.numOpenSessions = _currentConnections.load(); stats.numAvailableSessions = Listener::globalTicketHolder.available(); stats.numCreatedSessions = Listener::globalConnectionNumber.load(); @@ -222,48 +210,10 @@ void TransportLayerLegacy::_closeConnection(Connection* conn) { Listener::globalTicketHolder.release(); } -// Capture all of the weak pointers behind the lock, to delay their expiry until we leave the -// locking context. This function requires proof of locking, by passing the lock guard. -auto TransportLayerLegacy::lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const - -> std::vector<LegacySessionHandle> { - using std::begin; - using std::end; - std::vector<std::shared_ptr<LegacySession>> result; - std::transform(begin(_sessions), end(_sessions), std::back_inserter(result), lock_weak()); - // Skip expired weak pointers. - result.erase(std::remove(begin(result), end(result), nullptr), end(result)); - return result; -} - -void TransportLayerLegacy::endAllSessions(Session::TagMask tags) { - log() << "legacy transport layer closing all connections"; - { - stdx::unique_lock<stdx::mutex> lk(_sessionsMutex); - // We want to capture the shared_ptrs to our sessions in a way which lets us destroy them - // outside of the lock. - const auto sessions = lockAllSessions(lk); - - for (auto&& session : sessions) { - if (session->getTags() & tags) { - log() << "Skip closing connection for connection # " - << session->conn()->connectionId; - } else { - _closeConnection(session->conn()); - } - } - // TODO(SERVER-27069): Revamp this lock to not cover the loop. This unlock was put here - // specifically to minimize risk, just before the release of 3.4. The risk is that we would - // be in the loop without the lock, which most of our testing didn't do. We must unlock - // manually here, because the `sessions` vector must be destroyed *outside* of the lock. - lk.unlock(); - } -} - void TransportLayerLegacy::shutdown() { _running.store(false); _listener->shutdown(); _listenerThread.join(); - endAllSessions(Session::kEmptyTagMask); } void TransportLayerLegacy::_destroy(LegacySession& session) { @@ -271,8 +221,7 @@ void TransportLayerLegacy::_destroy(LegacySession& session) { _closeConnection(session.conn()); } - stdx::lock_guard<stdx::mutex> lk(_sessionsMutex); - _sessions.erase(session.getIter()); + _currentConnections.subtractAndFetch(1); } Status TransportLayerLegacy::_runTicket(Ticket ticket) { @@ -328,17 +277,8 @@ void TransportLayerLegacy::_handleNewConnection(std::unique_ptr<AbstractMessagin amp->setLogLevel(logger::LogSeverity::Debug(1)); + _currentConnections.addAndFetch(1); auto session = LegacySession::create(std::move(amp), this); - stdx::list<std::weak_ptr<LegacySession>> list; - auto it = list.emplace(list.begin(), session); - - { - // Add the new session to our list - stdx::lock_guard<stdx::mutex> lk(_sessionsMutex); - session->setIter(it); - _sessions.splice(_sessions.begin(), list, it); - } - invariant(_sep); _sep->startSession(std::move(session)); } diff --git a/src/mongo/transport/transport_layer_legacy.h b/src/mongo/transport/transport_layer_legacy.h index 781e7783034..07a0d808066 100644 --- a/src/mongo/transport/transport_layer_legacy.h +++ b/src/mongo/transport/transport_layer_legacy.h @@ -30,6 +30,7 @@ #include <vector> +#include "mongo/platform/atomic_word.h" #include "mongo/stdx/list.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" @@ -85,7 +86,6 @@ public: Stats sessionStats() override; void end(const SessionHandle& session) override; - void endAllSessions(transport::Session::TagMask tags) override; void shutdown() override; @@ -93,7 +93,6 @@ private: class LegacySession; using LegacySessionHandle = std::shared_ptr<LegacySession>; using ConstLegacySessionHandle = std::shared_ptr<const LegacySession>; - using SessionEntry = std::list<std::weak_ptr<LegacySession>>::iterator; void _destroy(LegacySession& session); @@ -104,8 +103,6 @@ private: using NewConnectionCb = stdx::function<void(std::unique_ptr<AbstractMessagingPort>)>; using WorkHandle = stdx::function<Status(AbstractMessagingPort*)>; - std::vector<LegacySessionHandle> lockAllSessions(const stdx::unique_lock<stdx::mutex>&) const; - /** * Connection object, to associate Sessions with AbstractMessagingPorts. */ @@ -153,14 +150,6 @@ private: return _connection.get(); } - void setIter(SessionEntry it) { - _entry = std::move(it); - } - - SessionEntry getIter() const { - return _entry; - } - private: explicit LegacySession(std::unique_ptr<AbstractMessagingPort> amp, TransportLayerLegacy* tl); @@ -173,9 +162,6 @@ private: TagMask _tags; std::unique_ptr<Connection> _connection; - - // A handle to this session's entry in the TL's session list - SessionEntry _entry; }; /** @@ -238,11 +224,8 @@ private: std::unique_ptr<Listener> _listener; stdx::thread _listenerThread; - // TransportLayerLegacy holds non-owning pointers to all of its sessions. - mutable stdx::mutex _sessionsMutex; - stdx::list<std::weak_ptr<LegacySession>> _sessions; - AtomicWord<bool> _running; + AtomicWord<size_t> _currentConnections{0}; Options _options; }; diff --git a/src/mongo/transport/transport_layer_legacy_test.cpp b/src/mongo/transport/transport_layer_legacy_test.cpp index 103ad622d2b..39ec7236d3e 100644 --- a/src/mongo/transport/transport_layer_legacy_test.cpp +++ b/src/mongo/transport/transport_layer_legacy_test.cpp @@ -45,13 +45,25 @@ public: ASSERT_NOT_OK(s); tll->end(session); + + _sessions.emplace_back(std::move(session)); } DbResponse handleRequest(OperationContext* opCtx, const Message& request) override { MONGO_UNREACHABLE; } + // Sessions end as soon as they're started, so this doesn't need to do anything. + void endAllSessions(transport::Session::TagMask tags) override { + for (auto& session : _sessions) { + tll->end(session); + } + _sessions.clear(); + } + transport::TransportLayerLegacy* tll = nullptr; + + std::list<transport::SessionHandle> _sessions; }; // This test verifies a fix for SERVER-28239. The actual service entry point in use by mongod and @@ -61,7 +73,6 @@ TEST(TransportLayerLegacy, endSessionsDoesntDoubleClose) { // Disabling this test until we can figure out the best way to allocate port numbers for unit // tests return; - ServiceEntryPointUtil sepu; transport::TransportLayerLegacy::Options opts{}; @@ -89,7 +100,7 @@ TEST(TransportLayerLegacy, endSessionsDoesntDoubleClose) { while (Listener::globalTicketHolder.used() == 0) { } - tll.endAllSessions(transport::Session::TagMask{}); + sepu.endAllSessions(transport::Session::TagMask{}); while (Listener::globalTicketHolder.used() == 1) { } diff --git a/src/mongo/transport/transport_layer_manager.cpp b/src/mongo/transport/transport_layer_manager.cpp index cdfc86cb93d..e28115c1e78 100644 --- a/src/mongo/transport/transport_layer_manager.cpp +++ b/src/mongo/transport/transport_layer_manager.cpp @@ -34,6 +34,7 @@ #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/service_executor_fixed.h" #include "mongo/transport/session.h" #include "mongo/transport/transport_layer_asio.h" #include "mongo/transport/transport_layer_legacy.h" @@ -101,10 +102,6 @@ void TransportLayerManager::end(const SessionHandle& session) { session->getTransportLayer()->end(session); } -void TransportLayerManager::endAllSessions(Session::TagMask tags) { - _foreach([&tags](TransportLayer* tl) { tl->endAllSessions(tags); }); -} - // TODO Right now this and setup() leave TLs started if there's an error. In practice the server // exits with an error and this isn't an issue, but we should make this more robust. Status TransportLayerManager::start() { @@ -151,7 +148,17 @@ std::unique_ptr<TransportLayer> TransportLayerManager::createWithConfig( auto sep = ctx->getServiceEntryPoint(); if (config->transportLayer == "asio") { transport::TransportLayerASIO::Options opts(config); - transportLayer = stdx::make_unique<transport::TransportLayerASIO>(opts, sep); + if (config->serviceExecutor != "synchronous") { + opts.async = true; + } + + auto transportLayerASIO = stdx::make_unique<transport::TransportLayerASIO>(opts, sep); + + if (config->serviceExecutor == "fixedForTesting") { + ctx->setServiceExecutor( + stdx::make_unique<ServiceExecutorFixed>(ctx, transportLayerASIO->getIOContext())); + } + transportLayer = std::move(transportLayerASIO); } else if (serverGlobalParams.transportLayer == "legacy") { transport::TransportLayerLegacy::Options opts(config); transportLayer = stdx::make_unique<transport::TransportLayerLegacy>(opts, sep); diff --git a/src/mongo/transport/transport_layer_manager.h b/src/mongo/transport/transport_layer_manager.h index 0fb762264ea..3c91e31719d 100644 --- a/src/mongo/transport/transport_layer_manager.h +++ b/src/mongo/transport/transport_layer_manager.h @@ -72,7 +72,6 @@ public: Stats sessionStats() override; void end(const SessionHandle& session) override; - void endAllSessions(Session::TagMask tags) override; Status start() override; void shutdown() override; diff --git a/src/mongo/transport/transport_layer_mock.cpp b/src/mongo/transport/transport_layer_mock.cpp index d1d7ec6e8fc..9b51fda6f5c 100644 --- a/src/mongo/transport/transport_layer_mock.cpp +++ b/src/mongo/transport/transport_layer_mock.cpp @@ -119,14 +119,6 @@ void TransportLayerMock::end(const SessionHandle& session) { _sessions[session->id()].ended = true; } -void TransportLayerMock::endAllSessions(Session::TagMask tags) { - auto it = _sessions.begin(); - while (it != _sessions.end()) { - end(it->second.session); - it++; - } -} - Status TransportLayerMock::setup() { return Status::OK(); } @@ -138,7 +130,6 @@ Status TransportLayerMock::start() { void TransportLayerMock::shutdown() { if (!inShutdown()) { _shutdown = true; - endAllSessions(Session::kEmptyTagMask); } } diff --git a/src/mongo/transport/transport_layer_mock.h b/src/mongo/transport/transport_layer_mock.h index d0eeee0e8ac..208a7d93c97 100644 --- a/src/mongo/transport/transport_layer_mock.h +++ b/src/mongo/transport/transport_layer_mock.h @@ -67,7 +67,6 @@ public: SessionHandle get(Session::Id id); bool owns(Session::Id id); void end(const SessionHandle& session) override; - void endAllSessions(Session::TagMask tags) override; Status setup() override; Status start() override; diff --git a/src/mongo/transport/transport_layer_mock_test.cpp b/src/mongo/transport/transport_layer_mock_test.cpp index 1cf90b395ff..2234095c806 100644 --- a/src/mongo/transport/transport_layer_mock_test.cpp +++ b/src/mongo/transport/transport_layer_mock_test.cpp @@ -249,13 +249,6 @@ void assertEnded(TransportLayer* tl, } } -// endAllSessions() ends all sessions -TEST_F(TransportLayerMockTest, EndAllSessions) { - std::vector<SessionHandle> sessions = createSessions(tl()); - tl()->endAllSessions(Session::kEmptyTagMask); - assertEnded(tl(), sessions); -} - // shutdown() ends all sessions and shuts down TEST_F(TransportLayerMockTest, Shutdown) { std::vector<SessionHandle> sessions = createSessions(tl()); |