author     Jonathan Reams <jbreams@mongodb.com>    2017-09-07 13:21:26 -0400
committer  Jonathan Reams <jbreams@mongodb.com>    2017-09-08 15:53:05 -0400
commit     d4a9df74058d91f0adf977365589578dca1c1ff1 (patch)
tree       8036f8f72028b5936d03fea79cce3ef8e28a6f7c /src/mongo/transport
parent     e80d7eeef4f2cee89a85997da4d157a49e5295c4 (diff)
SERVER-29830 Run async_accept on separate io_context
Diffstat (limited to 'src/mongo/transport')
-rw-r--r--  src/mongo/transport/session_asio.h            51
-rw-r--r--  src/mongo/transport/transport_layer_asio.cpp  56
-rw-r--r--  src/mongo/transport/transport_layer_asio.h    22
3 files changed, 59 insertions(+), 70 deletions(-)
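
The shape of the change is easiest to see outside the diff: the acceptor gets its own io_context, driven by a dedicated listener thread, while each accepted socket is bound to a separate worker io_context through ASIO's two-argument async_accept overload. The following standalone sketch illustrates that pattern only; the names, the port, and the single-shot shutdown are illustrative and are not part of the MongoDB code.

#include <asio.hpp>
#include <iostream>
#include <thread>

int main() {
    asio::io_context acceptorContext;  // services only the listening sockets
    asio::io_context workerContext;    // services the accepted connections

    // Illustrative port; the acceptor is the only socket on acceptorContext.
    asio::ip::tcp::acceptor acceptor(
        acceptorContext, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), 27099));

    // The two-argument async_accept overload constructs the peer socket on the
    // io_context passed as its first argument, so accepted sockets never
    // register with the reactor that services the acceptor.
    acceptor.async_accept(
        workerContext, [&](const std::error_code& ec, asio::ip::tcp::socket peer) {
            if (!ec)
                std::cout << "accepted connection from " << peer.remote_endpoint() << "\n";
            workerContext.stop();  // one connection is enough for this demo
            acceptorContext.stop();
        });

    // Dedicated "listener" thread: it only ever sees accept completions.
    std::thread listener([&] { acceptorContext.run(); });

    // Keep the worker context alive until the handler above stops it.
    auto workerGuard = asio::make_work_guard(workerContext);
    workerContext.run();
    listener.join();
}
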
diff --git a/src/mongo/transport/session_asio.h b/src/mongo/transport/session_asio.h
index 1a84f584419..41b9da16885 100644
--- a/src/mongo/transport/session_asio.h
+++ b/src/mongo/transport/session_asio.h
@@ -55,17 +55,28 @@ class TransportLayerASIO::ASIOSession : public Session {
public:
ASIOSession(TransportLayerASIO* tl, GenericSocket socket)
- : _socket(std::move(socket)), _tl(tl) {}
+ : _socket(std::move(socket)), _tl(tl) {
+ std::error_code ec;
- virtual ~ASIOSession() {
- if (_didPostAcceptSetup) {
- // This is incremented in TransportLayerASIO::_acceptConnection if there are less than
- // maxConns connections already established. A call to postAcceptSetup means that the
- // session is valid and will be handed off to the ServiceEntryPoint.
- //
- // We decrement this here to keep the counters in the TL accurate.
- _tl->_currentConnections.subtractAndFetch(1);
+ _socket.non_blocking(_tl->_listenerOptions.async, ec);
+ fassert(40490, ec.value() == 0);
+
+ auto family = endpointToSockAddr(_socket.local_endpoint()).getType();
+ if (family == AF_INET || family == AF_INET6) {
+ _socket.set_option(asio::ip::tcp::no_delay(true));
+ _socket.set_option(asio::socket_base::keep_alive(true));
+ setSocketKeepAliveParams(_socket.native_handle());
}
+
+ _local = endpointToHostAndPort(_socket.local_endpoint());
+ _remote = endpointToHostAndPort(_socket.remote_endpoint(ec));
+ if (ec) {
+ LOG(3) << "Unable to get remote endpoint address: " << ec.message();
+ }
+ }
+
+ virtual ~ASIOSession() {
+ _tl->_currentConnections.subtractAndFetch(1);
}
TransportLayer* getTransportLayer() const override {
@@ -106,27 +117,6 @@ public:
#endif
}
- void postAcceptSetup(bool async) {
- std::error_code ec;
- _socket.non_blocking(async, ec);
- fassert(40490, ec.value() == 0);
-
- auto family = endpointToSockAddr(_socket.local_endpoint()).getType();
- if (family == AF_INET || family == AF_INET6) {
- _socket.set_option(asio::ip::tcp::no_delay(true));
- _socket.set_option(asio::socket_base::keep_alive(true));
- setSocketKeepAliveParams(_socket.native_handle());
- }
-
- _local = endpointToHostAndPort(_socket.local_endpoint());
- _remote = endpointToHostAndPort(_socket.remote_endpoint(ec));
- if (ec) {
- LOG(3) << "Unable to get remote endpoint address: " << ec.message();
- }
-
- _didPostAcceptSetup = true;
- }
-
template <typename MutableBufferSequence, typename CompleteHandler>
void read(bool sync, const MutableBufferSequence& buffers, CompleteHandler&& handler) {
#ifdef MONGO_CONFIG_SSL
@@ -315,7 +305,6 @@ private:
#endif
TransportLayerASIO* const _tl;
- bool _didPostAcceptSetup = false;
};
} // namespace transport
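
With postAcceptSetup() gone, the socket options that used to be applied after the fact are applied as soon as the session is constructed. The helper below is a minimal sketch of that setup, assuming a plain TCP socket and a hypothetical configureAcceptedSocket() name; the real constructor also checks the address family so Unix-domain sockets skip the TCP-only options, and reports failures through fassert/LOG rather than exceptions.

#include <asio.hpp>
#include <system_error>

void configureAcceptedSocket(asio::ip::tcp::socket& socket, bool asyncMode) {
    std::error_code ec;

    // Match the transport layer's servicing mode: asynchronous servicing
    // requires a non-blocking socket.
    socket.non_blocking(asyncMode, ec);
    if (ec)
        throw std::system_error(ec, "failed to set non-blocking mode");

    // Send small writes immediately and let the kernel probe for dead peers.
    socket.set_option(asio::ip::tcp::no_delay(true));
    socket.set_option(asio::socket_base::keep_alive(true));
}
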
diff --git a/src/mongo/transport/transport_layer_asio.cpp b/src/mongo/transport/transport_layer_asio.cpp
index 4cf0bafdd2c..5fbba53dcbd 100644
--- a/src/mongo/transport/transport_layer_asio.cpp
+++ b/src/mongo/transport/transport_layer_asio.cpp
@@ -73,15 +73,10 @@ TransportLayerASIO::Options::Options(const ServerGlobalParams* params)
maxConns(params->maxConns) {
}
-std::shared_ptr<TransportLayerASIO::ASIOSession> TransportLayerASIO::createSession() {
- GenericSocket socket(*_ioContext);
- std::shared_ptr<ASIOSession> ret(new ASIOSession(this, std::move(socket)));
- return ret;
-}
-
TransportLayerASIO::TransportLayerASIO(const TransportLayerASIO::Options& opts,
ServiceEntryPoint* sep)
- : _ioContext(std::make_shared<asio::io_context>()),
+ : _workerIOContext(std::make_shared<asio::io_context>()),
+ _acceptorIOContext(stdx::make_unique<asio::io_context>()),
#ifdef MONGO_CONFIG_SSL
_sslContext(nullptr),
#endif
@@ -190,7 +185,7 @@ Status TransportLayerASIO::setup() {
fassertFailedNoTrace(40488);
}
- GenericAcceptor acceptor(*_ioContext);
+ GenericAcceptor acceptor(*_acceptorIOContext);
acceptor.open(endpoint.protocol());
acceptor.set_option(GenericAcceptor::reuse_address(true));
@@ -244,22 +239,18 @@ Status TransportLayerASIO::start() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
_running.store(true);
- // If we're in async mode then the ServiceExecutor will handle calling run_one() in a pool
- // of threads. Otherwise we need a thread to just handle the async_accept calls.
- if (!_listenerOptions.async) {
- _listenerThread = stdx::thread([this] {
- setThreadName("listener");
- while (_running.load()) {
- try {
- _ioContext->run();
- _ioContext->reset();
- } catch (...) {
- severe() << "Uncaught exception in the listener: " << exceptionToStatus();
- fassertFailed(40491);
- }
+ _listenerThread = stdx::thread([this] {
+ setThreadName("listener");
+ while (_running.load()) {
+ asio::io_context::work work(*_acceptorIOContext);
+ try {
+ _acceptorIOContext->run();
+ } catch (...) {
+ severe() << "Uncaught exception in the listener: " << exceptionToStatus();
+ fassertFailed(40491);
}
- });
- }
+ }
+ });
for (auto& acceptor : _acceptors) {
acceptor.listen(serverGlobalParams.listenBacklog);
@@ -294,26 +285,17 @@ void TransportLayerASIO::shutdown() {
// Otherwise the ServiceExecutor may need to continue running the io_context to drain running
// connections, so we just cancel the acceptors and return.
if (_listenerThread.joinable()) {
- // We should only have started a listener if the TransportLayer is in sync mode.
- dassert(!_listenerOptions.async);
- _ioContext->stop();
+ _acceptorIOContext->stop();
_listenerThread.join();
}
}
const std::shared_ptr<asio::io_context>& TransportLayerASIO::getIOContext() {
- return _ioContext;
+ return _workerIOContext;
}
void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
- auto session = createSession();
- if (!session) {
- _acceptConnection(acceptor);
- return;
- }
-
- auto& socket = session->getSocket();
- auto acceptCb = [ this, session = std::move(session), &acceptor ](std::error_code ec) mutable {
+ auto acceptCb = [this, &acceptor](const std::error_code& ec, GenericSocket peerSocket) mutable {
if (!_running.load())
return;
@@ -332,7 +314,7 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
return;
}
- session->postAcceptSetup(_listenerOptions.async);
+ std::shared_ptr<ASIOSession> session(new ASIOSession(this, std::move(peerSocket)));
_createdConnections.addAndFetch(1);
if (!serverGlobalParams.quiet.load()) {
@@ -345,7 +327,7 @@ void TransportLayerASIO::_acceptConnection(GenericAcceptor& acceptor) {
_acceptConnection(acceptor);
};
- acceptor.async_accept(socket, std::move(acceptCb));
+ acceptor.async_accept(*_workerIOContext, std::move(acceptCb));
}
} // namespace transport
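
The listener thread introduced above follows a common ASIO idiom: an io_context::work object keeps run() from returning while no accept handler is pending, and any exception escaping a handler is treated as fatal. A minimal sketch with assumed names (listenerLoop, running) is shown below; shutdown then consists of clearing the running flag, calling stop() on the acceptor context, and joining the thread, which is what TransportLayerASIO::shutdown() does in this diff.

#include <asio.hpp>
#include <atomic>
#include <cstdlib>
#include <iostream>

void listenerLoop(asio::io_context& acceptorContext, std::atomic<bool>& running) {
    while (running.load()) {
        // The work object keeps run() from returning during moments when no
        // accept handler is outstanding; run() returns only once stop() is called.
        asio::io_context::work work(acceptorContext);
        try {
            acceptorContext.run();
        } catch (...) {
            std::cerr << "Uncaught exception in the listener\n";
            std::abort();
        }
    }
}
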
diff --git a/src/mongo/transport/transport_layer_asio.h b/src/mongo/transport/transport_layer_asio.h
index 7b7469b365f..5df87057f0e 100644
--- a/src/mongo/transport/transport_layer_asio.h
+++ b/src/mongo/transport/transport_layer_asio.h
@@ -123,17 +123,35 @@ private:
using ConstASIOSessionHandle = std::shared_ptr<const ASIOSession>;
using GenericAcceptor = asio::basic_socket_acceptor<asio::generic::stream_protocol>;
- ASIOSessionHandle createSession();
void _acceptConnection(GenericAcceptor& acceptor);
stdx::mutex _mutex;
+ // There are two IO contexts that are used by TransportLayerASIO. The _workerIOContext
+ // contains all the accepted sockets and all normal networking activity. The
+ // _acceptorIOContext contains all the sockets in _acceptors.
+ //
+ // TransportLayerASIO should never call run() on the _workerIOContext.
+ // In synchronous mode, this will cause a massive performance degradation due to
+ // unnecessary wakeups on the asio thread for sockets we don't intend to interact
+ // with asynchronously. The additional IO context avoids registering those sockets
+ // with the acceptor's epoll set, thus avoiding those wakeups. Calling run() will
+ // undo that benefit.
+ //
+ // TransportLayerASIO should run its own thread that calls run() on the _acceptorIOContext
+ // to process calls to async_accept - this is the equivalent of the "listener" thread in
+ // other TransportLayers.
+ //
+ // The underlying problem that caused this is here:
+ // https://github.com/chriskohlhoff/asio/issues/240
+ //
// It is important that the io_context be declared before the
// vector of acceptors (or any other state that is associated with
// the io_context), so that we destroy any existing acceptors or
// other io_service associated state before we drop the refcount
// on the io_context, which may destroy it.
- std::shared_ptr<asio::io_context> _ioContext;
+ std::shared_ptr<asio::io_context> _workerIOContext;
+ std::unique_ptr<asio::io_context> _acceptorIOContext;
#ifdef MONGO_CONFIG_SSL
std::unique_ptr<asio::ssl::context> _sslContext;
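
The declaration-order comment above relies on the standard C++ rule that members are destroyed in reverse declaration order. A hypothetical skeleton makes the consequence explicit: because the io_context members come first, the acceptors (and any other io_context-associated state) are destroyed while the contexts they were created on still exist.

#include <asio.hpp>
#include <memory>
#include <vector>

class TwoContextTransport {
    // Declared first, destroyed last.
    std::shared_ptr<asio::io_context> _workerIOContext = std::make_shared<asio::io_context>();
    std::unique_ptr<asio::io_context> _acceptorIOContext = std::make_unique<asio::io_context>();

    // Declared after the contexts, so destroyed before them, while the
    // contexts they reference are still alive.
    std::vector<asio::ip::tcp::acceptor> _acceptors;
};
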