author    Jason Carey <jcarey@argv.me>  2015-10-28 14:41:07 -0400
committer Jason Carey <jcarey@argv.me>  2015-11-09 18:03:18 -0500
commit    c836472353e736424c9bb87868508c9e633b892d (patch)
tree      71b6bd4303e722ea244ac5e3b538ad65ab417f5c
parent    e8187cc8f07ac5fccd384430f33457d8a57f0381 (diff)
download  mongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple io workers) in NetworkInterfaceASIO. Strands are an ASIO-specific mechanism for ensuring thread safety.
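For context before the patch itself, a minimal standalone sketch (not code from this commit) of the two ideas it combines: a strand serializes the completion handlers that touch shared state, while any number of worker threads may pump the same io_service concurrently. The worker count, the counter, and main() are illustrative only; the actual change instead wraps the existing async_read/async_write/async_wait handlers with strand::wrap() and moves pool and timer work onto strands via post()/dispatch().

// Sketch only: standalone Asio (the namespace this patch uses), not MongoDB code.
#include <asio.hpp>

#include <iostream>
#include <thread>
#include <vector>

int main() {
    asio::io_service io_service;
    asio::io_service::strand strand(io_service);

    // Touched only from handlers posted through the strand, so it needs no
    // mutex even though several worker threads run the io_service.
    int counter = 0;

    for (int i = 0; i < 100; ++i) {
        strand.post([&counter] { ++counter; });
    }

    // The "option for multiple io workers": each thread calls run(); any
    // thread may execute a given handler, but handlers posted through the
    // same strand never run concurrently with one another.
    std::vector<std::thread> workers;
    for (int i = 0; i < 4; ++i) {
        workers.emplace_back([&io_service] { io_service.run(); });
    }
    for (auto& worker : workers) {
        worker.join();
    }

    std::cout << counter << std::endl;  // always prints 100
    return 0;
}

strand.wrap(handler) returns an equivalent handler that is dispatched through the strand when any worker thread invokes it; that wrapped form is what the diff below applies to stream connects, reads, writes, and timer waits.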
-rw-r--r--  src/mongo/executor/async_mock_stream_factory.cpp | 26
-rw-r--r--  src/mongo/executor/async_mock_stream_factory.h | 6
-rw-r--r--  src/mongo/executor/async_secure_stream.cpp | 32
-rw-r--r--  src/mongo/executor/async_secure_stream.h | 3
-rw-r--r--  src/mongo/executor/async_secure_stream_factory.cpp | 6
-rw-r--r--  src/mongo/executor/async_secure_stream_factory.h | 2
-rw-r--r--  src/mongo/executor/async_stream.cpp | 11
-rw-r--r--  src/mongo/executor/async_stream.h | 3
-rw-r--r--  src/mongo/executor/async_stream_common.h | 22
-rw-r--r--  src/mongo/executor/async_stream_factory.cpp | 6
-rw-r--r--  src/mongo/executor/async_stream_factory.h | 2
-rw-r--r--  src/mongo/executor/async_stream_factory_interface.h | 2
-rw-r--r--  src/mongo/executor/async_timer_asio.cpp | 10
-rw-r--r--  src/mongo/executor/async_timer_asio.h | 5
-rw-r--r--  src/mongo/executor/async_timer_interface.h | 2
-rw-r--r--  src/mongo/executor/async_timer_mock.cpp | 2
-rw-r--r--  src/mongo/executor/async_timer_mock.h | 2
-rw-r--r--  src/mongo/executor/connection_pool_asio.cpp | 161
-rw-r--r--  src/mongo/executor/connection_pool_asio.h | 6
-rw-r--r--  src/mongo/executor/network_interface_asio.cpp | 157
-rw-r--r--  src/mongo/executor/network_interface_asio.h | 36
-rw-r--r--  src/mongo/executor/network_interface_asio_connect.cpp | 4
-rw-r--r--  src/mongo/executor/network_interface_asio_operation.cpp | 43
23 files changed, 303 insertions, 246 deletions
diff --git a/src/mongo/executor/async_mock_stream_factory.cpp b/src/mongo/executor/async_mock_stream_factory.cpp
index 341615bdbce..da3bd426508 100644
--- a/src/mongo/executor/async_mock_stream_factory.cpp
+++ b/src/mongo/executor/async_mock_stream_factory.cpp
@@ -67,25 +67,23 @@ StringData stateToString(AsyncMockStreamFactory::MockStream::StreamState state)
}
template <typename Handler>
-void checkCanceled(asio::io_service* io_service,
+void checkCanceled(asio::io_service::strand* strand,
AsyncMockStreamFactory::MockStream::StreamState* state,
Handler&& handler,
std::size_t bytes,
std::error_code ec = std::error_code()) {
auto wasCancelled = (*state == AsyncMockStreamFactory::MockStream::StreamState::kCanceled);
*state = AsyncMockStreamFactory::MockStream::StreamState::kRunning;
- asio::post(*io_service,
- [handler, wasCancelled, bytes, ec] {
- handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec,
- bytes);
- });
+ strand->post([handler, wasCancelled, bytes, ec] {
+ handler(wasCancelled ? make_error_code(asio::error::operation_aborted) : ec, bytes);
+ });
}
} // namespace
std::unique_ptr<AsyncStreamInterface> AsyncMockStreamFactory::makeStream(
- asio::io_service* io_service, const HostAndPort& target) {
- return stdx::make_unique<MockStream>(io_service, this, target);
+ asio::io_service::strand* strand, const HostAndPort& target) {
+ return stdx::make_unique<MockStream>(strand, this, target);
}
void AsyncMockStreamFactory::_createStream(const HostAndPort& host, MockStream* stream) {
@@ -112,10 +110,10 @@ AsyncMockStreamFactory::MockStream* AsyncMockStreamFactory::blockUntilStreamExis
return iter->second;
}
-AsyncMockStreamFactory::MockStream::MockStream(asio::io_service* io_service,
+AsyncMockStreamFactory::MockStream::MockStream(asio::io_service::strand* strand,
AsyncMockStreamFactory* factory,
const HostAndPort& target)
- : _io_service(io_service), _factory(factory), _target(target) {
+ : _strand(strand), _factory(factory), _target(target) {
_factory->_createStream(_target, this);
}
@@ -133,7 +131,7 @@ void AsyncMockStreamFactory::MockStream::connect(asio::ip::tcp::resolver::iterat
// We shim a lambda to give connectHandler the right signature since it doesn't take
// a size_t param.
checkCanceled(
- _io_service,
+ _strand,
&_state,
[connectHandler](std::error_code ec, std::size_t) { return connectHandler(ec); },
0);
@@ -152,7 +150,7 @@ void AsyncMockStreamFactory::MockStream::write(asio::const_buffer buf,
_defer_inlock(kBlockedAfterWrite,
[this, writeHandler, size]() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- checkCanceled(_io_service, &_state, std::move(writeHandler), size);
+ checkCanceled(_strand, &_state, std::move(writeHandler), size);
});
}
@@ -207,7 +205,7 @@ void AsyncMockStreamFactory::MockStream::read(asio::mutable_buffer buf,
};
}
- checkCanceled(_io_service, &_state, std::move(handler), nToCopy, _error);
+ checkCanceled(_strand, &_state, std::move(handler), nToCopy, _error);
_error.clear();
});
}
@@ -259,7 +257,7 @@ void AsyncMockStreamFactory::MockStream::_unblock_inlock() {
}
// Post our deferred action to resume state machine execution
invariant(_deferredAction);
- _io_service->post(std::move(_deferredAction));
+ _strand->post(std::move(_deferredAction));
_deferredAction = nullptr;
}
diff --git a/src/mongo/executor/async_mock_stream_factory.h b/src/mongo/executor/async_mock_stream_factory.h
index d7e1c258892..e64ab97e5b8 100644
--- a/src/mongo/executor/async_mock_stream_factory.h
+++ b/src/mongo/executor/async_mock_stream_factory.h
@@ -54,12 +54,12 @@ class AsyncMockStreamFactory final : public AsyncStreamFactoryInterface {
public:
AsyncMockStreamFactory() = default;
- std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service,
+ std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
const HostAndPort& host) override;
class MockStream final : public AsyncStreamInterface {
public:
- MockStream(asio::io_service* io_service,
+ MockStream(asio::io_service::strand* strand,
AsyncMockStreamFactory* factory,
const HostAndPort& target);
@@ -103,7 +103,7 @@ public:
void _defer_inlock(StreamState state, Action&& handler);
void _unblock_inlock();
- asio::io_service* _io_service;
+ asio::io_service::strand* _strand;
AsyncMockStreamFactory* _factory;
HostAndPort _target;
diff --git a/src/mongo/executor/async_secure_stream.cpp b/src/mongo/executor/async_secure_stream.cpp
index 964f805d07d..a3a1acc5fb7 100644
--- a/src/mongo/executor/async_secure_stream.cpp
+++ b/src/mongo/executor/async_secure_stream.cpp
@@ -43,8 +43,9 @@
namespace mongo {
namespace executor {
-AsyncSecureStream::AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext)
- : _stream(*io_service, *sslContext) {}
+AsyncSecureStream::AsyncSecureStream(asio::io_service::strand* strand,
+ asio::ssl::context* sslContext)
+ : _strand(strand), _stream(_strand->get_io_service(), *sslContext) {}
AsyncSecureStream::~AsyncSecureStream() {
destroyStream(&_stream.lowest_layer(), _connected);
@@ -55,33 +56,34 @@ void AsyncSecureStream::connect(const asio::ip::tcp::resolver::iterator endpoint
// Stash the connectHandler as we won't be able to call it until we re-enter the state
// machine.
_userHandler = std::move(connectHandler);
- asio::async_connect(_stream.lowest_layer(),
- std::move(endpoints),
- [this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) {
- if (ec) {
- return _userHandler(ec);
- }
- _connected = true;
- return _handleConnect(std::move(iter));
- });
+ asio::async_connect(
+ _stream.lowest_layer(),
+ std::move(endpoints),
+ _strand->wrap([this](std::error_code ec, asio::ip::tcp::resolver::iterator iter) {
+ if (ec) {
+ return _userHandler(ec);
+ }
+ _connected = true;
+ return _handleConnect(std::move(iter));
+ }));
}
void AsyncSecureStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) {
- writeStream(&_stream, _connected, buffer, std::move(streamHandler));
+ writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
}
void AsyncSecureStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) {
- readStream(&_stream, _connected, buffer, std::move(streamHandler));
+ readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
}
void AsyncSecureStream::_handleConnect(asio::ip::tcp::resolver::iterator iter) {
_stream.async_handshake(decltype(_stream)::client,
- [this, iter](std::error_code ec) {
+ _strand->wrap([this, iter](std::error_code ec) {
if (ec) {
return _userHandler(ec);
}
return _handleHandshake(ec, iter->host_name());
- });
+ }));
}
void AsyncSecureStream::_handleHandshake(std::error_code ec, const std::string& hostName) {
diff --git a/src/mongo/executor/async_secure_stream.h b/src/mongo/executor/async_secure_stream.h
index 699832f1ffd..cdbb54c8a31 100644
--- a/src/mongo/executor/async_secure_stream.h
+++ b/src/mongo/executor/async_secure_stream.h
@@ -42,7 +42,7 @@ namespace executor {
class AsyncSecureStream final : public AsyncStreamInterface {
public:
- AsyncSecureStream(asio::io_service* io_service, asio::ssl::context* sslContext);
+ AsyncSecureStream(asio::io_service::strand* strand, asio::ssl::context* sslContext);
~AsyncSecureStream();
@@ -60,6 +60,7 @@ private:
void _handleHandshake(std::error_code ec, const std::string& hostName);
+ asio::io_service::strand* const _strand;
asio::ssl::stream<asio::ip::tcp::socket> _stream;
ConnectHandler _userHandler;
bool _connected = false;
diff --git a/src/mongo/executor/async_secure_stream_factory.cpp b/src/mongo/executor/async_secure_stream_factory.cpp
index a933975a981..ce6bd73f2d0 100644
--- a/src/mongo/executor/async_secure_stream_factory.cpp
+++ b/src/mongo/executor/async_secure_stream_factory.cpp
@@ -52,12 +52,12 @@ AsyncSecureStreamFactory::AsyncSecureStreamFactory(SSLManagerInterface* sslManag
}
std::unique_ptr<AsyncStreamInterface> AsyncSecureStreamFactory::makeStream(
- asio::io_service* io_service, const HostAndPort&) {
+ asio::io_service::strand* strand, const HostAndPort&) {
int sslModeVal = getSSLGlobalParams().sslMode.load();
if (sslModeVal == SSLParams::SSLMode_preferSSL || sslModeVal == SSLParams::SSLMode_requireSSL) {
- return stdx::make_unique<AsyncSecureStream>(io_service, &_sslContext);
+ return stdx::make_unique<AsyncSecureStream>(strand, &_sslContext);
}
- return stdx::make_unique<AsyncStream>(io_service);
+ return stdx::make_unique<AsyncStream>(strand);
}
} // namespace executor
diff --git a/src/mongo/executor/async_secure_stream_factory.h b/src/mongo/executor/async_secure_stream_factory.h
index ecc03939cda..4d08af71eff 100644
--- a/src/mongo/executor/async_secure_stream_factory.h
+++ b/src/mongo/executor/async_secure_stream_factory.h
@@ -47,7 +47,7 @@ class AsyncSecureStreamFactory final : public AsyncStreamFactoryInterface {
public:
AsyncSecureStreamFactory(SSLManagerInterface* sslManager);
- std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service,
+ std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
const HostAndPort&) override;
private:
diff --git a/src/mongo/executor/async_stream.cpp b/src/mongo/executor/async_stream.cpp
index 1ac8a47470f..398faaf2b6e 100644
--- a/src/mongo/executor/async_stream.cpp
+++ b/src/mongo/executor/async_stream.cpp
@@ -40,7 +40,8 @@ namespace executor {
using asio::ip::tcp;
-AsyncStream::AsyncStream(asio::io_service* io_service) : _stream(*io_service) {}
+AsyncStream::AsyncStream(asio::io_service::strand* strand)
+ : _strand(strand), _stream(_strand->get_io_service()) {}
AsyncStream::~AsyncStream() {
destroyStream(&_stream, _connected);
@@ -52,22 +53,22 @@ void AsyncStream::connect(tcp::resolver::iterator iter, ConnectHandler&& connect
std::move(iter),
// We need to wrap this with a lambda of the right signature so it compiles, even
// if we don't actually use the resolver iterator.
- [this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) {
+ _strand->wrap([this, connectHandler](std::error_code ec, tcp::resolver::iterator iter) {
if (!ec) {
// We assume that our owner is responsible for keeping us alive until we call
// connectHandler, so _connected should always be a valid memory location.
_connected = true;
}
return connectHandler(ec);
- });
+ }));
}
void AsyncStream::write(asio::const_buffer buffer, StreamHandler&& streamHandler) {
- writeStream(&_stream, _connected, buffer, std::move(streamHandler));
+ writeStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
}
void AsyncStream::read(asio::mutable_buffer buffer, StreamHandler&& streamHandler) {
- readStream(&_stream, _connected, buffer, std::move(streamHandler));
+ readStream(&_stream, _strand, _connected, buffer, std::move(streamHandler));
}
void AsyncStream::cancel() {
diff --git a/src/mongo/executor/async_stream.h b/src/mongo/executor/async_stream.h
index c49014c860f..1e4e6982196 100644
--- a/src/mongo/executor/async_stream.h
+++ b/src/mongo/executor/async_stream.h
@@ -37,7 +37,7 @@ namespace executor {
class AsyncStream final : public AsyncStreamInterface {
public:
- AsyncStream(asio::io_service* io_service);
+ AsyncStream(asio::io_service::strand* strand);
~AsyncStream();
@@ -50,6 +50,7 @@ public:
void cancel() override;
private:
+ asio::io_service::strand* const _strand;
asio::ip::tcp::socket _stream;
bool _connected = false;
};
diff --git a/src/mongo/executor/async_stream_common.h b/src/mongo/executor/async_stream_common.h
index 7b06881551c..ef154c0cc6f 100644
--- a/src/mongo/executor/async_stream_common.h
+++ b/src/mongo/executor/async_stream_common.h
@@ -51,17 +51,27 @@ void destroyStream(ASIOStream* stream, bool connected) {
}
template <typename ASIOStream, typename Buffer, typename Handler>
-void writeStream(ASIOStream* stream, bool connected, Buffer&& buffer, Handler&& handler) {
+void writeStream(ASIOStream* stream,
+ asio::io_service::strand* strand,
+ bool connected,
+ Buffer&& buffer,
+ Handler&& handler) {
invariant(connected);
- asio::async_write(
- *stream, asio::buffer(std::forward<Buffer>(buffer)), std::forward<Handler>(handler));
+ asio::async_write(*stream,
+ asio::buffer(std::forward<Buffer>(buffer)),
+ strand->wrap(std::forward<Handler>(handler)));
}
template <typename ASIOStream, typename Buffer, typename Handler>
-void readStream(ASIOStream* stream, bool connected, Buffer&& buffer, Handler&& handler) {
+void readStream(ASIOStream* stream,
+ asio::io_service::strand* strand,
+ bool connected,
+ Buffer&& buffer,
+ Handler&& handler) {
invariant(connected);
- asio::async_read(
- *stream, asio::buffer(std::forward<Buffer>(buffer)), std::forward<Handler>(handler));
+ asio::async_read(*stream,
+ asio::buffer(std::forward<Buffer>(buffer)),
+ strand->wrap(std::forward<Handler>(handler)));
}
template <typename ASIOStream>
diff --git a/src/mongo/executor/async_stream_factory.cpp b/src/mongo/executor/async_stream_factory.cpp
index 8e8fb93b6d7..5f24adf341b 100644
--- a/src/mongo/executor/async_stream_factory.cpp
+++ b/src/mongo/executor/async_stream_factory.cpp
@@ -38,9 +38,9 @@
namespace mongo {
namespace executor {
-std::unique_ptr<AsyncStreamInterface> AsyncStreamFactory::makeStream(asio::io_service* io_service,
- const HostAndPort&) {
- return stdx::make_unique<AsyncStream>(io_service);
+std::unique_ptr<AsyncStreamInterface> AsyncStreamFactory::makeStream(
+ asio::io_service::strand* strand, const HostAndPort&) {
+ return stdx::make_unique<AsyncStream>(strand);
}
} // namespace executor
diff --git a/src/mongo/executor/async_stream_factory.h b/src/mongo/executor/async_stream_factory.h
index 198ea68b877..bbf9767427b 100644
--- a/src/mongo/executor/async_stream_factory.h
+++ b/src/mongo/executor/async_stream_factory.h
@@ -41,7 +41,7 @@ class AsyncStreamFactory final : public AsyncStreamFactoryInterface {
public:
AsyncStreamFactory() = default;
- std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service,
+ std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
const HostAndPort&) override;
};
diff --git a/src/mongo/executor/async_stream_factory_interface.h b/src/mongo/executor/async_stream_factory_interface.h
index c2f6261556a..c894a1fee5a 100644
--- a/src/mongo/executor/async_stream_factory_interface.h
+++ b/src/mongo/executor/async_stream_factory_interface.h
@@ -48,7 +48,7 @@ class AsyncStreamFactoryInterface {
public:
virtual ~AsyncStreamFactoryInterface() = default;
- virtual std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service* io_service,
+ virtual std::unique_ptr<AsyncStreamInterface> makeStream(asio::io_service::strand* strand,
const HostAndPort& target) = 0;
protected:
diff --git a/src/mongo/executor/async_timer_asio.cpp b/src/mongo/executor/async_timer_asio.cpp
index bbd1373a0d3..293ab3e137a 100644
--- a/src/mongo/executor/async_timer_asio.cpp
+++ b/src/mongo/executor/async_timer_asio.cpp
@@ -33,20 +33,20 @@
namespace mongo {
namespace executor {
-AsyncTimerASIO::AsyncTimerASIO(asio::io_service* service, Milliseconds expiration)
- : _timer(*service, expiration) {}
+AsyncTimerASIO::AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration)
+ : _strand(strand), _timer(_strand->get_io_service(), expiration) {}
void AsyncTimerASIO::cancel() {
_timer.cancel();
}
void AsyncTimerASIO::asyncWait(AsyncTimerInterface::Handler handler) {
- _timer.async_wait(std::move(handler));
+ _timer.async_wait(_strand->wrap(std::move(handler)));
}
-std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryASIO::make(asio::io_service* service,
+std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryASIO::make(asio::io_service::strand* strand,
Milliseconds expiration) {
- return stdx::make_unique<AsyncTimerASIO>(service, expiration);
+ return stdx::make_unique<AsyncTimerASIO>(strand, expiration);
}
} // namespace executor
diff --git a/src/mongo/executor/async_timer_asio.h b/src/mongo/executor/async_timer_asio.h
index ae09b4b9247..eb3ae02e769 100644
--- a/src/mongo/executor/async_timer_asio.h
+++ b/src/mongo/executor/async_timer_asio.h
@@ -37,13 +37,14 @@ namespace executor {
class AsyncTimerASIO final : public AsyncTimerInterface {
public:
- AsyncTimerASIO(asio::io_service* service, Milliseconds expiration);
+ AsyncTimerASIO(asio::io_service::strand* strand, Milliseconds expiration);
void cancel() override;
void asyncWait(AsyncTimerInterface::Handler handler) override;
private:
+ asio::io_service::strand* const _strand;
asio::steady_timer _timer;
};
@@ -51,7 +52,7 @@ class AsyncTimerFactoryASIO final : public AsyncTimerFactoryInterface {
public:
AsyncTimerFactoryASIO() = default;
- std::unique_ptr<AsyncTimerInterface> make(asio::io_service* service,
+ std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand,
Milliseconds expiration) override;
};
diff --git a/src/mongo/executor/async_timer_interface.h b/src/mongo/executor/async_timer_interface.h
index 6a63850a0ef..6f18d4ece7f 100644
--- a/src/mongo/executor/async_timer_interface.h
+++ b/src/mongo/executor/async_timer_interface.h
@@ -81,7 +81,7 @@ class AsyncTimerFactoryInterface {
public:
virtual ~AsyncTimerFactoryInterface() = default;
- virtual std::unique_ptr<AsyncTimerInterface> make(asio::io_service* io_service,
+ virtual std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand,
Milliseconds expiration) = 0;
protected:
diff --git a/src/mongo/executor/async_timer_mock.cpp b/src/mongo/executor/async_timer_mock.cpp
index 7460e840993..9807947dcf4 100644
--- a/src/mongo/executor/async_timer_mock.cpp
+++ b/src/mongo/executor/async_timer_mock.cpp
@@ -94,7 +94,7 @@ std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(Milliseconds ex
return make(nullptr, expiration);
}
-std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service* io_service,
+std::unique_ptr<AsyncTimerInterface> AsyncTimerFactoryMock::make(asio::io_service::strand* strand,
Milliseconds expiration) {
stdx::lock_guard<stdx::mutex> lk(_timersMutex);
auto elem = _timers.emplace(std::make_shared<AsyncTimerMockImpl>(expiration));
diff --git a/src/mongo/executor/async_timer_mock.h b/src/mongo/executor/async_timer_mock.h
index a20bf84a6ff..6c0e4c755f2 100644
--- a/src/mongo/executor/async_timer_mock.h
+++ b/src/mongo/executor/async_timer_mock.h
@@ -119,7 +119,7 @@ public:
/**
* Create and return a new AsyncTimerMock object.
*/
- std::unique_ptr<AsyncTimerInterface> make(asio::io_service* io_service,
+ std::unique_ptr<AsyncTimerInterface> make(asio::io_service::strand* strand,
Milliseconds expiration) override;
/**
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
index 39d70cd6c4e..12e6d0bd062 100644
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -42,9 +42,9 @@ namespace mongo {
namespace executor {
namespace connection_pool_asio {
-ASIOTimer::ASIOTimer(asio::io_service* io_service)
- : _io_service(io_service),
- _impl(*io_service),
+ASIOTimer::ASIOTimer(asio::io_service::strand* strand)
+ : _strand(strand),
+ _impl(strand->get_io_service()),
_callbackSharedState(std::make_shared<CallbackSharedState>()) {}
ASIOTimer::~ASIOTimer() {
@@ -52,55 +52,60 @@ ASIOTimer::~ASIOTimer() {
}
void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
- _cb = std::move(cb);
+ _strand->dispatch([this, timeout, cb] {
+ _cb = std::move(cb);
- cancelTimeout();
- _impl.expires_after(timeout);
-
- decltype(_callbackSharedState->id) id;
- decltype(_callbackSharedState) sharedState;
+ cancelTimeout();
+ _impl.expires_after(timeout);
- {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- id = ++_callbackSharedState->id;
- sharedState = _callbackSharedState;
- }
+ decltype(_callbackSharedState->id) id;
+ decltype(_callbackSharedState) sharedState;
- _impl.async_wait([this, id, sharedState](const asio::error_code& error) {
- if (error == asio::error::operation_aborted) {
- return;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
+ id = ++_callbackSharedState->id;
+ sharedState = _callbackSharedState;
}
- stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
-
- // If the id in shared state doesn't match the id in our callback, it
- // means we were cancelled, but still executed. This can occur if we
- // were cancelled just before our timeout. We need a generation, rather
- // than just a bool here, because we could have been cancelled and
- // another callback set, in which case we shouldn't run and the we
- // should let the other callback execute instead.
- if (sharedState->id == id) {
- auto cb = std::move(_cb);
- lk.unlock();
- cb();
- }
+ _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) {
+ if (error == asio::error::operation_aborted) {
+ return;
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
+
+ // If the id in shared state doesn't match the id in our callback, it
+ // means we were cancelled, but still executed. This can occur if we
+ // were cancelled just before our timeout. We need a generation, rather
+ // than just a bool here, because we could have been cancelled and
+ // another callback set, in which case we shouldn't run and the we
+ // should let the other callback execute instead.
+ if (sharedState->id == id) {
+ auto cb = std::move(_cb);
+ lk.unlock();
+ cb();
+ }
+ }));
});
}
void ASIOTimer::cancelTimeout() {
- {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- _callbackSharedState->id++;
- }
- _impl.cancel();
+ _strand->dispatch([this] {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
+ _callbackSharedState->id++;
+ }
+
+ _impl.cancel();
+ });
}
ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
: _global(global),
- _timer(&global->_impl->_io_service),
_hostAndPort(hostAndPort),
_generation(generation),
- _impl(makeAsyncOp(this)) {}
+ _impl(makeAsyncOp(this)),
+ _timer(&_impl->strand()) {}
void ASIOConnection::indicateSuccess() {
indicateUsed();
@@ -166,9 +171,11 @@ void ASIOConnection::cancelTimeout() {
}
void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
- _setupCallback = std::move(cb);
+ _impl->strand().dispatch([this, timeout, cb] {
+ _setupCallback = std::move(cb);
- _global->_impl->_connect(_impl.get());
+ _global->_impl->_connect(_impl.get());
+ });
}
void ASIOConnection::resetToUnknown() {
@@ -176,48 +183,48 @@ void ASIOConnection::resetToUnknown() {
}
void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
- auto op = _impl.get();
-
- _refreshCallback = std::move(cb);
-
- // Actually timeout refreshes
- setTimeout(timeout,
- [this]() {
- asio::post(_global->_impl->_io_service,
- [this] { _impl->connection().stream().cancel(); });
- });
-
- // Our pings are isMaster's
- auto beginStatus = op->beginCommand(makeIsMasterRequest(this),
- NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC,
- _hostAndPort);
- if (!beginStatus.isOK()) {
- auto cb = std::move(_refreshCallback);
- cb(this, beginStatus);
- return;
- }
-
- // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such we
- // need to intercept those calls so we can capture them. This will get cleared out when we fill
- // in the real onFinish in startCommand.
- op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) {
- invariant(!failedResponse.isOK());
- auto cb = std::move(_refreshCallback);
- cb(this, failedResponse.getStatus());
- });
+ _impl->strand().dispatch([this, timeout, cb] {
+ auto op = _impl.get();
- _global->_impl->_asyncRunCommand(
- op,
- [this, op](std::error_code ec, size_t bytes) {
- cancelTimeout();
+ _refreshCallback = std::move(cb);
- auto cb = std::move(_refreshCallback);
+ // Actually timeout refreshes
+ setTimeout(timeout, [this]() { _impl->connection().stream().cancel(); });
- if (ec)
- return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+ // Our pings are isMaster's
+ auto beginStatus = op->beginCommand(makeIsMasterRequest(this),
+ NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC,
+ _hostAndPort);
+ if (!beginStatus.isOK()) {
+ auto cb = std::move(_refreshCallback);
+ cb(this, beginStatus);
+ return;
+ }
- cb(this, Status::OK());
+ // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such
+ // we
+ // need to intercept those calls so we can capture them. This will get cleared out when we
+ // fill
+ // in the real onFinish in startCommand.
+ op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) {
+ invariant(!failedResponse.isOK());
+ auto cb = std::move(_refreshCallback);
+ cb(this, failedResponse.getStatus());
});
+
+ _global->_impl->_asyncRunCommand(
+ op,
+ [this, op](std::error_code ec, size_t bytes) {
+ cancelTimeout();
+
+ auto cb = std::move(_refreshCallback);
+
+ if (ec)
+ return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+
+ cb(this, Status::OK());
+ });
+ });
}
std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
@@ -235,7 +242,7 @@ Date_t ASIOImpl::now() {
}
std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
- return stdx::make_unique<ASIOTimer>(&_impl->_io_service);
+ return stdx::make_unique<ASIOTimer>(&_impl->_strand);
}
std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(
diff --git a/src/mongo/executor/connection_pool_asio.h b/src/mongo/executor/connection_pool_asio.h
index 6700819c63d..44235d23e16 100644
--- a/src/mongo/executor/connection_pool_asio.h
+++ b/src/mongo/executor/connection_pool_asio.h
@@ -44,7 +44,7 @@ namespace connection_pool_asio {
*/
class ASIOTimer final : public ConnectionPool::TimerInterface {
public:
- ASIOTimer(asio::io_service* service);
+ ASIOTimer(asio::io_service::strand* strand);
~ASIOTimer();
void setTimeout(Milliseconds timeout, TimeoutCallback cb) override;
@@ -57,7 +57,7 @@ private:
};
TimeoutCallback _cb;
- asio::io_service* const _io_service;
+ asio::io_service::strand* const _strand;
asio::steady_timer _impl;
std::shared_ptr<CallbackSharedState> _callbackSharedState;
};
@@ -99,12 +99,12 @@ private:
SetupCallback _setupCallback;
RefreshCallback _refreshCallback;
ASIOImpl* const _global;
- ASIOTimer _timer;
Date_t _lastUsed;
Status _status = ConnectionPool::kConnectionStateUnknown;
HostAndPort _hostAndPort;
size_t _generation;
std::unique_ptr<NetworkInterfaceASIO::AsyncOp> _impl;
+ ASIOTimer _timer;
};
/**
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 5f554b4fa94..489246d217b 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -52,6 +52,10 @@
namespace mongo {
namespace executor {
+namespace {
+const std::size_t kIOServiceWorkers = 1;
+} // namespace
+
#if defined(_MSC_VER) && _MSC_VER < 1900
NetworkInterfaceASIO::Options::Options(Options&& other)
: connectionPoolOptions(std::move(other.connectionPoolOptions)),
@@ -75,13 +79,13 @@ NetworkInterfaceASIO::NetworkInterfaceASIO(Options options)
_io_service(),
_metadataHook(std::move(_options.metadataHook)),
_hook(std::move(_options.networkConnectionHook)),
- _resolver(_io_service),
_state(State::kReady),
_timerFactory(std::move(_options.timerFactory)),
_streamFactory(std::move(_options.streamFactory)),
_connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this),
_options.connectionPoolOptions),
- _isExecutorRunnable(false) {}
+ _isExecutorRunnable(false),
+ _strand(_io_service) {}
std::string NetworkInterfaceASIO::getDiagnosticString() {
str::stream output;
@@ -99,25 +103,31 @@ std::string NetworkInterfaceASIO::getHostName() {
}
void NetworkInterfaceASIO::startup() {
- _serviceRunner = stdx::thread([this]() {
- setThreadName("NetworkInterfaceASIO");
- try {
- LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up";
- asio::io_service::work work(_io_service);
- _io_service.run();
- } catch (...) {
- severe() << "Uncaught exception in NetworkInterfaceASIO IO worker thread of type: "
- << exceptionToStatus();
- fassertFailed(28820);
- }
- });
+ std::generate_n(std::back_inserter(_serviceRunners),
+ kIOServiceWorkers,
+ [&] {
+ return stdx::thread([this]() {
+ setThreadName("NetworkInterfaceASIO");
+ try {
+ LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up";
+ asio::io_service::work work(_io_service);
+ _io_service.run();
+ } catch (...) {
+ severe() << "Uncaught exception in NetworkInterfaceASIO IO "
+ "worker thread of type: " << exceptionToStatus();
+ fassertFailed(28820);
+ }
+ });
+ });
_state.store(State::kRunning);
}
void NetworkInterfaceASIO::shutdown() {
_state.store(State::kShutdown);
_io_service.stop();
- _serviceRunner.join();
+ for (auto&& worker : _serviceRunners) {
+ worker.join();
+ }
LOG(2) << "NetworkInterfaceASIO shutdown successfully";
}
@@ -197,35 +207,35 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
AsyncOp* op = nullptr;
- {
- stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+ stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
- const auto eraseCount = _inGetConnection.erase(cbHandle);
+ const auto eraseCount = _inGetConnection.erase(cbHandle);
- // If we didn't find the request, we've been canceled
- if (eraseCount == 0) {
- lk.unlock();
+ // If we didn't find the request, we've been canceled
+ if (eraseCount == 0) {
+ lk.unlock();
- onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"});
+ onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"});
- // Though we were canceled, we know that the stream is fine, so indicate success.
- conn->indicateSuccess();
+ // Though we were canceled, we know that the stream is fine, so indicate success.
+ conn->indicateSuccess();
- signalWorkAvailable();
+ signalWorkAvailable();
- return;
- }
+ return;
+ }
- // We can't release the AsyncOp until we know we were not canceled.
- auto ownedOp = conn->releaseAsyncOp();
- op = ownedOp.get();
+ // We can't release the AsyncOp until we know we were not canceled.
+ auto ownedOp = conn->releaseAsyncOp();
+ op = ownedOp.get();
- // Sanity check that we are getting a clean AsyncOp.
- invariant(!op->canceled());
- invariant(!op->timedOut());
+ // Sanity check that we are getting a clean AsyncOp.
+ invariant(!op->canceled());
+ invariant(!op->timedOut());
- _inProgress.emplace(op, std::move(ownedOp));
- }
+ // Now that we're inProgress, an external cancel can touch our op, but
+ // not until we release the inProgressMutex.
+ _inProgress.emplace(op, std::move(ownedOp));
op->_cbHandle = std::move(cbHandle);
op->_request = std::move(request);
@@ -233,50 +243,53 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
op->_connectionPoolHandle = std::move(swConn.getValue());
op->_start = startTime;
- // Set timeout now that we have the correct request object
- if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) {
- op->_timeoutAlarm = op->_owner->_timerFactory->make(&_io_service, op->_request.timeout);
-
- std::shared_ptr<AsyncOp::AccessControl> access;
- std::size_t generation;
- {
- stdx::lock_guard<stdx::mutex> lk(op->_access->mutex);
- access = op->_access;
- generation = access->id;
- }
-
- op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) {
- if (!ec) {
- // We must pass a check for safe access before using op inside the
- // callback or we may attempt access on an invalid pointer.
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation != access->id) {
- // The operation has been cleaned up, do not access.
- return;
- }
-
- LOG(2) << "Operation timed out: " << op->request().toString();
+ // This ditches the lock and gets us onto the strand (so we're
+ // threadsafe)
+ op->_strand.post([this, op] {
+ // Set timeout now that we have the correct request object
+ if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) {
+ op->_timeoutAlarm =
+ op->_owner->_timerFactory->make(&op->_strand, op->_request.timeout);
+
+ std::shared_ptr<AsyncOp::AccessControl> access;
+ std::size_t generation;
+ {
+ stdx::lock_guard<stdx::mutex> lk(op->_access->mutex);
+ access = op->_access;
+ generation = access->id;
+ }
- // An operation may be in mid-flight when it times out, so we
- // cancel any in-progress async calls but do not complete the operation now.
- if (op->_connection) {
- op->_connection->cancel();
+ op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) {
+ if (!ec) {
+ // We must pass a check for safe access before using op inside the
+ // callback or we may attempt access on an invalid pointer.
+ stdx::lock_guard<stdx::mutex> lk(access->mutex);
+ if (generation != access->id) {
+ // The operation has been cleaned up, do not access.
+ return;
+ }
+
+ LOG(2) << "Operation timed out: " << op->request().toString();
+
+ // An operation may be in mid-flight when it times out, so we
+ // cancel any in-progress async calls but do not complete the operation now.
+ op->_timedOut = 1;
+ if (op->_connection) {
+ op->_connection->cancel();
+ }
+ } else {
+ LOG(4) << "failed to time operation out: " << ec.message();
}
- op->_timedOut.store(1);
- } else {
- LOG(4) << "failed to time operation out: " << ec.message();
- }
- });
- }
+ });
+ }
- _beginCommunication(op);
+ _beginCommunication(op);
+ });
};
// TODO: thread some higher level timeout through, rather than 5 minutes,
// once we make timeouts pervasive in this api.
- asio::post(
- _io_service,
- [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); });
+ _connectionPool.get(request.target, Minutes(5), nextStep);
}
void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
diff --git a/src/mongo/executor/network_interface_asio.h b/src/mongo/executor/network_interface_asio.h
index 74309aaf465..d7c73bc715e 100644
--- a/src/mongo/executor/network_interface_asio.h
+++ b/src/mongo/executor/network_interface_asio.h
@@ -278,6 +278,14 @@ private:
void setOnFinish(RemoteCommandCompletionFn&& onFinish);
+ asio::io_service::strand& strand() {
+ return _strand;
+ }
+
+ asio::ip::tcp::resolver& resolver() {
+ return _resolver;
+ }
+
private:
NetworkInterfaceASIO* const _owner;
// Information describing a task enqueued on the NetworkInterface
@@ -305,8 +313,10 @@ private:
Date_t _start;
std::unique_ptr<AsyncTimerInterface> _timeoutAlarm;
- AtomicUInt64 _canceled;
- AtomicUInt64 _timedOut;
+ asio::ip::tcp::resolver _resolver;
+
+ bool _canceled = false;
+ bool _timedOut = false;
/**
* We maintain a shared_ptr to an access control object. This ensures that tangent
@@ -322,6 +332,15 @@ private:
*/
boost::optional<AsyncCommand> _command;
bool _inSetup;
+
+ /**
+ * The explicit strand that all operations for this op must run on.
+ * This must be the last member of AsyncOp because any pending
+ * operation for the strand are run when it's dtor is called. Any
+ * members that fall after it will have already been destroyed, which
+ * will make those fields illegal to touch from callbacks.
+ */
+ asio::io_service::strand _strand;
};
void _startCommand(AsyncOp* op);
@@ -371,14 +390,12 @@ private:
Options _options;
asio::io_service _io_service;
- stdx::thread _serviceRunner;
+ std::vector<stdx::thread> _serviceRunners;
const std::unique_ptr<rpc::EgressMetadataHook> _metadataHook;
const std::unique_ptr<NetworkConnectionHook> _hook;
- asio::ip::tcp::resolver _resolver;
-
std::atomic<State> _state; // NOLINT
std::unique_ptr<AsyncTimerFactoryInterface> _timerFactory;
@@ -396,6 +413,15 @@ private:
stdx::mutex _executorMutex;
bool _isExecutorRunnable;
stdx::condition_variable _isExecutorRunnableCondition;
+
+ /**
+ * The explicit strand that all non-op operations run on. This must be the
+ * last member of NetworkInterfaceASIO because any pending operation for
+ * the strand are run when it's dtor is called. Any members that fall after
+ * it will have already been destroyed, which will make those fields
+ * illegal to touch from callbacks.
+ */
+ asio::io_service::strand _strand;
};
template <typename T, typename R, typename... MethodArgs, typename... DeducedArgs>
diff --git a/src/mongo/executor/network_interface_asio_connect.cpp b/src/mongo/executor/network_interface_asio_connect.cpp
index 696ee271a63..72d4cba8ac1 100644
--- a/src/mongo/executor/network_interface_asio_connect.cpp
+++ b/src/mongo/executor/network_interface_asio_connect.cpp
@@ -102,13 +102,13 @@ void NetworkInterfaceASIO::_connect(AsyncOp* op) {
_validateAndRun(
op, ec, [this, op, endpoints]() { _setupSocket(op, std::move(endpoints)); });
};
- _resolver.async_resolve(query, std::move(thenConnect));
+ op->resolver().async_resolve(query, op->_strand.wrap(std::move(thenConnect)));
}
void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator endpoints) {
// TODO: Consider moving this call to post-auth so we only assign completed connections.
{
- auto stream = _streamFactory->makeStream(&_io_service, op->request().target);
+ auto stream = _streamFactory->makeStream(&op->strand(), op->request().target);
op->setConnection({std::move(stream), rpc::supports::kOpQueryOnly});
}
diff --git a/src/mongo/executor/network_interface_asio_operation.cpp b/src/mongo/executor/network_interface_asio_operation.cpp
index dbce0eb9c73..21929559e5c 100644
--- a/src/mongo/executor/network_interface_asio_operation.cpp
+++ b/src/mongo/executor/network_interface_asio_operation.cpp
@@ -99,42 +99,39 @@ NetworkInterfaceASIO::AsyncOp::AsyncOp(NetworkInterfaceASIO* const owner,
_request(request),
_onFinish(onFinish),
_start(now),
+ _resolver(owner->_io_service),
_canceled(0),
_timedOut(0),
_access(std::make_shared<AsyncOp::AccessControl>()),
- _inSetup(true) {}
+ _inSetup(true),
+ _strand(owner->_io_service) {}
void NetworkInterfaceASIO::AsyncOp::cancel() {
LOG(2) << "Canceling operation; original request was: " << request().toString();
- std::shared_ptr<AsyncOp::AccessControl> access;
- std::size_t generation;
- {
- stdx::lock_guard<stdx::mutex> lk(_access->mutex);
- access = _access;
- generation = access->id;
- }
+ stdx::lock_guard<stdx::mutex> lk(_access->mutex);
+ auto access = _access;
+ auto generation = access->id;
// An operation may be in mid-flight when it is canceled, so we cancel any
// in-progress async ops but do not complete the operation now.
- asio::post(_owner->_io_service,
- [this, access, generation] {
- // Ensure 'this' pointer is still valid.
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation == access->id) {
- _canceled.store(1);
- if (_connection) {
- _connection->cancel();
- }
- }
- });
+
+ _strand.post([this, access, generation] {
+ stdx::lock_guard<stdx::mutex> lk(access->mutex);
+ if (generation == access->id) {
+ _canceled = true;
+ if (_connection) {
+ _connection->cancel();
+ }
+ }
+ });
}
bool NetworkInterfaceASIO::AsyncOp::canceled() const {
- return (_canceled.load() == 1);
+ return _canceled;
}
bool NetworkInterfaceASIO::AsyncOp::timedOut() const {
- return (_timedOut.load() == 1);
+ return _timedOut;
}
const TaskExecutor::CallbackHandle& NetworkInterfaceASIO::AsyncOp::cbHandle() const {
@@ -237,8 +234,8 @@ void NetworkInterfaceASIO::AsyncOp::reset() {
// Ditto for _operationProtocol.
_start = {};
_timeoutAlarm.reset();
- _canceled.store(0u);
- _timedOut.store(0u);
+ _canceled = false;
+ _timedOut = false;
_command = boost::none;
// _inSetup should always be false at this point.
}