diff options
author | Jason Carey <jcarey@argv.me> | 2015-10-28 14:41:07 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2015-11-09 18:03:18 -0500 |
commit | c836472353e736424c9bb87868508c9e633b892d (patch) | |
tree | 71b6bd4303e722ea244ac5e3b538ad65ab417f5c /src/mongo/executor/network_interface_asio.cpp | |
parent | e8187cc8f07ac5fccd384430f33457d8a57f0381 (diff) | |
download | mongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz |
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple io workers) in
NetworkInterfaceASIO.
Strands are an asio-specific mechanism for ensuring thread safety.
Diffstat (limited to 'src/mongo/executor/network_interface_asio.cpp')
-rw-r--r-- | src/mongo/executor/network_interface_asio.cpp | 157 |
1 file changed, 85 insertions, 72 deletions
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp index 5f554b4fa94..489246d217b 100644 --- a/src/mongo/executor/network_interface_asio.cpp +++ b/src/mongo/executor/network_interface_asio.cpp @@ -52,6 +52,10 @@ namespace mongo { namespace executor { +namespace { +const std::size_t kIOServiceWorkers = 1; +} // namespace + #if defined(_MSC_VER) && _MSC_VER < 1900 NetworkInterfaceASIO::Options::Options(Options&& other) : connectionPoolOptions(std::move(other.connectionPoolOptions)), @@ -75,13 +79,13 @@ NetworkInterfaceASIO::NetworkInterfaceASIO(Options options) _io_service(), _metadataHook(std::move(_options.metadataHook)), _hook(std::move(_options.networkConnectionHook)), - _resolver(_io_service), _state(State::kReady), _timerFactory(std::move(_options.timerFactory)), _streamFactory(std::move(_options.streamFactory)), _connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this), _options.connectionPoolOptions), - _isExecutorRunnable(false) {} + _isExecutorRunnable(false), + _strand(_io_service) {} std::string NetworkInterfaceASIO::getDiagnosticString() { str::stream output; @@ -99,25 +103,31 @@ std::string NetworkInterfaceASIO::getHostName() { } void NetworkInterfaceASIO::startup() { - _serviceRunner = stdx::thread([this]() { - setThreadName("NetworkInterfaceASIO"); - try { - LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; - asio::io_service::work work(_io_service); - _io_service.run(); - } catch (...) { - severe() << "Uncaught exception in NetworkInterfaceASIO IO worker thread of type: " - << exceptionToStatus(); - fassertFailed(28820); - } - }); + std::generate_n(std::back_inserter(_serviceRunners), + kIOServiceWorkers, + [&] { + return stdx::thread([this]() { + setThreadName("NetworkInterfaceASIO"); + try { + LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up"; + asio::io_service::work work(_io_service); + _io_service.run(); + } catch (...) 
{ + severe() << "Uncaught exception in NetworkInterfaceASIO IO " + "worker thread of type: " << exceptionToStatus(); + fassertFailed(28820); + } + }); + }); _state.store(State::kRunning); } void NetworkInterfaceASIO::shutdown() { _state.store(State::kShutdown); _io_service.stop(); - _serviceRunner.join(); + for (auto&& worker : _serviceRunners) { + worker.join(); + } LOG(2) << "NetworkInterfaceASIO shutdown successfully"; } @@ -197,35 +207,35 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa AsyncOp* op = nullptr; - { - stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); + stdx::unique_lock<stdx::mutex> lk(_inProgressMutex); - const auto eraseCount = _inGetConnection.erase(cbHandle); + const auto eraseCount = _inGetConnection.erase(cbHandle); - // If we didn't find the request, we've been canceled - if (eraseCount == 0) { - lk.unlock(); + // If we didn't find the request, we've been canceled + if (eraseCount == 0) { + lk.unlock(); - onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); + onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"}); - // Though we were canceled, we know that the stream is fine, so indicate success. - conn->indicateSuccess(); + // Though we were canceled, we know that the stream is fine, so indicate success. + conn->indicateSuccess(); - signalWorkAvailable(); + signalWorkAvailable(); - return; - } + return; + } - // We can't release the AsyncOp until we know we were not canceled. - auto ownedOp = conn->releaseAsyncOp(); - op = ownedOp.get(); + // We can't release the AsyncOp until we know we were not canceled. + auto ownedOp = conn->releaseAsyncOp(); + op = ownedOp.get(); - // Sanity check that we are getting a clean AsyncOp. - invariant(!op->canceled()); - invariant(!op->timedOut()); + // Sanity check that we are getting a clean AsyncOp. 
+ invariant(!op->canceled()); + invariant(!op->timedOut()); - _inProgress.emplace(op, std::move(ownedOp)); - } + // Now that we're inProgress, an external cancel can touch our op, but + // not until we release the inProgressMutex. + _inProgress.emplace(op, std::move(ownedOp)); op->_cbHandle = std::move(cbHandle); op->_request = std::move(request); @@ -233,50 +243,53 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa op->_connectionPoolHandle = std::move(swConn.getValue()); op->_start = startTime; - // Set timeout now that we have the correct request object - if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) { - op->_timeoutAlarm = op->_owner->_timerFactory->make(&_io_service, op->_request.timeout); - - std::shared_ptr<AsyncOp::AccessControl> access; - std::size_t generation; - { - stdx::lock_guard<stdx::mutex> lk(op->_access->mutex); - access = op->_access; - generation = access->id; - } - - op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) { - if (!ec) { - // We must pass a check for safe access before using op inside the - // callback or we may attempt access on an invalid pointer. - stdx::lock_guard<stdx::mutex> lk(access->mutex); - if (generation != access->id) { - // The operation has been cleaned up, do not access. 
- return; - } - - LOG(2) << "Operation timed out: " << op->request().toString(); + // This ditches the lock and gets us onto the strand (so we're + // threadsafe) + op->_strand.post([this, op] { + // Set timeout now that we have the correct request object + if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) { + op->_timeoutAlarm = + op->_owner->_timerFactory->make(&op->_strand, op->_request.timeout); + + std::shared_ptr<AsyncOp::AccessControl> access; + std::size_t generation; + { + stdx::lock_guard<stdx::mutex> lk(op->_access->mutex); + access = op->_access; + generation = access->id; + } - // An operation may be in mid-flight when it times out, so we - // cancel any in-progress async calls but do not complete the operation now. - if (op->_connection) { - op->_connection->cancel(); + op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) { + if (!ec) { + // We must pass a check for safe access before using op inside the + // callback or we may attempt access on an invalid pointer. + stdx::lock_guard<stdx::mutex> lk(access->mutex); + if (generation != access->id) { + // The operation has been cleaned up, do not access. + return; + } + + LOG(2) << "Operation timed out: " << op->request().toString(); + + // An operation may be in mid-flight when it times out, so we + // cancel any in-progress async calls but do not complete the operation now. + op->_timedOut = 1; + if (op->_connection) { + op->_connection->cancel(); + } + } else { + LOG(4) << "failed to time operation out: " << ec.message(); } - op->_timedOut.store(1); - } else { - LOG(4) << "failed to time operation out: " << ec.message(); - } - }); - } + }); + } - _beginCommunication(op); + _beginCommunication(op); + }); }; // TODO: thread some higher level timeout through, rather than 5 minutes, // once we make timeouts pervasive in this api. 
- asio::post( - _io_service, - [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); }); + _connectionPool.get(request.target, Minutes(5), nextStep); } void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) { |