diff options
author | Jason Carey <jcarey@argv.me> | 2015-10-28 14:41:07 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2015-11-09 18:03:18 -0500 |
commit | c836472353e736424c9bb87868508c9e633b892d (patch) | |
tree | 71b6bd4303e722ea244ac5e3b538ad65ab417f5c | |
parent | e8187cc8f07ac5fccd384430f33457d8a57f0381 (diff) | |
download | mongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz |
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple io workers) in
NetworkInterfaceASIO.
strands are an asio specific mechanism for ensuring thread safety.
23 files changed, 303 insertions, 246 deletions
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp index 341615bdbce..da3bd426508 100644 --- a/src/mongo/executor/async_mock_stream_factory.cpp +++ b/src/mongo/executor/async_mock_stream_factory.cpp @@ -67,25 +67,23 @@ StringData stateToString(AsyncMockStreamFactory::MockStream::StreamState state) } template <typename Handler> -void checkCanceled(asio::io_service* io_service, +void checkCanceled(asio::io_service::strand* strand, AsyncMockStreamFactory::MockStream::StreamState* state, Handler&& handler, std::size_t bytes, std::error_code ec = std::error_code()) { auto wasCancelled = (*state == AsyncMockStreamFactory::MockStream::StreamState::kCanceled); *state = AsyncMockStreamFactory::MockStream::StreamState::kRunning; - asio::post(*io_service, - [handler, wasCancelled, bytes, ec] { - handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, - bytes); - }); + strand->post([handler, wasCancelled, bytes, ec] { + handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, bytes); + }); } } // namespace std::unique_ptr<AsyncStreamInterface> AsyncMockStreamFactory::makeStream( - asio::io_service* io_service, const HostAndPort& target) { - return stdx::make_unique<MockStream>(io_service, this, target); + asio::io_service::strand* strand, const HostAndPort& target) { + return stdx::make_unique<MockStream>(strand, this, target); } void AsyncMockStreamFactory::_createStream(const HostAndPort& host, MockStream* stream) { @@ -112,10 +110,10 @@ AsyncMockStreamFactory::MockStream* AsyncMockStreamFactory::blockUntilStreamExis return iter->second; } -AsyncMockStreamFactory::MockStream::MockStream(asio::io_service* io_service, +AsyncMockStreamFactory::MockStream::MockStream(asio::io_service::strand* strand, AsyncMockStreamFactory* factory, const HostAndPort& target) - : _io_service(io_service), _factory(factory), _target(target) { + : _strand(strand), _factory(factory), _target(target) { _factory->_createStream(_target, this); } @@ -133,7 +131,7 @@ void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterat // We shim a lambda to give connectHandler the right signature since it doesn't take // a size_t param. checkCanceled( - _io_service, + _strand, &_state, [connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); }, 0); @@ -152,7 +150,7 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf, _defer_inlock(kBlockedAfterWrite, [this, writeHandler, size]() { stdx::unique_lock<stdx::mutex> lk(_mutex); - checkCanceled(_io_service, &_state, std::move(writeHandler), size); + checkCanceled(_strand, &_state, std::move(writeHandler), size); }); } @@ -207,7 +205,7 @@ void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf, }; } - checkCanceled(_io_service, &_state, std::move(handler), nToCopy, _error); + checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error); _error.clear(); }); } @@ -259,7 +257,7 @@ void AsyncMockStreamFactory::MockStream::_unblock_inlock() { } // Post our deferred action to resume state machine execution invariant(_deferredAction); - _io_service->post(std::move(_deferredAction)); + _strand->post(std::move(_deferredAction)); _deferredAction = nullptr; } diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h index d7e1c258892..e64ab97e5b8 100644 --- a/src/mongo/executor/async_mock_stream_factory.h +++ b/src/mongo/executor/async_mock_stream_factory.h @@ -54,12 +54,12 @@ class AsyncMockStreamFactory final : public AsyncStreamFactoryInterface { public: AsyncMockStreamFactory() = default; - std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service, + std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, const HostAndPort& host) override; class MockStream final : public AsyncStreamInterface { public: - MockStream(asio::io_service* io_service, + MockStream(asio::io_service::strand* strand, AsyncMockStreamFactory* factory, const HostAndPort& target); @@ -103,7 +103,7 @@ public: void _defer_inlock(StreamState state, Action&& handler); void _unblock_inlock(); - asio::io_service* _io_service; + asio::io_service::strand* _strand; AsyncMockStreamFactory* _factory; HostAndPort _target; diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp index 964f805d07d..a3a1acc5fb7 100644 --- a/src/mongo/executor/async_secure_stream.cpp +++ b/src/mongo/executor/async_secure_stream.cpp @@ -43,8 +43,9 @@ namespace mongo { namespace executor { -AsyncSecureStream::AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext) - : _stream(*io_service, *sslContext) {} +AsyncSecureStream::AsyncSecureStream(asio::io_service::strand* strand, + asio::ssl::context* sslContext) + : _strand(strand), _stream(_strand->get_io_service(), *sslContext) {} AsyncSecureStream::~AsyncSecureStream() { destroyStream(&_stream.lowest_layer(), _connected); @@ -55,33 +56,34 @@ void AsyncSecureStream::connect(const asio::ip::tcp::resolver::iterator endpoint // Stash the connectHandler as we won't be able to call it until we re-enter the state // machine. _userHandler = std::move(connectHandler); - asio::async_connect(_stream.lowest_layer(), - std::move(endpoints), - [this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) { - if (ec) { - return _userHandler(ec); - } - _connected = true; - return _handleConnect(std::move(iter)); - }); + asio::async_connect( + _stream.lowest_layer(), + std::move(endpoints), + _strand->wrap([this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) { + if (ec) { + return _userHandler(ec); + } + _connected = true; + return _handleConnect(std::move(iter)); + })); } void AsyncSecureStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) { - writeStream(&_stream, _connected, buffer, std::move(streamHandler)); + writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncSecureStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) { - readStream(&_stream, _connected, buffer, std::move(streamHandler)); + readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncSecureStream::_handleConnect(asio::ip::tcp::resolver::iterator iter) { _stream.async_handshake(decltype(_stream)::client, - [this, iter](std::error_code ec) { + _strand->wrap([this, iter](std::error_code ec) { if (ec) { return _userHandler(ec); } return _handleHandshake(ec, iter->host_name()); - }); + })); } void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& hostName) { diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h index 699832f1ffd..cdbb54c8a31 100644 --- a/src/mongo/executor/async_secure_stream.h +++ b/src/mongo/executor/async_secure_stream.h @@ -42,7 +42,7 @@ namespace executor { class AsyncSecureStream final : public AsyncStreamInterface { public: - AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext); + AsyncSecureStream(asio::io_service::strand* strand, asio::ssl::context* sslContext); ~AsyncSecureStream(); @@ -60,6 +60,7 @@ private: void _handleHandshake(std::error_code ec, const std::string& hostName); + asio::io_service::strand* const _strand; asio::ssl::stream<asio::ip::tcp::socket> _stream; ConnectHandler _userHandler; bool _connected = false; diff --git a/src/mongo/executor/async_secure_stream_factory.cpp b/src/mongo/executor/async_secure_stream_factory.cpp index a933975a981..ce6bd73f2d0 100644 --- a/src/mongo/executor/async_secure_stream_factory.cpp +++ b/src/mongo/executor/async_secure_stream_factory.cpp @@ -52,12 +52,12 @@ AsyncSecureStreamFactory::AsyncSecureStreamFactory(SSLManagerInterface* sslManag } std::unique_ptr<AsyncStreamInterface> AsyncSecureStreamFactory::makeStream( - asio::io_service* io_service, const HostAndPort&) { + asio::io_service::strand* strand, const HostAndPort&) { int sslModeVal = getSSLGlobalParams().sslMode.load(); if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) { - return stdx::make_unique<AsyncSecureStream>(io_service, &_sslContext); + return stdx::make_unique<AsyncSecureStream>(strand, &_sslContext); } - return stdx::make_unique<AsyncStream>(io_service); + return stdx::make_unique<AsyncStream>(strand); } } // namespace executor diff --git a/src/mongo/executor/async_secure_stream_factory.h b/src/mongo/executor/async_secure_stream_factory.h index ecc03939cda..4d08af71eff 100644 --- a/src/mongo/executor/async_secure_stream_factory.h +++ b/src/mongo/executor/async_secure_stream_factory.h @@ -47,7 +47,7 @@ class AsyncSecureStreamFactory final : public AsyncStreamFactoryInterface { public: AsyncSecureStreamFactory(SSLManagerInterface* sslManager); - std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service, + std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, const HostAndPort&) override; private: diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp index 1ac8a47470f..398faaf2b6e 100644 --- a/src/mongo/executor/async_stream.cpp +++ b/src/mongo/executor/async_stream.cpp @@ -40,7 +40,8 @@ namespace executor { using asio::ip::tcp; -AsyncStream::AsyncStream(asio::io_service* io_service) : _stream(*io_service) {} +AsyncStream::AsyncStream(asio::io_service::strand* strand) + : _strand(strand), _stream(_strand->get_io_service()) {} AsyncStream::~AsyncStream() { destroyStream(&_stream, _connected); @@ -52,22 +53,22 @@ void AsyncStream::connect(tcp::resolver::iterator iter, ConnectHandler&& connect std::move(iter), // We need to wrap this with a lambda of the right signature so it compiles, even // if we don't actually use the resolver iterator. - [this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) { + _strand->wrap([this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) { if (!ec) { // We assume that our owner is responsible for keeping us alive until we call // connectHandler, so _connected should always be a valid memory location. _connected = true; } return connectHandler(ec); - }); + })); } void AsyncStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) { - writeStream(&_stream, _connected, buffer, std::move(streamHandler)); + writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) { - readStream(&_stream, _connected, buffer, std::move(streamHandler)); + readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler)); } void AsyncStream::cancel() { diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h index c49014c860f..1e4e6982196 100644 --- a/src/mongo/executor/async_stream.h +++ b/src/mongo/executor/async_stream.h @@ -37,7 +37,7 @@ namespace executor { class AsyncStream final : public AsyncStreamInterface { public: - AsyncStream(asio::io_service* io_service); + AsyncStream(asio::io_service::strand* strand); ~AsyncStream(); @@ -50,6 +50,7 @@ public: void cancel() override; private: + asio::io_service::strand* const _strand; asio::ip::tcp::socket _stream; bool _connected = false; }; diff --git a/src/mongo/executor/async_stream_common.h b/src/mongo/executor/async_stream_common.h index 7b06881551c..ef154c0cc6f 100644 --- a/src/mongo/executor/async_stream_common.h +++ b/src/mongo/executor/async_stream_common.h @@ -51,17 +51,27 @@ void destroyStream(ASIOStream* stream, bool connected) { } template <typename ASIOStream, typename Buffer, typename Handler> -void writeStream(ASIOStream* stream, bool connected, Buffer&& buffer, Handler&& handler) { +void writeStream(ASIOStream* stream, + asio::io_service::strand* strand, + bool connected, + Buffer&& buffer, + Handler&& handler) { invariant(connected); - asio::async_write( - *stream, asio::buffer(std::forward<Buffer>(buffer)), std::forward<Handler>(handler)); + asio::async_write(*stream, + asio::buffer(std::forward<Buffer>(buffer)), + strand->wrap(std::forward<Handler>(handler))); } template <typename ASIOStream, typename Buffer, typename Handler> -void readStream(ASIOStream* stream, bool connected, Buffer&& buffer, Handler&& handler) { +void readStream(ASIOStream* stream, + asio::io_service::strand* strand, + bool connected, + Buffer&& buffer, + Handler&& handler) { invariant(connected); - asio::async_read( - *stream, asio::buffer(std::forward<Buffer>(buffer)), std::forward<Handler>(handler)); + asio::async_read(*stream, + asio::buffer(std::forward<Buffer>(buffer)), + strand->wrap(std::forward<Handler>(handler))); } template <typename ASIOStream> diff --git a/src/mongo/executor/async_stream_factory.cpp b/src/mongo/executor/async_stream_factory.cpp index 8e8fb93b6d7..5f24adf341b 100644 --- a/src/mongo/executor/async_stream_factory.cpp +++ b/src/mongo/executor/async_stream_factory.cpp @@ -38,9 +38,9 @@ namespace mongo { namespace executor { -std::unique_ptr<AsyncStreamInterface> AsyncStreamFactory::makeStream(asio::io_service* io_service, - const HostAndPort&) { - return stdx::make_unique<AsyncStream>(io_service); +std::unique_ptr<AsyncStreamInterface> AsyncStreamFactory::makeStream( + asio::io_service::strand* strand, const HostAndPort&) { + return stdx::make_unique<AsyncStream>(strand); } } // namespace executor diff --git a/src/mongo/executor/async_stream_factory.h b/src/mongo/executor/async_stream_factory.h index 198ea68b877..bbf9767427b 100644 --- a/src/mongo/executor/async_stream_factory.h +++ b/src/mongo/executor/async_stream_factory.h @@ -41,7 +41,7 @@ class AsyncStreamFactory final : public AsyncStreamFactoryInterface { public: AsyncStreamFactory() = default; - std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service, + std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, const HostAndPort&) override; }; diff --git a/src/mongo/executor/async_stream_factory_interface.h b/src/mongo/executor/async_stream_factory_interface.h index c2f6261556a..c894a1fee5a 100644 --- a/src/mongo/executor/async_stream_factory_interface.h +++ b/src/mongo/executor/async_stream_factory_interface.h @@ -48,7 +48,7 @@ class AsyncStreamFactoryInterface { public: virtual ~AsyncStreamFactoryInterface() = default; - virtual std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service, + virtual std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand, const HostAndPort& target) = 0; protected: diff --git a/src/mongo/executor/async_timer_asio.cpp b/src/mongo/executor/async_timer_asio.cpp index bbd1373a0d3..293ab3e137a 100644 --- a/src/mongo/executor/async_timer_asio.cpp +++ b/src/mongo/executor/async_timer_asio.cpp @@ -33,20 +33,20 @@ namespace mongo { namespace executor { -AsyncTimerASIO::AsyncTimerASIO(asio::io_service* service, Milliseconds expiration) - : _timer(*service, expiration) {} +AsyncTimerASIO::AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration) + : _strand(strand), _timer(_strand->get_io_service(), expiration) {} void AsyncTimerASIO::cancel() { _timer.cancel(); } void AsyncTimerASIO::asyncWait(AsyncTimerInterface::Handler handler) { - _timer.async_wait(std::move(handler)); + _timer.async_wait(_strand->wrap(std::move(handler))); } -std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryASIO::make(asio::io_service* service, +std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryASIO::make(asio::io_service::strand* strand, Milliseconds expiration) { - return stdx::make_unique<AsyncTimerASIO>(service, expiration); + return stdx::make_unique<AsyncTimerASIO>(strand, expiration); } } // namespace executor diff --git a/src/mongo/executor/async_timer_asio.h b/src/mongo/executor/async_timer_asio.h index ae09b4b9247..eb3ae02e769 100644 --- a/src/mongo/executor/async_timer_asio.h +++ b/src/mongo/executor/async_timer_asio.h @@ -37,13 +37,14 @@ namespace executor { class AsyncTimerASIO final : public AsyncTimerInterface { public: - AsyncTimerASIO(asio::io_service* service, Milliseconds expiration); + AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration); void cancel() override; void asyncWait(AsyncTimerInterface::Handler handler) override; private: + asio::io_service::strand* const _strand; asio::steady_timer _timer; }; @@ -51,7 +52,7 @@ class AsyncTimerFactoryASIO final : public AsyncTimerFactoryInterface { public: AsyncTimerFactoryASIO() = default; - std::unique_ptr<AsyncTimerInterface> make(asio::io_service* service, + std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand, Milliseconds expiration) override; }; diff --git a/src/mongo/executor/async_timer_interface.h b/src/mongo/executor/async_timer_interface.h index 6a63850a0ef..6f18d4ece7f 100644 --- a/src/mongo/executor/async_timer_interface.h +++ b/src/mongo/executor/async_timer_interface.h @@ -81,7 +81,7 @@ class AsyncTimerFactoryInterface { public: virtual ~AsyncTimerFactoryInterface() = default; - virtual std::unique_ptr<AsyncTimerInterface> make(asio::io_service* io_service, + virtual std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand, Milliseconds expiration) = 0; protected: diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp index 7460e840993..9807947dcf4 100644 --- a/src/mongo/executor/async_timer_mock.cpp +++ b/src/mongo/executor/async_timer_mock.cpp @@ -94,7 +94,7 @@ std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(Milliseconds ex return make(nullptr, expiration); } -std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service* io_service, +std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service::strand* strand, Milliseconds expiration) { stdx::lock_guard<stdx::mutex> lk(_timersMutex); auto elem = _timers.emplace(std::make_shared<AsyncTimerMockImpl>(expiration)); diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h index a20bf84a6ff..6c0e4c755f2 100644 --- a/src/mongo/executor/async_timer_mock.h +++ b/src/mongo/executor/async_timer_mock.h @@ -119,7 +119,7 @@ public: /** * Create and return a new AsyncTimerMock object. */ - std::unique_ptr<AsyncTimerInterface> make(asio::io_service* io_service, + std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand, Milliseconds expiration) override; /** diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 39d70cd6c4e..12e6d0bd062 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -42,9 +42,9 @@ namespace mongo { namespace executor { namespace connection_pool_asio { -ASIOTimer::ASIOTimer(asio::io_service* io_service) - : _io_service(io_service), - _impl(*io_service), +ASIOTimer::ASIOTimer(asio::io_service::strand* strand) + : _strand(strand), + _impl(strand->get_io_service()), _callbackSharedState(std::make_shared<CallbackSharedState>()) {} ASIOTimer::~ASIOTimer() { @@ -52,55 +52,60 @@ ASIOTimer::~ASIOTimer() { } void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _cb = std::move(cb); + _strand->dispatch([this, timeout, cb] { + _cb = std::move(cb); - cancelTimeout(); - _impl.expires_after(timeout); - - decltype(_callbackSharedState->id) id; - decltype(_callbackSharedState) sharedState; + cancelTimeout(); + _impl.expires_after(timeout); - { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - id = ++_callbackSharedState->id; - sharedState = _callbackSharedState; - } + decltype(_callbackSharedState->id) id; + decltype(_callbackSharedState) sharedState; - _impl.async_wait([this, id, sharedState](const asio::error_code& error) { - if (error == asio::error::operation_aborted) { - return; + { + stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); + id = ++_callbackSharedState->id; + sharedState = _callbackSharedState; } - stdx::unique_lock<stdx::mutex> lk(sharedState->mutex); - - // If the id in shared state doesn't match the id in our callback, it - // means we were cancelled, but still executed. This can occur if we - // were cancelled just before our timeout. We need a generation, rather - // than just a bool here, because we could have been cancelled and - // another callback set, in which case we shouldn't run and the we - // should let the other callback execute instead. - if (sharedState->id == id) { - auto cb = std::move(_cb); - lk.unlock(); - cb(); - } + _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) { + if (error == asio::error::operation_aborted) { + return; + } + + stdx::unique_lock<stdx::mutex> lk(sharedState->mutex); + + // If the id in shared state doesn't match the id in our callback, it + // means we were cancelled, but still executed. This can occur if we + // were cancelled just before our timeout. We need a generation, rather + // than just a bool here, because we could have been cancelled and + // another callback set, in which case we shouldn't run and the we + // should let the other callback execute instead. + if (sharedState->id == id) { + auto cb = std::move(_cb); + lk.unlock(); + cb(); + } + })); }); } void ASIOTimer::cancelTimeout() { - { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - _callbackSharedState->id++; - } - _impl.cancel(); + _strand->dispatch([this] { + { + stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); + _callbackSharedState->id++; + } + + _impl.cancel(); + }); } ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global) : _global(global), - _timer(&global->_impl->_io_service), _hostAndPort(hostAndPort), _generation(generation), - _impl(makeAsyncOp(this)) {} + _impl(makeAsyncOp(this)), + _timer(&_impl->strand()) {} void ASIOConnection::indicateSuccess() { indicateUsed(); @@ -166,9 +171,11 @@ void ASIOConnection::cancelTimeout() { } void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) { - _setupCallback = std::move(cb); + _impl->strand().dispatch([this, timeout, cb] { + _setupCallback = std::move(cb); - _global->_impl->_connect(_impl.get()); + _global->_impl->_connect(_impl.get()); + }); } void ASIOConnection::resetToUnknown() { @@ -176,48 +183,48 @@ void ASIOConnection::resetToUnknown() { } void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { - auto op = _impl.get(); - - _refreshCallback = std::move(cb); - - // Actually timeout refreshes - setTimeout(timeout, - [this]() { - asio::post(_global->_impl->_io_service, - [this] { _impl->connection().stream().cancel(); }); - }); - - // Our pings are isMaster's - auto beginStatus = op->beginCommand(makeIsMasterRequest(this), - NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC, - _hostAndPort); - if (!beginStatus.isOK()) { - auto cb = std::move(_refreshCallback); - cb(this, beginStatus); - return; - } - - // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such we - // need to intercept those calls so we can capture them. This will get cleared out when we fill - // in the real onFinish in startCommand. - op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) { - invariant(!failedResponse.isOK()); - auto cb = std::move(_refreshCallback); - cb(this, failedResponse.getStatus()); - }); + _impl->strand().dispatch([this, timeout, cb] { + auto op = _impl.get(); - _global->_impl->_asyncRunCommand( - op, - [this, op](std::error_code ec, size_t bytes) { - cancelTimeout(); + _refreshCallback = std::move(cb); - auto cb = std::move(_refreshCallback); + // Actually timeout refreshes + setTimeout(timeout, [this]() { _impl->connection().stream().cancel(); }); - if (ec) - return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + // Our pings are isMaster's + auto beginStatus = op->beginCommand(makeIsMasterRequest(this), + NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC, + _hostAndPort); + if (!beginStatus.isOK()) { + auto cb = std::move(_refreshCallback); + cb(this, beginStatus); + return; + } - cb(this, Status::OK()); + // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such + // we + // need to intercept those calls so we can capture them. This will get cleared out when we + // fill + // in the real onFinish in startCommand. + op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) { + invariant(!failedResponse.isOK()); + auto cb = std::move(_refreshCallback); + cb(this, failedResponse.getStatus()); }); + + _global->_impl->_asyncRunCommand( + op, + [this, op](std::error_code ec, size_t bytes) { + cancelTimeout(); + + auto cb = std::move(_refreshCallback); + + if (ec) + return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + + cb(this, Status::OK()); + }); + }); } std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() { @@ -235,7 +242,7 @@ Date_t ASIOImpl::now() { } std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() { - return stdx::make_unique<ASIOTimer>(&_impl->_io_service); + return stdx::make_unique<ASIOTimer>(&_impl->_strand); } std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection( diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h index 6700819c63d..44235d23e16 100644 --- a/src/mongo/executor/connection_pool_asio.h +++ b/src/mongo/executor/connection_pool_asio.h @@ -44,7 +44,7 @@ namespace connection_pool_asio { */ class ASIOTimer final : public ConnectionPool::TimerInterface { public: - ASIOTimer(asio::io_service* service); + ASIOTimer(asio::io_service::strand* strand); ~ASIOTimer(); void setTimeout(Milliseconds timeout, TimeoutCallback cb) override; @@ -57,7 +57,7 @@ private: }; TimeoutCallback _cb; - asio::io_service* const _io_service; + asio::io_service::strand* const _strand; asio::steady_timer _impl; std::shared_ptr<CallbackSharedState> _callbackSharedState; }; @@ -99,12 +99,12 @@ private: SetupCallback _setupCallback; RefreshCallback _refreshCallback; ASIOImpl* const _global; - ASIOTimer _timer; Date_t _lastUsed; Status _status = ConnectionPool::kConnectionStateUnknown; HostAndPort _hostAndPort; size_t _generation; std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl; + ASIOTimer _timer; }; /** diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 5f554b4fa94..489246d217b 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -52,6 +52,10 @@ namespace mongo { namespace executor { +namespace { +const std::size_t kIOServiceWorkers = 1; +} // namespace + #if defined(_MSC_VER) && _MSC_VER < 1900 NetworkInterfaceASIO::Options::Options(Options&& other) : connectionPoolOptions(std::move(other.connectionPoolOptions)), @@ -75,13 +79,13 @@ NetworkInterfaceASIO::NetworkInterfaceASIO(Options options) _io_service(), _metadataHook(std::move(_options.metadataHook)), _hook(std::move(_options.networkConnectionHook)), - _resolver(_io_service), _state(State::kReady), _timerFactory(std::move(_options.timerFactory)), _streamFactory(std::move(_options.streamFactory)), _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this), _options.connectionPoolOptions), - _isExecutorRunnable(false) {} + _isExecutorRunnable(false), + _strand(_io_service) {} std::string NetworkInterfaceASIO::getDiagnosticString() { str::stream output; @@ -99,25 +103,31 @@ std::string NetworkInterfaceASIO::getHostName() { } void NetworkInterfaceASIO::startup() { - _serviceRunner = stdx::thread([this]() { - setThreadName("NetworkInterfaceASIO"); - try { - LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; - asio::io_service::work work(_io_service); - _io_service.run(); - } catch (...) { - severe() << "Uncaught exception in NetworkInterfaceASIO IO worker thread of type: " - << exceptionToStatus(); - fassertFailed(28820); - } - }); + std::generate_n(std::back_inserter(_serviceRunners), + kIOServiceWorkers, + [&] { + return stdx::thread([this]() { + setThreadName("NetworkInterfaceASIO"); + try { + LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; + asio::io_service::work work(_io_service); + _io_service.run(); + } catch (...) { + severe() << "Uncaught exception in NetworkInterfaceASIO IO " + "worker thread of type: " << exceptionToStatus(); + fassertFailed(28820); + } + }); + }); _state.store(State::kRunning); } void NetworkInterfaceASIO::shutdown() { _state.store(State::kShutdown); _io_service.stop(); - _serviceRunner.join(); + for (auto&& worker : _serviceRunners) { + worker.join(); + } LOG(2) << "NetworkInterfaceASIO shutdown successfully"; } @@ -197,35 +207,35 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa AsyncOp* op = nullptr; - { - stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); + stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); - const auto eraseCount = _inGetConnection.erase(cbHandle); + const auto eraseCount = _inGetConnection.erase(cbHandle); - // If we didn't find the request, we've been canceled - if (eraseCount == 0) { - lk.unlock(); + // If we didn't find the request, we've been canceled + if (eraseCount == 0) { + lk.unlock(); - onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); + onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); - // Though we were canceled, we know that the stream is fine, so indicate success. - conn->indicateSuccess(); + // Though we were canceled, we know that the stream is fine, so indicate success. + conn->indicateSuccess(); - signalWorkAvailable(); + signalWorkAvailable(); - return; - } + return; + } - // We can't release the AsyncOp until we know we were not canceled. - auto ownedOp = conn->releaseAsyncOp(); - op = ownedOp.get(); + // We can't release the AsyncOp until we know we were not canceled. + auto ownedOp = conn->releaseAsyncOp(); + op = ownedOp.get(); - // Sanity check that we are getting a clean AsyncOp. - invariant(!op->canceled()); - invariant(!op->timedOut()); + // Sanity check that we are getting a clean AsyncOp. + invariant(!op->canceled()); + invariant(!op->timedOut()); - _inProgress.emplace(op, std::move(ownedOp)); - } + // Now that we're inProgress, an external cancel can touch our op, but + // not until we release the inProgressMutex. + _inProgress.emplace(op, std::move(ownedOp)); op->_cbHandle = std::move(cbHandle); op->_request = std::move(request); @@ -233,50 +243,53 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa op->_connectionPoolHandle = std::move(swConn.getValue()); op->_start = startTime; - // Set timeout now that we have the correct request object - if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) { - op->_timeoutAlarm = op->_owner->_timerFactory->make(&_io_service, op->_request.timeout); - - std::shared_ptr<AsyncOp::AccessControl> access; - std::size_t generation; - { - stdx::lock_guard<stdx::mutex> lk(op->_access->mutex); - access = op->_access; - generation = access->id; - } - - op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) { - if (!ec) { - // We must pass a check for safe access before using op inside the - // callback or we may attempt access on an invalid pointer. - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation != access->id) { - // The operation has been cleaned up, do not access. - return; - } - - LOG(2) << "Operation timed out: " << op->request().toString(); + // This ditches the lock and gets us onto the strand (so we're + // threadsafe) + op->_strand.post([this, op] { + // Set timeout now that we have the correct request object + if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) { + op->_timeoutAlarm = + op->_owner->_timerFactory->make(&op->_strand, op->_request.timeout); + + std::shared_ptr<AsyncOp::AccessControl> access; + std::size_t generation; + { + stdx::lock_guard<stdx::mutex> lk(op->_access->mutex); + access = op->_access; + generation = access->id; + } - // An operation may be in mid-flight when it times out, so we - // cancel any in-progress async calls but do not complete the operation now. - if (op->_connection) { - op->_connection->cancel(); + op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) { + if (!ec) { + // We must pass a check for safe access before using op inside the + // callback or we may attempt access on an invalid pointer. + stdx::lock_guard<stdx::mutex> lk(access->mutex); + if (generation != access->id) { + // The operation has been cleaned up, do not access. + return; + } + + LOG(2) << "Operation timed out: " << op->request().toString(); + + // An operation may be in mid-flight when it times out, so we + // cancel any in-progress async calls but do not complete the operation now. + op->_timedOut = 1; + if (op->_connection) { + op->_connection->cancel(); + } + } else { + LOG(4) << "failed to time operation out: " << ec.message(); } - op->_timedOut.store(1); - } else { - LOG(4) << "failed to time operation out: " << ec.message(); - } - }); - } + }); + } - _beginCommunication(op); + _beginCommunication(op); + }); }; // TODO: thread some higher level timeout through, rather than 5 minutes, // once we make timeouts pervasive in this api. - asio::post( - _io_service, - [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); }); + _connectionPool.get(request.target, Minutes(5), nextStep); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h index 74309aaf465..d7c73bc715e 100644 --- a/src/mongo/executor/network_interface_asio.h +++ b/src/mongo/executor/network_interface_asio.h @@ -278,6 +278,14 @@ private: void setOnFinish(RemoteCommandCompletionFn&& onFinish); + asio::io_service::strand& strand() { + return _strand; + } + + asio::ip::tcp::resolver& resolver() { + return _resolver; + } + private: NetworkInterfaceASIO* const _owner; // Information describing a task enqueued on the NetworkInterface @@ -305,8 +313,10 @@ private: Date_t _start; std::unique_ptr<AsyncTimerInterface> _timeoutAlarm; - AtomicUInt64 _canceled; - AtomicUInt64 _timedOut; + asio::ip::tcp::resolver _resolver; + + bool _canceled = false; + bool _timedOut = false; /** * We maintain a shared_ptr to an access control object. This ensures that tangent @@ -322,6 +332,15 @@ private: */ boost::optional<AsyncCommand> _command; bool _inSetup; + + /** + * The explicit strand that all operations for this op must run on. + * This must be the last member of AsyncOp because any pending + * operation for the strand are run when it's dtor is called. Any + * members that fall after it will have already been destroyed, which + * will make those fields illegal to touch from callbacks. + */ + asio::io_service::strand _strand; }; void _startCommand(AsyncOp* op); @@ -371,14 +390,12 @@ private: Options _options; asio::io_service _io_service; - stdx::thread _serviceRunner; + std::vector<stdx::thread> _serviceRunners; const std::unique_ptr<rpc::EgressMetadataHook> _metadataHook; const std::unique_ptr<NetworkConnectionHook> _hook; - asio::ip::tcp::resolver _resolver; - std::atomic<State> _state; // NOLINT std::unique_ptr<AsyncTimerFactoryInterface> _timerFactory; @@ -396,6 +413,15 @@ private: stdx::mutex _executorMutex; bool _isExecutorRunnable; stdx::condition_variable _isExecutorRunnableCondition; + + /** + * The explicit strand that all non-op operations run on. This must be the + * last member of NetworkInterfaceASIO because any pending operation for + * the strand are run when it's dtor is called. Any members that fall after + * it will have already been destroyed, which will make those fields + * illegal to touch from callbacks. + */ + asio::io_service::strand _strand; }; template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs> diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp index 696ee271a63..72d4cba8ac1 100644 --- a/src/mongo/executor/network_interface_asio_connect.cpp +++ b/src/mongo/executor/network_interface_asio_connect.cpp @@ -102,13 +102,13 @@ void NetworkInterfaceASIO::_connect(AsyncOp* op) { _validateAndRun( op, ec, [this, op, endpoints]() { _setupSocket(op, std::move(endpoints)); }); }; - _resolver.async_resolve(query, std::move(thenConnect)); + op->resolver().async_resolve(query, op->_strand.wrap(std::move(thenConnect))); } void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator endpoints) { // TODO: Consider moving this call to post-auth so we only assign completed connections. { - auto stream = _streamFactory->makeStream(&_io_service, op->request().target); + auto stream = _streamFactory->makeStream(&op->strand(), op->request().target); op->setConnection({std::move(stream), rpc::supports::kOpQueryOnly}); } diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp index dbce0eb9c73..21929559e5c 100644 --- a/src/mongo/executor/network_interface_asio_operation.cpp +++ b/src/mongo/executor/network_interface_asio_operation.cpp @@ -99,42 +99,39 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner, _request(request), _onFinish(onFinish), _start(now), + _resolver(owner->_io_service), _canceled(0), _timedOut(0), _access(std::make_shared<AsyncOp::AccessControl>()), - _inSetup(true) {} + _inSetup(true), + _strand(owner->_io_service) {} void NetworkInterfaceASIO::AsyncOp::cancel() { LOG(2) << "Canceling operation; original request was: " << request().toString(); - std::shared_ptr<AsyncOp::AccessControl> access; - std::size_t generation; - { - stdx::lock_guard<stdx::mutex> lk(_access->mutex); - access = _access; - generation = access->id; - } + stdx::lock_guard<stdx::mutex> lk(_access->mutex); + auto access = _access; + auto generation = access->id; // An operation may be in mid-flight when it is canceled, so we cancel any // in-progress async ops but do not complete the operation now. - asio::post(_owner->_io_service, - [this, access, generation] { - // Ensure 'this' pointer is still valid. - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation == access->id) { - _canceled.store(1); - if (_connection) { - _connection->cancel(); - } - } - }); + + _strand.post([this, access, generation] { + stdx::lock_guard<stdx::mutex> lk(access->mutex); + if (generation == access->id) { + _canceled = true; + if (_connection) { + _connection->cancel(); + } + } + }); } bool NetworkInterfaceASIO::AsyncOp::canceled() const { - return (_canceled.load() == 1); + return _canceled; } bool NetworkInterfaceASIO::AsyncOp::timedOut() const { - return (_timedOut.load() == 1); + return _timedOut; } const TaskExecutor::CallbackHandle& NetworkInterfaceASIO::AsyncOp::cbHandle() const { @@ -237,8 +234,8 @@ void NetworkInterfaceASIO::AsyncOp::reset() { // Ditto for _operationProtocol. _start = {}; _timeoutAlarm.reset(); - _canceled.store(0u); - _timedOut.store(0u); + _canceled = false; + _timedOut = false; _command = boost::none; // _inSetup should always be false at this point. } |