summaryrefslogtreecommitdiff
path: root/src/mongo/executor/connection_pool_asio.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2015-10-28 14:41:07 -0400
committerJason Carey <jcarey@argv.me>2015-11-09 18:03:18 -0500
commitc836472353e736424c9bb87868508c9e633b892d (patch)
tree71b6bd4303e722ea244ac5e3b538ad65ab417f5c /src/mongo/executor/connection_pool_asio.cpp
parente8187cc8f07ac5fccd384430f33457d8a57f0381 (diff)
downloadmongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple io workers) in NetworkInterfaceASIO. Strands are an asio-specific mechanism for ensuring thread safety.
Diffstat (limited to 'src/mongo/executor/connection_pool_asio.cpp')
-rw-r--r--src/mongo/executor/connection_pool_asio.cpp161
1 file changed, 84 insertions, 77 deletions
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp
index 39d70cd6c4e..12e6d0bd062 100644
--- a/src/mongo/executor/connection_pool_asio.cpp
+++ b/src/mongo/executor/connection_pool_asio.cpp
@@ -42,9 +42,9 @@ namespace mongo {
namespace executor {
namespace connection_pool_asio {
-ASIOTimer::ASIOTimer(asio::io_service* io_service)
- : _io_service(io_service),
- _impl(*io_service),
+ASIOTimer::ASIOTimer(asio::io_service::strand* strand)
+ : _strand(strand),
+ _impl(strand->get_io_service()),
_callbackSharedState(std::make_shared<CallbackSharedState>()) {}
ASIOTimer::~ASIOTimer() {
@@ -52,55 +52,60 @@ ASIOTimer::~ASIOTimer() {
}
void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) {
- _cb = std::move(cb);
+ _strand->dispatch([this, timeout, cb] {
+ _cb = std::move(cb);
- cancelTimeout();
- _impl.expires_after(timeout);
-
- decltype(_callbackSharedState->id) id;
- decltype(_callbackSharedState) sharedState;
+ cancelTimeout();
+ _impl.expires_after(timeout);
- {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- id = ++_callbackSharedState->id;
- sharedState = _callbackSharedState;
- }
+ decltype(_callbackSharedState->id) id;
+ decltype(_callbackSharedState) sharedState;
- _impl.async_wait([this, id, sharedState](const asio::error_code& error) {
- if (error == asio::error::operation_aborted) {
- return;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
+ id = ++_callbackSharedState->id;
+ sharedState = _callbackSharedState;
}
- stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
-
- // If the id in shared state doesn't match the id in our callback, it
- // means we were cancelled, but still executed. This can occur if we
- // were cancelled just before our timeout. We need a generation, rather
- // than just a bool here, because we could have been cancelled and
- // another callback set, in which case we shouldn't run and the we
- // should let the other callback execute instead.
- if (sharedState->id == id) {
- auto cb = std::move(_cb);
- lk.unlock();
- cb();
- }
+ _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) {
+ if (error == asio::error::operation_aborted) {
+ return;
+ }
+
+ stdx::unique_lock<stdx::mutex> lk(sharedState->mutex);
+
+ // If the id in shared state doesn't match the id in our callback, it
+ // means we were cancelled, but still executed. This can occur if we
+ // were cancelled just before our timeout. We need a generation, rather
+ // than just a bool here, because we could have been cancelled and
+ // another callback set, in which case we shouldn't run and the we
+ // should let the other callback execute instead.
+ if (sharedState->id == id) {
+ auto cb = std::move(_cb);
+ lk.unlock();
+ cb();
+ }
+ }));
});
}
void ASIOTimer::cancelTimeout() {
- {
- stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
- _callbackSharedState->id++;
- }
- _impl.cancel();
+ _strand->dispatch([this] {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex);
+ _callbackSharedState->id++;
+ }
+
+ _impl.cancel();
+ });
}
ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global)
: _global(global),
- _timer(&global->_impl->_io_service),
_hostAndPort(hostAndPort),
_generation(generation),
- _impl(makeAsyncOp(this)) {}
+ _impl(makeAsyncOp(this)),
+ _timer(&_impl->strand()) {}
void ASIOConnection::indicateSuccess() {
indicateUsed();
@@ -166,9 +171,11 @@ void ASIOConnection::cancelTimeout() {
}
void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) {
- _setupCallback = std::move(cb);
+ _impl->strand().dispatch([this, timeout, cb] {
+ _setupCallback = std::move(cb);
- _global->_impl->_connect(_impl.get());
+ _global->_impl->_connect(_impl.get());
+ });
}
void ASIOConnection::resetToUnknown() {
@@ -176,48 +183,48 @@ void ASIOConnection::resetToUnknown() {
}
void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) {
- auto op = _impl.get();
-
- _refreshCallback = std::move(cb);
-
- // Actually timeout refreshes
- setTimeout(timeout,
- [this]() {
- asio::post(_global->_impl->_io_service,
- [this] { _impl->connection().stream().cancel(); });
- });
-
- // Our pings are isMaster's
- auto beginStatus = op->beginCommand(makeIsMasterRequest(this),
- NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC,
- _hostAndPort);
- if (!beginStatus.isOK()) {
- auto cb = std::move(_refreshCallback);
- cb(this, beginStatus);
- return;
- }
-
- // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such we
- // need to intercept those calls so we can capture them. This will get cleared out when we fill
- // in the real onFinish in startCommand.
- op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) {
- invariant(!failedResponse.isOK());
- auto cb = std::move(_refreshCallback);
- cb(this, failedResponse.getStatus());
- });
+ _impl->strand().dispatch([this, timeout, cb] {
+ auto op = _impl.get();
- _global->_impl->_asyncRunCommand(
- op,
- [this, op](std::error_code ec, size_t bytes) {
- cancelTimeout();
+ _refreshCallback = std::move(cb);
- auto cb = std::move(_refreshCallback);
+ // Actually timeout refreshes
+ setTimeout(timeout, [this]() { _impl->connection().stream().cancel(); });
- if (ec)
- return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+ // Our pings are isMaster's
+ auto beginStatus = op->beginCommand(makeIsMasterRequest(this),
+ NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC,
+ _hostAndPort);
+ if (!beginStatus.isOK()) {
+ auto cb = std::move(_refreshCallback);
+ cb(this, beginStatus);
+ return;
+ }
- cb(this, Status::OK());
+ // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such
+ // we
+ // need to intercept those calls so we can capture them. This will get cleared out when we
+ // fill
+ // in the real onFinish in startCommand.
+ op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) {
+ invariant(!failedResponse.isOK());
+ auto cb = std::move(_refreshCallback);
+ cb(this, failedResponse.getStatus());
});
+
+ _global->_impl->_asyncRunCommand(
+ op,
+ [this, op](std::error_code ec, size_t bytes) {
+ cancelTimeout();
+
+ auto cb = std::move(_refreshCallback);
+
+ if (ec)
+ return cb(this, Status(ErrorCodes::HostUnreachable, ec.message()));
+
+ cb(this, Status::OK());
+ });
+ });
}
std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() {
@@ -235,7 +242,7 @@ Date_t ASIOImpl::now() {
}
std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() {
- return stdx::make_unique<ASIOTimer>(&_impl->_io_service);
+ return stdx::make_unique<ASIOTimer>(&_impl->_strand);
}
std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection(