diff options
author | Jonathan Reams <jbreams@mongodb.com> | 2017-09-07 13:21:26 -0400 |
---|---|---|
committer | Jonathan Reams <jbreams@mongodb.com> | 2017-09-08 15:53:05 -0400 |
commit | d4a9df74058d91f0adf977365589578dca1c1ff1 (patch) | |
tree | 8036f8f72028b5936d03fea79cce3ef8e28a6f7c /src/mongo/transport | |
parent | e80d7eeef4f2cee89a85997da4d157a49e5295c4 (diff) | |
download | mongo-d4a9df74058d91f0adf977365589578dca1c1ff1.tar.gz |
SERVER-29830 Run async_accept on separate io_context
Diffstat (limited to 'src/mongo/transport')
-rw-r--r-- | src/mongo/transport/session_asio.h | 51 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.cpp | 56 | ||||
-rw-r--r-- | src/mongo/transport/transport_layer_asio.h | 22 |
3 files changed, 59 insertions, 70 deletions
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h index 1a84f584419..41b9da16885 100644 --- a/src/mongo/transport/session_asio.h +++ b/src/mongo/transport/session_asio.h @@ -55,17 +55,28 @@ class TransportLayerASIO::ASIOSession : public Session { public: ASIOSession(TransportLayerASIO* tl, GenericSocket socket) - : _socket(std::move(socket)), _tl(tl) {} + : _socket(std::move(socket)), _tl(tl) { + std::error_code ec; - virtual ~ASIOSession() { - if (_didPostAcceptSetup) { - // This is incremented in TransportLayerASIO::_acceptConnection if there are less than - // maxConns connections already established. A call to postAcceptSetup means that the - // session is valid and will be handed off to the ServiceEntryPoint. - // - // We decrement this here to keep the counters in the TL accurate. - _tl->_currentConnections.subtractAndFetch(1); + _socket.non_blocking(_tl->_listenerOptions.async, ec); + fassert(40490, ec.value() == 0); + + auto family = endpointToSockAddr(_socket.local_endpoint()).getType(); + if (family == AF_INET || family == AF_INET6) { + _socket.set_option(asio::ip::tcp::no_delay(true)); + _socket.set_option(asio::socket_base::keep_alive(true)); + setSocketKeepAliveParams(_socket.native_handle()); } + + _local = endpointToHostAndPort(_socket.local_endpoint()); + _remote = endpointToHostAndPort(_socket.remote_endpoint(ec)); + if (ec) { + LOG(3) << "Unable to get remote endpoint address: " << ec.message(); + } + } + + virtual ~ASIOSession() { + _tl->_currentConnections.subtractAndFetch(1); } TransportLayer* getTransportLayer() const override { @@ -106,27 +117,6 @@ public: #endif } - void postAcceptSetup(bool async) { - std::error_code ec; - _socket.non_blocking(async, ec); - fassert(40490, ec.value() == 0); - - auto family = endpointToSockAddr(_socket.local_endpoint()).getType(); - if (family == AF_INET || family == AF_INET6) { - _socket.set_option(asio::ip::tcp::no_delay(true)); - _socket.set_option(asio::socket_base::keep_alive(true)); - setSocketKeepAliveParams(_socket.native_handle()); - } - - _local = endpointToHostAndPort(_socket.local_endpoint()); - _remote = endpointToHostAndPort(_socket.remote_endpoint(ec)); - if (ec) { - LOG(3) << "Unable to get remote endpoint address: " << ec.message(); - } - - _didPostAcceptSetup = true; - } - template <typename MutableBufferSequence, typename CompleteHandler> void read(bool sync, const MutableBufferSequence& buffers, CompleteHandler&& handler) { #ifdef MONGO_CONFIG_SSL @@ -315,7 +305,6 @@ private: #endif TransportLayerASIO* const _tl; - bool _didPostAcceptSetup = false; }; } // namespace transport diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp index 4cf0bafdd2c..5fbba53dcbd 100644 --- a/src/mongo/transport/transport_layer_asio.cpp +++ b/src/mongo/transport/transport_layer_asio.cpp @@ -73,15 +73,10 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params) maxConns(params->maxConns) { } -std::shared_ptr<TransportLayerASIO::ASIOSession> TransportLayerASIO::createSession() { - GenericSocket socket(*_ioContext); - std::shared_ptr<ASIOSession> ret(new ASIOSession(this, std::move(socket))); - return ret; -} - TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts, ServiceEntryPoint* sep) - : _ioContext(std::make_shared<asio::io_context>()), + : _workerIOContext(std::make_shared<asio::io_context>()), + _acceptorIOContext(stdx::make_unique<asio::io_context>()), #ifdef MONGO_CONFIG_SSL _sslContext(nullptr), #endif @@ -190,7 +185,7 @@ Status TransportLayerASIO::setup() { fassertFailedNoTrace(40488); } - GenericAcceptor acceptor(*_ioContext); + GenericAcceptor acceptor(*_acceptorIOContext); acceptor.open(endpoint.protocol()); acceptor.set_option(GenericAcceptor::reuse_address(true)); @@ -244,22 +239,18 @@ Status TransportLayerASIO::start() { stdx::lock_guard<stdx::mutex> lk(_mutex); _running.store(true); - // If we're in async mode then the ServiceExecutor will handle calling run_one() in a pool - // of threads. Otherwise we need a thread to just handle the async_accept calls. - if (!_listenerOptions.async) { - _listenerThread = stdx::thread([this] { - setThreadName("listener"); - while (_running.load()) { - try { - _ioContext->run(); - _ioContext->reset(); - } catch (...) { - severe() << "Uncaught exception in the listener: " << exceptionToStatus(); - fassertFailed(40491); - } + _listenerThread = stdx::thread([this] { + setThreadName("listener"); + while (_running.load()) { + asio::io_context::work work(*_acceptorIOContext); + try { + _acceptorIOContext->run(); + } catch (...) { + severe() << "Uncaught exception in the listener: " << exceptionToStatus(); + fassertFailed(40491); } - }); - } + } + }); for (auto& acceptor : _acceptors) { acceptor.listen(serverGlobalParams.listenBacklog); @@ -294,26 +285,17 @@ void TransportLayerASIO::shutdown() { // Otherwise the ServiceExecutor may need to continue running the io_context to drain running // connections, so we just cancel the acceptors and return. if (_listenerThread.joinable()) { - // We should only have started a listener if the TransportLayer is in sync mode. - dassert(!_listenerOptions.async); - _ioContext->stop(); + _acceptorIOContext->stop(); _listenerThread.join(); } } const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() { - return _ioContext; + return _workerIOContext; } void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { - auto session = createSession(); - if (!session) { - _acceptConnection(acceptor); - return; - } - - auto& socket = session->getSocket(); - auto acceptCb = [ this, session = std::move(session), &acceptor ](std::error_code ec) mutable { + auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable { if (!_running.load()) return; @@ -332,7 +314,7 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { return; } - session->postAcceptSetup(_listenerOptions.async); + std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket))); _createdConnections.addAndFetch(1); if (!serverGlobalParams.quiet.load()) { @@ -345,7 +327,7 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) { _acceptConnection(acceptor); }; - acceptor.async_accept(socket, std::move(acceptCb)); + acceptor.async_accept(*_workerIOContext, std::move(acceptCb)); } } // namespace transport diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h index 7b7469b365f..5df87057f0e 100644 --- a/src/mongo/transport/transport_layer_asio.h +++ b/src/mongo/transport/transport_layer_asio.h @@ -123,17 +123,35 @@ private: using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>; using GenericAcceptor = asio::basic_socket_acceptor<asio::generic::stream_protocol>; - ASIOSessionHandle createSession(); void _acceptConnection(GenericAcceptor& acceptor); stdx::mutex _mutex; + // There are two IO contexts that are used by TransportLayerASIO. The _workerIOContext + // contains all the accepted sockets and all normal networking activity. The + // _acceptorIOContext contains all the sockets in _acceptors. + // + // TransportLayerASIO should never call run() on the _workerIOContext. + // In synchronous mode, this will cause a massive performance degradation due to + // unnecessary wakeups on the asio thread for sockets we don't intend to interact + // with asynchronously. The additional IO context avoids registering those sockets + // with the acceptors epoll set, thus avoiding those wakeups. Calling run will + // undo that benefit. + // + // TransportLayerASIO should run its own thread that calls run() on the _acceptorIOContext + // to process calls to async_accept - this is the equivalent of the "listener" thread in + // other TransportLayers. + // + // The underlying problem that caused this is here: + // https://github.com/chriskohlhoff/asio/issues/240 + // // It is important that the io_context be declared before the // vector of acceptors (or any other state that is associated with // the io_context), so that we destroy any existing acceptors or // other io_service associated state before we drop the refcount // on the io_context, which may destroy it. - std::shared_ptr<asio::io_context> _ioContext; + std::shared_ptr<asio::io_context> _workerIOContext; + std::unique_ptr<asio::io_context> _acceptorIOContext; #ifdef MONGO_CONFIG_SSL std::unique_ptr<asio::ssl::context> _sslContext; |