From c836472353e736424c9bb87868508c9e633b892d Mon Sep 17 00:00:00 2001 From: Jason Carey Date: Wed, 28 Oct 2015 14:41:07 -0400 Subject: SERVER-20143 Strand NetworkInterfaceASIO Add strands (and an option for multiple io workers) in NetworkInterfaceASIO. strands are an asio specific mechanism for ensuring thread safety. --- src/mongo/executor/async_mock_stream_factory.cpp | 26 ++-- src/mongo/executor/async_mock_stream_factory.h | 6 +- src/mongo/executor/async_secure_stream.cpp | 32 ++-- src/mongo/executor/async_secure_stream.h | 3 +- src/mongo/executor/async_secure_stream_factory.cpp | 6 +- src/mongo/executor/async_secure_stream_factory.h | 2 +- src/mongo/executor/async_stream.cpp | 11 +- src/mongo/executor/async_stream.h | 3 +- src/mongo/executor/async_stream_common.h | 22 ++- src/mongo/executor/async_stream_factory.cpp | 6 +- src/mongo/executor/async_stream_factory.h | 2 +- .../executor/async_stream_factory_interface.h | 2 +- src/mongo/executor/async_timer_asio.cpp | 10 +- src/mongo/executor/async_timer_asio.h | 5 +- src/mongo/executor/async_timer_interface.h | 2 +- src/mongo/executor/async_timer_mock.cpp | 2 +- src/mongo/executor/async_timer_mock.h | 2 +- src/mongo/executor/connection_pool_asio.cpp | 161 +++++++++++---------- src/mongo/executor/connection_pool_asio.h | 6 +- src/mongo/executor/network_interface_asio.cpp | 157 +++++++++++--------- src/mongo/executor/network_interface_asio.h | 36 ++++- .../executor/network_interface_asio_connect.cpp | 4 +- .../executor/network_interface_asio_operation.cpp | 43 +++--- 23 files changed, 303 insertions(+), 246 deletions(-) diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp index 341615bdbce..da3bd426508 100644 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ b/src/mongo/executor/async_mock_stream_factory.cpp @@ -67,25 +67,23 @@ StringData stateToString(AsyncMockStreamFactory::MockStream::StreamState state) } template -void checkCanceled(asio::io_service* io_service, +void checkCanceled(asio::io_service::strand* strand, AsyncMockStreamFactory::MockStream::StreamState* state, Handler&& handler, std::size_t bytes, std::error_code ec = std::error_code()) { auto wasCancelled = (*state == AsyncMockStreamFactory::MockStream::StreamState::kCanceled); *state = AsyncMockStreamFactory::MockStream::StreamState::kRunning; - asio::post(*io_service, - [handler, wasCancelled, bytes, ec] { - handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, - bytes); - }); + strand->post([handler, wasCancelled, bytes, ec] { + handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, bytes); + }); } } // namespace std::unique_ptr AsyncMockStreamFactory::makeStream( - asio::io_service* io_service, const HostAndPort& target) { - return stdx::make_unique(io_service, this, target); + asio::io_service::strand* strand, const HostAndPort& target) { + return stdx::make_unique(strand, this, target); } void AsyncMockStreamFactory::_createStream(const HostAndPort& host, MockStream* stream) { @@ -112,10 +110,10 @@ AsyncMockStreamFactory::MockStream* AsyncMockStreamFactory::blockUntilStreamExis return iter->second; } -AsyncMockStreamFactory::MockStream::MockStream(asio::io_service* io_service, +AsyncMockStreamFactory::MockStream::MockStream(asio::io_service::strand* strand, AsyncMockStreamFactory* factory, const HostAndPort& target) - : _io_service(io_service), _factory(factory), _target(target) { + : _strand(strand), _factory(factory), _target(target) { _factory->_createStream(_target, this); } @@ -133,7 +131,7 @@ void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterat // We shim a lambda to give connectHandler the right signature since it doesn't take // a size_t param. checkCanceled( - _io_service, + _strand, &_state, [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); }, 0); @@ -152,7 +150,7 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf, _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() { stdx::unique_lock lk(_mutex); - checkCanceled(_io_service, &_state, std::move(writeHandler), size); + checkCanceled(_strand, &_state, std::move(writeHandler), size); }); } @@ -207,7 +205,7 @@ void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf, }; } - checkCanceled(_io_service, &_state, std::move(handler), nToCopy, _error); + checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error); _error.clear(); }); } @@ -259,7 +257,7 @@ void AsyncMockStreamFactory::MockStream::_unblock_inlock() { } // Post our deferred action to resume state machine execution invariant(_deferredAction); - _io_service->post(std::move(_deferredAction)); + _strand->post(std::move(_deferredAction)); _deferredAction = nullptr; } diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h index d7e1c258892..e64ab97e5b8 100644 --- a/src/mongo/executor/async_mock_stream_factory.h +++ b/src/mongo/executor/async_mock_stream_factory.h @@ -54,12 +54,12 @@ class AsyncMockStreamFactory final : public AsyncStreamFactoryInterface { public: AsyncMockStreamFactory() = default; - std::unique_ptr makeStream(asio::io_service* io_service, + std::unique_ptr makeStream(asio::io_service::strand* strand, const HostAndPort& host) override; class MockStream final : public AsyncStreamInterface { public: - MockStream(asio::io_service* io_service, + MockStream(asio::io_service::strand* strand, AsyncMockStreamFactory* factory, const HostAndPort& target); @@ -103,7 +103,7 @@ public: void _defer_inlock(StreamState state, Action&& handler); void _unblock_inlock(); - asio::io_service* _io_service; + asio::io_service::strand* _strand; AsyncMockStreamFactory* _factory; HostAndPort _target; diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp index 964f805d07d..a3a1acc5fb7 100644 --- a/src/mongo/executor/async_secure_stream.cpp +++ b/src/mongo/executor/async_secure_stream.cpp @@ -43,8 +43,9 @@ namespace mongo { namespace executor { -AsyncSecureStream::AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext) - : _stream(*io_service, *sslContext) {} +AsyncSecureStream::AsyncSecureStream(asio::io_service::strand* strand, + asio::ssl::context* sslContext) + : _strand(strand), _stream(_strand->get_io_service(), *sslContext) {} AsyncSecureStream::~AsyncSecureStream() { destroyStream(&_stream.lowest_layer(), _connected); @@ -55,33 +56,34 @@ void AsyncSecureStream::connect(const asio::ip::tcp::resolver::iterator endpoint // Stash the connectHandler as we won't be able to call it until we re-enter the state // machine. _userHandler = std::move(connectHandler); - asio::async_connect(_stream.lowest_layer(), - std::move(endpoints), - [this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) { - if (ec) { - return _userHandler(ec); - } - _connected = true; - return _handleConnect(std::move(iter)); - }); + asio::async_connect( + _stream.lowest_layer(), + std::move(endpoints), + _strand->wrap([this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) { + if (ec) { + return _userHandler(ec); + } + _connected = true; + return _handleConnect(std::move(iter)); + })); } void AsyncSecureStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) { - writeStream(&_stream, _connected, buffer, std::move(streamHandler)); + writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncSecureStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) { - readStream(&_stream, _connected, buffer, std::move(streamHandler)); + readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncSecureStream::_handleConnect(asio::ip::tcp::resolver::iterator iter) { _stream.async_handshake(decltype(_stream)::client, - [this, iter](std::error_code ec) { + _strand->wrap([this, iter](std::error_code ec) { if (ec) { return _userHandler(ec); } return _handleHandshake(ec, iter->host_name()); - }); + })); } void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& hostName) { diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h index 699832f1ffd..cdbb54c8a31 100644 --- a/src/mongo/executor/async_secure_stream.h +++ b/src/mongo/executor/async_secure_stream.h @@ -42,7 +42,7 @@ namespace executor { class AsyncSecureStream final : public AsyncStreamInterface { public: - AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext); + AsyncSecureStream(asio::io_service::strand* strand, asio::ssl::context* sslContext); ~AsyncSecureStream(); @@ -60,6 +60,7 @@ private: void _handleHandshake(std::error_code ec, const std::string& hostName); + asio::io_service::strand* const _strand; asio::ssl::stream _stream; ConnectHandler _userHandler; bool _connected = false; diff --git a/src/mongo/executor/async_secure_stream_factory.cpp b/src/mongo/executor/async_secure_stream_factory.cpp index a933975a981..ce6bd73f2d0 100644 --- a/src/mongo/executor/async_secure_stream_factory.cpp +++ b/src/mongo/executor/async_secure_stream_factory.cpp @@ -52,12 +52,12 @@ AsyncSecureStreamFactory::AsyncSecureStreamFactory(SSLManagerInterface* sslManag } std::unique_ptr AsyncSecureStreamFactory::makeStream( - asio::io_service* io_service, const HostAndPort&) { + asio::io_service::strand* strand, const HostAndPort&) { int sslModeVal = getSSLGlobalParams().sslMode.load(); if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) { - return stdx::make_unique(io_service, &_sslContext); + return stdx::make_unique(strand, &_sslContext); } - return stdx::make_unique(io_service); + return stdx::make_unique(strand); } } // namespace executor diff --git a/src/mongo/executor/async_secure_stream_factory.h b/src/mongo/executor/async_secure_stream_factory.h index ecc03939cda..4d08af71eff 100644 --- a/src/mongo/executor/async_secure_stream_factory.h +++ b/src/mongo/executor/async_secure_stream_factory.h @@ -47,7 +47,7 @@ class AsyncSecureStreamFactory final : public AsyncStreamFactoryInterface { public: AsyncSecureStreamFactory(SSLManagerInterface* sslManager); - std::unique_ptr makeStream(asio::io_service* io_service, + std::unique_ptr makeStream(asio::io_service::strand* strand, const HostAndPort&) override; private: diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp index 1ac8a47470f..398faaf2b6e 100644 --- a/src/mongo/executor/async_stream.cpp +++ b/src/mongo/executor/async_stream.cpp @@ -40,7 +40,8 @@ namespace executor { using asio::ip::tcp; -AsyncStream::AsyncStream(asio::io_service* io_service) : _stream(*io_service) {} +AsyncStream::AsyncStream(asio::io_service::strand* strand) + : _strand(strand), _stream(_strand->get_io_service()) {} AsyncStream::~AsyncStream() { destroyStream(&_stream, _connected); @@ -52,22 +53,22 @@ void AsyncStream::connect(tcp::resolver::iterator iter, ConnectHandler&& connect std::move(iter), // We need to wrap this with a lambda of the right signature so it compiles, even // if we don't actually use the resolver iterator. - [this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) { + _strand->wrap([this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) { if (!ec) { // We assume that our owner is responsible for keeping us alive until we call // connectHandler, so _connected should always be a valid memory location. _connected = true; } return connectHandler(ec); - }); + })); } void AsyncStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) { - writeStream(&_stream, _connected, buffer, std::move(streamHandler)); + writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) { - readStream(&_stream, _connected, buffer, std::move(streamHandler)); + readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncStream::cancel() { diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h index c49014c860f..1e4e6982196 100644 --- a/src/mongo/executor/async_stream.h +++ b/src/mongo/executor/async_stream.h @@ -37,7 +37,7 @@ namespace executor { class AsyncStream final : public AsyncStreamInterface { public: - AsyncStream(asio::io_service* io_service); + AsyncStream(asio::io_service::strand* strand); ~AsyncStream(); @@ -50,6 +50,7 @@ public: void cancel() override; private: + asio::io_service::strand* const _strand; asio::ip::tcp::socket _stream; bool _connected = false; }; diff --git a/src/mongo/executor/async_stream_common.h b/src/mongo/executor/async_stream_common.h index 7b06881551c..ef154c0cc6f 100644 --- a/src/mongo/executor/async_stream_common.h +++ b/src/mongo/executor/async_stream_common.h @@ -51,17 +51,27 @@ void destroyStream(ASIOStream* stream, bool connected) { } template -void writeStream(ASIOStream* stream, bool connected, Buffer&& buffer, Handler&& handler) { +void writeStream(ASIOStream* stream, + asio::io_service::strand* strand, + bool connected, + Buffer&& buffer, + Handler&& handler) { invariant(connected); - asio::async_write( - *stream, asio::buffer(std::forward(buffer)), std::forward(handler)); + asio::async_write(*stream, + asio::buffer(std::forward(buffer)), + strand->wrap(std::forward(handler))); } template -void readStream(ASIOStream* stream, bool connected, Buffer&& buffer, Handler&& handler) { +void readStream(ASIOStream* stream, + asio::io_service::strand* strand, + bool connected, + Buffer&& buffer, + Handler&& handler) { invariant(connected); - asio::async_read( - *stream, asio::buffer(std::forward(buffer)), std::forward(handler)); + asio::async_read(*stream, + asio::buffer(std::forward(buffer)), + strand->wrap(std::forward(handler))); } template diff --git a/src/mongo/executor/async_stream_factory.cpp b/src/mongo/executor/async_stream_factory.cpp index 8e8fb93b6d7..5f24adf341b 100644 --- a/src/mongo/executor/async_stream_factory.cpp +++ b/src/mongo/executor/async_stream_factory.cpp @@ -38,9 +38,9 @@ namespace mongo { namespace executor { -std::unique_ptr AsyncStreamFactory::makeStream(asio::io_service* io_service, - const HostAndPort&) { - return stdx::make_unique(io_service); +std::unique_ptr AsyncStreamFactory::makeStream( + asio::io_service::strand* strand, const HostAndPort&) { + return stdx::make_unique(strand); } } // namespace executor diff --git a/src/mongo/executor/async_stream_factory.h b/src/mongo/executor/async_stream_factory.h index 198ea68b877..bbf9767427b 100644 --- a/src/mongo/executor/async_stream_factory.h +++ b/src/mongo/executor/async_stream_factory.h @@ -41,7 +41,7 @@ class AsyncStreamFactory final : public AsyncStreamFactoryInterface { public: AsyncStreamFactory() = default; - std::unique_ptr makeStream(asio::io_service* io_service, + std::unique_ptr makeStream(asio::io_service::strand* strand, const HostAndPort&) override; }; diff --git a/src/mongo/executor/async_stream_factory_interface.h b/src/mongo/executor/async_stream_factory_interface.h index c2f6261556a..c894a1fee5a 100644 --- a/src/mongo/executor/async_stream_factory_interface.h +++ b/src/mongo/executor/async_stream_factory_interface.h @@ -48,7 +48,7 @@ class AsyncStreamFactoryInterface { public: virtual ~AsyncStreamFactoryInterface() = default; - virtual std::unique_ptr makeStream(asio::io_service* io_service, + virtual std::unique_ptr makeStream(asio::io_service::strand* strand, const HostAndPort& target) = 0; protected: diff --git a/src/mongo/executor/async_timer_asio.cpp b/src/mongo/executor/async_timer_asio.cpp index bbd1373a0d3..293ab3e137a 100644 --- a/src/mongo/executor/async_timer_asio.cpp +++ b/src/mongo/executor/async_timer_asio.cpp @@ -33,20 +33,20 @@ namespace mongo { namespace executor { -AsyncTimerASIO::AsyncTimerASIO(asio::io_service* service, Milliseconds expiration) - : _timer(*service, expiration) {} +AsyncTimerASIO::AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration) + : _strand(strand), _timer(_strand->get_io_service(), expiration) {} void AsyncTimerASIO::cancel() { _timer.cancel(); } void AsyncTimerASIO::asyncWait(AsyncTimerInterface::Handler handler) { - _timer.async_wait(std::move(handler)); + _timer.async_wait(_strand->wrap(std::move(handler))); } -std::unique_ptr AsyncTimerFactoryASIO::make(asio::io_service* service, +std::unique_ptr AsyncTimerFactoryASIO::make(asio::io_service::strand* strand, Milliseconds expiration) { - return stdx::make_unique(service, expiration); + return stdx::make_unique(strand, expiration); } } // namespace executor diff --git a/src/mongo/executor/async_timer_asio.h b/src/mongo/executor/async_timer_asio.h index ae09b4b9247..eb3ae02e769 100644 --- a/src/mongo/executor/async_timer_asio.h +++ b/src/mongo/executor/async_timer_asio.h @@ -37,13 +37,14 @@ namespace executor { class AsyncTimerASIO final : public AsyncTimerInterface { public: - AsyncTimerASIO(asio::io_service* service, Milliseconds expiration); + AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration); void cancel() override; void asyncWait(AsyncTimerInterface::Handler handler) override; private: + asio::io_service::strand* const _strand; asio::steady_timer _timer; }; @@ -51,7 +52,7 @@ class AsyncTimerFactoryASIO final : public AsyncTimerFactoryInterface { public: AsyncTimerFactoryASIO() = default; - std::unique_ptr make(asio::io_service* service, + std::unique_ptr make(asio::io_service::strand* strand, Milliseconds expiration) override; }; diff --git a/src/mongo/executor/async_timer_interface.h b/src/mongo/executor/async_timer_interface.h index 6a63850a0ef..6f18d4ece7f 100644 --- a/src/mongo/executor/async_timer_interface.h +++ b/src/mongo/executor/async_timer_interface.h @@ -81,7 +81,7 @@ class AsyncTimerFactoryInterface { public: virtual ~AsyncTimerFactoryInterface() = default; - virtual std::unique_ptr make(asio::io_service* io_service, + virtual std::unique_ptr make(asio::io_service::strand* strand, Milliseconds expiration) = 0; protected: diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp index 7460e840993..9807947dcf4 100644 --- a/src/mongo/executor/async_timer_mock.cpp +++ b/src/mongo/executor/async_timer_mock.cpp @@ -94,7 +94,7 @@ std::unique_ptr AsyncTimerFactoryMock::make(Milliseconds ex return make(nullptr, expiration); } -std::unique_ptr AsyncTimerFactoryMock::make(asio::io_service* io_service, +std::unique_ptr AsyncTimerFactoryMock::make(asio::io_service::strand* strand, Milliseconds expiration) { stdx::lock_guard lk(_timersMutex); auto elem = _timers.emplace(std::make_shared(expiration)); diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h index a20bf84a6ff..6c0e4c755f2 100644 --- a/src/mongo/executor/async_timer_mock.h +++ b/src/mongo/executor/async_timer_mock.h @@ -119,7 +119,7 @@ public: /** * Create and return a new AsyncTimerMock object. */ - std::unique_ptr make(asio::io_service* io_service, + std::unique_ptr make(asio::io_service::strand* strand, Milliseconds expiration) override; /** diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 39d70cd6c4e..12e6d0bd062 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -42,9 +42,9 @@ namespace mongo { namespace executor { namespace connection_pool_asio { -ASIOTimer::ASIOTimer(asio::io_service* io_service) - : _io_service(io_service), - _impl(*io_service), +ASIOTimer::ASIOTimer(asio::io_service::strand* strand) + : _strand(strand), + _impl(strand->get_io_service()), _callbackSharedState(std::make_shared()) {} ASIOTimer::~ASIOTimer() { @@ -52,55 +52,60 @@ ASIOTimer::~ASIOTimer() { } void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _cb = std::move(cb); + _strand->dispatch([this, timeout, cb] { + _cb = std::move(cb); - cancelTimeout(); - _impl.expires_after(timeout); - - decltype(_callbackSharedState->id) id; - decltype(_callbackSharedState) sharedState; + cancelTimeout(); + _impl.expires_after(timeout); - { - stdx::lock_guard lk(_callbackSharedState->mutex); - id = ++_callbackSharedState->id; - sharedState = _callbackSharedState; - } + decltype(_callbackSharedState->id) id; + decltype(_callbackSharedState) sharedState; - _impl.async_wait([this, id, sharedState](const asio::error_code& error) { - if (error == asio::error::operation_aborted) { - return; + { + stdx::lock_guard lk(_callbackSharedState->mutex); + id = ++_callbackSharedState->id; + sharedState = _callbackSharedState; } - stdx::unique_lock lk(sharedState->mutex); - - // If the id in shared state doesn't match the id in our callback, it - // means we were cancelled, but still executed. This can occur if we - // were cancelled just before our timeout. We need a generation, rather - // than just a bool here, because we could have been cancelled and - // another callback set, in which case we shouldn't run and the we - // should let the other callback execute instead. - if (sharedState->id == id) { - auto cb = std::move(_cb); - lk.unlock(); - cb(); - } + _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) { + if (error == asio::error::operation_aborted) { + return; + } + + stdx::unique_lock lk(sharedState->mutex); + + // If the id in shared state doesn't match the id in our callback, it + // means we were cancelled, but still executed. This can occur if we + // were cancelled just before our timeout. We need a generation, rather + // than just a bool here, because we could have been cancelled and + // another callback set, in which case we shouldn't run and the we + // should let the other callback execute instead. + if (sharedState->id == id) { + auto cb = std::move(_cb); + lk.unlock(); + cb(); + } + })); }); } void ASIOTimer::cancelTimeout() { - { - stdx::lock_guard lk(_callbackSharedState->mutex); - _callbackSharedState->id++; - } - _impl.cancel(); + _strand->dispatch([this] { + { + stdx::lock_guard lk(_callbackSharedState->mutex); + _callbackSharedState->id++; + } + + _impl.cancel(); + }); } ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global) : _global(global), - _timer(&global->_impl->_io_service), _hostAndPort(hostAndPort), _generation(generation), - _impl(makeAsyncOp(this)) {} + _impl(makeAsyncOp(this)), + _timer(&_impl->strand()) {} void ASIOConnection::indicateSuccess() { indicateUsed(); @@ -166,9 +171,11 @@ void ASIOConnection::cancelTimeout() { } void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) { - _setupCallback = std::move(cb); + _impl->strand().dispatch([this, timeout, cb] { + _setupCallback = std::move(cb); - _global->_impl->_connect(_impl.get()); + _global->_impl->_connect(_impl.get()); + }); } void ASIOConnection::resetToUnknown() { @@ -176,48 +183,48 @@ void ASIOConnection::resetToUnknown() { } void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { - auto op = _impl.get(); - - _refreshCallback = std::move(cb); - - // Actually timeout refreshes - setTimeout(timeout, - [this]() { - asio::post(_global->_impl->_io_service, - [this] { _impl->connection().stream().cancel(); }); - }); - - // Our pings are isMaster's - auto beginStatus = op->beginCommand(makeIsMasterRequest(this), - NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC, - _hostAndPort); - if (!beginStatus.isOK()) { - auto cb = std::move(_refreshCallback); - cb(this, beginStatus); - return; - } - - // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such we - // need to intercept those calls so we can capture them. This will get cleared out when we fill - // in the real onFinish in startCommand. - op->setOnFinish([this](StatusWith failedResponse) { - invariant(!failedResponse.isOK()); - auto cb = std::move(_refreshCallback); - cb(this, failedResponse.getStatus()); - }); + _impl->strand().dispatch([this, timeout, cb] { + auto op = _impl.get(); - _global->_impl->_asyncRunCommand( - op, - [this, op](std::error_code ec, size_t bytes) { - cancelTimeout(); + _refreshCallback = std::move(cb); - auto cb = std::move(_refreshCallback); + // Actually timeout refreshes + setTimeout(timeout, [this]() { _impl->connection().stream().cancel(); }); - if (ec) - return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + // Our pings are isMaster's + auto beginStatus = op->beginCommand(makeIsMasterRequest(this), + NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC, + _hostAndPort); + if (!beginStatus.isOK()) { + auto cb = std::move(_refreshCallback); + cb(this, beginStatus); + return; + } - cb(this, Status::OK()); + // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such + // we + // need to intercept those calls so we can capture them. This will get cleared out when we + // fill + // in the real onFinish in startCommand. + op->setOnFinish([this](StatusWith failedResponse) { + invariant(!failedResponse.isOK()); + auto cb = std::move(_refreshCallback); + cb(this, failedResponse.getStatus()); }); + + _global->_impl->_asyncRunCommand( + op, + [this, op](std::error_code ec, size_t bytes) { + cancelTimeout(); + + auto cb = std::move(_refreshCallback); + + if (ec) + return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + + cb(this, Status::OK()); + }); + }); } std::unique_ptr ASIOConnection::releaseAsyncOp() { @@ -235,7 +242,7 @@ Date_t ASIOImpl::now() { } std::unique_ptr ASIOImpl::makeTimer() { - return stdx::make_unique(&_impl->_io_service); + return stdx::make_unique(&_impl->_strand); } std::unique_ptr ASIOImpl::makeConnection( diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h index 6700819c63d..44235d23e16 100644 --- a/src/mongo/executor/connection_pool_asio.h +++ b/src/mongo/executor/connection_pool_asio.h @@ -44,7 +44,7 @@ namespace connection_pool_asio { */ class ASIOTimer final : public ConnectionPool::TimerInterface { public: - ASIOTimer(asio::io_service* service); + ASIOTimer(asio::io_service::strand* strand); ~ASIOTimer(); void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; @@ -57,7 +57,7 @@ private: }; TimeoutCallback _cb; - asio::io_service* const _io_service; + asio::io_service::strand* const _strand; asio::steady_timer _impl; std::shared_ptr _callbackSharedState; }; @@ -99,12 +99,12 @@ private: SetupCallback _setupCallback; RefreshCallback _refreshCallback; ASIOImpl* const _global; - ASIOTimer _timer; Date_t _lastUsed; Status _status = ConnectionPool::kConnectionStateUnknown; HostAndPort _hostAndPort; size_t _generation; std::unique_ptr _impl; + ASIOTimer _timer; }; /** diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 5f554b4fa94..489246d217b 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -52,6 +52,10 @@ namespace mongo { namespace executor { +namespace { +const std::size_t kIOServiceWorkers = 1; +} // namespace + #if defined(_MSC_VER) && _MSC_VER < 1900 NetworkInterfaceASIO::Options::Options(Options&& other) : connectionPoolOptions(std::move(other.connectionPoolOptions)), @@ -75,13 +79,13 @@ NetworkInterfaceASIO::NetworkInterfaceASIO(Options options) _io_service(), _metadataHook(std::move(_options.metadataHook)), _hook(std::move(_options.networkConnectionHook)), - _resolver(_io_service), _state(State::kReady), _timerFactory(std::move(_options.timerFactory)), _streamFactory(std::move(_options.streamFactory)), _connectionPool(stdx::make_unique(this), _options.connectionPoolOptions), - _isExecutorRunnable(false) {} + _isExecutorRunnable(false), + _strand(_io_service) {} std::string NetworkInterfaceASIO::getDiagnosticString() { str::stream output; @@ -99,25 +103,31 @@ std::string NetworkInterfaceASIO::getHostName() { } void NetworkInterfaceASIO::startup() { - _serviceRunner = stdx::thread([this]() { - setThreadName("NetworkInterfaceASIO"); - try { - LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; - asio::io_service::work work(_io_service); - _io_service.run(); - } catch (...) { - severe() << "Uncaught exception in NetworkInterfaceASIO IO worker thread of type: " - << exceptionToStatus(); - fassertFailed(28820); - } - }); + std::generate_n(std::back_inserter(_serviceRunners), + kIOServiceWorkers, + [&] { + return stdx::thread([this]() { + setThreadName("NetworkInterfaceASIO"); + try { + LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; + asio::io_service::work work(_io_service); + _io_service.run(); + } catch (...) { + severe() << "Uncaught exception in NetworkInterfaceASIO IO " + "worker thread of type: " << exceptionToStatus(); + fassertFailed(28820); + } + }); + }); _state.store(State::kRunning); } void NetworkInterfaceASIO::shutdown() { _state.store(State::kShutdown); _io_service.stop(); - _serviceRunner.join(); + for (auto&& worker : _serviceRunners) { + worker.join(); + } LOG(2) << "NetworkInterfaceASIO shutdown successfully"; } @@ -197,35 +207,35 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa AsyncOp* op = nullptr; - { - stdx::unique_lock lk(_inProgressMutex); + stdx::unique_lock lk(_inProgressMutex); - const auto eraseCount = _inGetConnection.erase(cbHandle); + const auto eraseCount = _inGetConnection.erase(cbHandle); - // If we didn't find the request, we've been canceled - if (eraseCount == 0) { - lk.unlock(); + // If we didn't find the request, we've been canceled + if (eraseCount == 0) { + lk.unlock(); - onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); + onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); - // Though we were canceled, we know that the stream is fine, so indicate success. - conn->indicateSuccess(); + // Though we were canceled, we know that the stream is fine, so indicate success. + conn->indicateSuccess(); - signalWorkAvailable(); + signalWorkAvailable(); - return; - } + return; + } - // We can't release the AsyncOp until we know we were not canceled. - auto ownedOp = conn->releaseAsyncOp(); - op = ownedOp.get(); + // We can't release the AsyncOp until we know we were not canceled. + auto ownedOp = conn->releaseAsyncOp(); + op = ownedOp.get(); - // Sanity check that we are getting a clean AsyncOp. - invariant(!op->canceled()); - invariant(!op->timedOut()); + // Sanity check that we are getting a clean AsyncOp. + invariant(!op->canceled()); + invariant(!op->timedOut()); - _inProgress.emplace(op, std::move(ownedOp)); - } + // Now that we're inProgress, an external cancel can touch our op, but + // not until we release the inProgressMutex. + _inProgress.emplace(op, std::move(ownedOp)); op->_cbHandle = std::move(cbHandle); op->_request = std::move(request); @@ -233,50 +243,53 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa op->_connectionPoolHandle = std::move(swConn.getValue()); op->_start = startTime; - // Set timeout now that we have the correct request object - if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) { - op->_timeoutAlarm = op->_owner->_timerFactory->make(&_io_service, op->_request.timeout); - - std::shared_ptr access; - std::size_t generation; - { - stdx::lock_guard lk(op->_access->mutex); - access = op->_access; - generation = access->id; - } - - op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) { - if (!ec) { - // We must pass a check for safe access before using op inside the - // callback or we may attempt access on an invalid pointer. - stdx::lock_guard lk(access->mutex); - if (generation != access->id) { - // The operation has been cleaned up, do not access. - return; - } - - LOG(2) << "Operation timed out: " << op->request().toString(); + // This ditches the lock and gets us onto the strand (so we're + // threadsafe) + op->_strand.post([this, op] { + // Set timeout now that we have the correct request object + if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) { + op->_timeoutAlarm = + op->_owner->_timerFactory->make(&op->_strand, op->_request.timeout); + + std::shared_ptr access; + std::size_t generation; + { + stdx::lock_guard lk(op->_access->mutex); + access = op->_access; + generation = access->id; + } - // An operation may be in mid-flight when it times out, so we - // cancel any in-progress async calls but do not complete the operation now. - if (op->_connection) { - op->_connection->cancel(); + op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) { + if (!ec) { + // We must pass a check for safe access before using op inside the + // callback or we may attempt access on an invalid pointer. + stdx::lock_guard lk(access->mutex); + if (generation != access->id) { + // The operation has been cleaned up, do not access. + return; + } + + LOG(2) << "Operation timed out: " << op->request().toString(); + + // An operation may be in mid-flight when it times out, so we + // cancel any in-progress async calls but do not complete the operation now. + op->_timedOut = 1; + if (op->_connection) { + op->_connection->cancel(); + } + } else { + LOG(4) << "failed to time operation out: " << ec.message(); } - op->_timedOut.store(1); - } else { - LOG(4) << "failed to time operation out: " << ec.message(); - } - }); - } + }); + } - _beginCommunication(op); + _beginCommunication(op); + }); }; // TODO: thread some higher level timeout through, rather than 5 minutes, // once we make timeouts pervasive in this api. - asio::post( - _io_service, - [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); }); + _connectionPool.get(request.target, Minutes(5), nextStep); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 74309aaf465..d7c73bc715e 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -278,6 +278,14 @@ private: void setOnFinish(RemoteCommandCompletionFn&& onFinish); + asio::io_service::strand& strand() { + return _strand; + } + + asio::ip::tcp::resolver& resolver() { + return _resolver; + } + private: NetworkInterfaceASIO* const _owner; // Information describing a task enqueued on the NetworkInterface @@ -305,8 +313,10 @@ private: Date_t _start; std::unique_ptr _timeoutAlarm; - AtomicUInt64 _canceled; - AtomicUInt64 _timedOut; + asio::ip::tcp::resolver _resolver; + + bool _canceled = false; + bool _timedOut = false; /** * We maintain a shared_ptr to an access control object. This ensures that tangent @@ -322,6 +332,15 @@ private: */ boost::optional _command; bool _inSetup; + + /** + * The explicit strand that all operations for this op must run on. + * This must be the last member of AsyncOp because any pending + * operation for the strand are run when it's dtor is called. Any + * members that fall after it will have already been destroyed, which + * will make those fields illegal to touch from callbacks. + */ + asio::io_service::strand _strand; }; void _startCommand(AsyncOp* op); @@ -371,14 +390,12 @@ private: Options _options; asio::io_service _io_service; - stdx::thread _serviceRunner; + std::vector _serviceRunners; const std::unique_ptr _metadataHook; const std::unique_ptr _hook; - asio::ip::tcp::resolver _resolver; - std::atomic _state; // NOLINT std::unique_ptr _timerFactory; @@ -396,6 +413,15 @@ private: stdx::mutex _executorMutex; bool _isExecutorRunnable; stdx::condition_variable _isExecutorRunnableCondition; + + /** + * The explicit strand that all non-op operations run on. This must be the + * last member of NetworkInterfaceASIO because any pending operation for + * the strand are run when it's dtor is called. Any members that fall after + * it will have already been destroyed, which will make those fields + * illegal to touch from callbacks. + */ + asio::io_service::strand _strand; }; template diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp index 696ee271a63..72d4cba8ac1 100644 --- a/src/mongo/executor/network_interface_asio_connect.cpp +++ b/src/mongo/executor/network_interface_asio_connect.cpp @@ -102,13 +102,13 @@ void NetworkInterfaceASIO::_connect(AsyncOp* op) { _validateAndRun( op, ec, [this, op, endpoints]() { _setupSocket(op, std::move(endpoints)); }); }; - _resolver.async_resolve(query, std::move(thenConnect)); + op->resolver().async_resolve(query, op->_strand.wrap(std::move(thenConnect))); } void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator endpoints) { // TODO: Consider moving this call to post-auth so we only assign completed connections. { - auto stream = _streamFactory->makeStream(&_io_service, op->request().target); + auto stream = _streamFactory->makeStream(&op->strand(), op->request().target); op->setConnection({std::move(stream), rpc::supports::kOpQueryOnly}); } diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index dbce0eb9c73..21929559e5c 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -99,42 +99,39 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner, _request(request), _onFinish(onFinish), _start(now), + _resolver(owner->_io_service), _canceled(0), _timedOut(0), _access(std::make_shared()), - _inSetup(true) {} + _inSetup(true), + _strand(owner->_io_service) {} void NetworkInterfaceASIO::AsyncOp::cancel() { LOG(2) << "Canceling operation; original request was: " << request().toString(); - std::shared_ptr access; - std::size_t generation; - { - stdx::lock_guard lk(_access->mutex); - access = _access; - generation = access->id; - } + stdx::lock_guard lk(_access->mutex); + auto access = _access; + auto generation = access->id; // An operation may be in mid-flight when it is canceled, so we cancel any // in-progress async ops but do not complete the operation now. - asio::post(_owner->_io_service, - [this, access, generation] { - // Ensure 'this' pointer is still valid. - stdx::lock_guard lk(access->mutex); - if (generation == access->id) { - _canceled.store(1); - if (_connection) { - _connection->cancel(); - } - } - }); + + _strand.post([this, access, generation] { + stdx::lock_guard lk(access->mutex); + if (generation == access->id) { + _canceled = true; + if (_connection) { + _connection->cancel(); + } + } + }); } bool NetworkInterfaceASIO::AsyncOp::canceled() const { - return (_canceled.load() == 1); + return _canceled; } bool NetworkInterfaceASIO::AsyncOp::timedOut() const { - return (_timedOut.load() == 1); + return _timedOut; } const TaskExecutor::CallbackHandle& NetworkInterfaceASIO::AsyncOp::cbHandle() const { @@ -237,8 +234,8 @@ void NetworkInterfaceASIO::AsyncOp::reset() { // Ditto for _operationProtocol. _start = {}; _timeoutAlarm.reset(); - _canceled.store(0u); - _timedOut.store(0u); + _canceled = false; + _timedOut = false; _command = boost::none; // _inSetup should always be false at this point. } -- cgit v1.2.1