diff options
author | Jason Carey <jcarey@argv.me> | 2015-10-28 14:41:07 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2015-11-09 18:03:18 -0500 |
commit | c836472353e736424c9bb87868508c9e633b892d (patch) | |
tree | 71b6bd4303e722ea244ac5e3b538ad65ab417f5c /src/mongo/executor/connection_pool_asio.cpp | |
parent | e8187cc8f07ac5fccd384430f33457d8a57f0381 (diff) | |
download | mongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz |
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple io workers) in
NetworkInterfaceASIO.
strands are an asio specific mechanism for ensuring thread safety.
Diffstat (limited to 'src/mongo/executor/connection_pool_asio.cpp')
-rw-r--r-- | src/mongo/executor/connection_pool_asio.cpp | 161 |
1 files changed, 84 insertions, 77 deletions
diff --git a/src/mongo/executor/connection_pool_asio.cpp b/src/mongo/executor/connection_pool_asio.cpp index 39d70cd6c4e..12e6d0bd062 100644 --- a/src/mongo/executor/connection_pool_asio.cpp +++ b/src/mongo/executor/connection_pool_asio.cpp @@ -42,9 +42,9 @@ namespace mongo { namespace executor { namespace connection_pool_asio { -ASIOTimer::ASIOTimer(asio::io_service* io_service) - : _io_service(io_service), - _impl(*io_service), +ASIOTimer::ASIOTimer(asio::io_service::strand* strand) + : _strand(strand), + _impl(strand->get_io_service()), _callbackSharedState(std::make_shared<CallbackSharedState>()) {} ASIOTimer::~ASIOTimer() { @@ -52,55 +52,60 @@ ASIOTimer::~ASIOTimer() { } void ASIOTimer::setTimeout(Milliseconds timeout, TimeoutCallback cb) { - _cb = std::move(cb); + _strand->dispatch([this, timeout, cb] { + _cb = std::move(cb); - cancelTimeout(); - _impl.expires_after(timeout); - - decltype(_callbackSharedState->id) id; - decltype(_callbackSharedState) sharedState; + cancelTimeout(); + _impl.expires_after(timeout); - { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - id = ++_callbackSharedState->id; - sharedState = _callbackSharedState; - } + decltype(_callbackSharedState->id) id; + decltype(_callbackSharedState) sharedState; - _impl.async_wait([this, id, sharedState](const asio::error_code& error) { - if (error == asio::error::operation_aborted) { - return; + { + stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); + id = ++_callbackSharedState->id; + sharedState = _callbackSharedState; } - stdx::unique_lock<stdx::mutex> lk(sharedState->mutex); - - // If the id in shared state doesn't match the id in our callback, it - // means we were cancelled, but still executed. This can occur if we - // were cancelled just before our timeout. We need a generation, rather - // than just a bool here, because we could have been cancelled and - // another callback set, in which case we shouldn't run and the we - // should let the other callback execute instead. - if (sharedState->id == id) { - auto cb = std::move(_cb); - lk.unlock(); - cb(); - } + _impl.async_wait(_strand->wrap([this, id, sharedState](const asio::error_code& error) { + if (error == asio::error::operation_aborted) { + return; + } + + stdx::unique_lock<stdx::mutex> lk(sharedState->mutex); + + // If the id in shared state doesn't match the id in our callback, it + // means we were cancelled, but still executed. This can occur if we + // were cancelled just before our timeout. We need a generation, rather + // than just a bool here, because we could have been cancelled and + // another callback set, in which case we shouldn't run and the we + // should let the other callback execute instead. + if (sharedState->id == id) { + auto cb = std::move(_cb); + lk.unlock(); + cb(); + } + })); }); } void ASIOTimer::cancelTimeout() { - { - stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); - _callbackSharedState->id++; - } - _impl.cancel(); + _strand->dispatch([this] { + { + stdx::lock_guard<stdx::mutex> lk(_callbackSharedState->mutex); + _callbackSharedState->id++; + } + + _impl.cancel(); + }); } ASIOConnection::ASIOConnection(const HostAndPort& hostAndPort, size_t generation, ASIOImpl* global) : _global(global), - _timer(&global->_impl->_io_service), _hostAndPort(hostAndPort), _generation(generation), - _impl(makeAsyncOp(this)) {} + _impl(makeAsyncOp(this)), + _timer(&_impl->strand()) {} void ASIOConnection::indicateSuccess() { indicateUsed(); @@ -166,9 +171,11 @@ void ASIOConnection::cancelTimeout() { } void ASIOConnection::setup(Milliseconds timeout, SetupCallback cb) { - _setupCallback = std::move(cb); + _impl->strand().dispatch([this, timeout, cb] { + _setupCallback = std::move(cb); - _global->_impl->_connect(_impl.get()); + _global->_impl->_connect(_impl.get()); + }); } void ASIOConnection::resetToUnknown() { @@ -176,48 +183,48 @@ void ASIOConnection::resetToUnknown() { } void ASIOConnection::refresh(Milliseconds timeout, RefreshCallback cb) { - auto op = _impl.get(); - - _refreshCallback = std::move(cb); - - // Actually timeout refreshes - setTimeout(timeout, - [this]() { - asio::post(_global->_impl->_io_service, - [this] { _impl->connection().stream().cancel(); }); - }); - - // Our pings are isMaster's - auto beginStatus = op->beginCommand(makeIsMasterRequest(this), - NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC, - _hostAndPort); - if (!beginStatus.isOK()) { - auto cb = std::move(_refreshCallback); - cb(this, beginStatus); - return; - } - - // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such we - // need to intercept those calls so we can capture them. This will get cleared out when we fill - // in the real onFinish in startCommand. - op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) { - invariant(!failedResponse.isOK()); - auto cb = std::move(_refreshCallback); - cb(this, failedResponse.getStatus()); - }); + _impl->strand().dispatch([this, timeout, cb] { + auto op = _impl.get(); - _global->_impl->_asyncRunCommand( - op, - [this, op](std::error_code ec, size_t bytes) { - cancelTimeout(); + _refreshCallback = std::move(cb); - auto cb = std::move(_refreshCallback); + // Actually timeout refreshes + setTimeout(timeout, [this]() { _impl->connection().stream().cancel(); }); - if (ec) - return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + // Our pings are isMaster's + auto beginStatus = op->beginCommand(makeIsMasterRequest(this), + NetworkInterfaceASIO::AsyncCommand::CommandType::kRPC, + _hostAndPort); + if (!beginStatus.isOK()) { + auto cb = std::move(_refreshCallback); + cb(this, beginStatus); + return; + } - cb(this, Status::OK()); + // If we fail during refresh, the _onFinish function of the AsyncOp will get called. As such + // we + // need to intercept those calls so we can capture them. This will get cleared out when we + // fill + // in the real onFinish in startCommand. + op->setOnFinish([this](StatusWith<RemoteCommandResponse> failedResponse) { + invariant(!failedResponse.isOK()); + auto cb = std::move(_refreshCallback); + cb(this, failedResponse.getStatus()); }); + + _global->_impl->_asyncRunCommand( + op, + [this, op](std::error_code ec, size_t bytes) { + cancelTimeout(); + + auto cb = std::move(_refreshCallback); + + if (ec) + return cb(this, Status(ErrorCodes::HostUnreachable, ec.message())); + + cb(this, Status::OK()); + }); + }); } std::unique_ptr<NetworkInterfaceASIO::AsyncOp> ASIOConnection::releaseAsyncOp() { @@ -235,7 +242,7 @@ Date_t ASIOImpl::now() { } std::unique_ptr<ConnectionPool::TimerInterface> ASIOImpl::makeTimer() { - return stdx::make_unique<ASIOTimer>(&_impl->_io_service); + return stdx::make_unique<ASIOTimer>(&_impl->_strand); } std::unique_ptr<ConnectionPool::ConnectionInterface> ASIOImpl::makeConnection( |