summaryrefslogtreecommitdiff
path: root/src/mongo/executor/network_interface_asio.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2015-10-28 14:41:07 -0400
committerJason Carey <jcarey@argv.me>2015-11-09 18:03:18 -0500
commitc836472353e736424c9bb87868508c9e633b892d (patch)
tree71b6bd4303e722ea244ac5e3b538ad65ab417f5c /src/mongo/executor/network_interface_asio.cpp
parente8187cc8f07ac5fccd384430f33457d8a57f0381 (diff)
downloadmongo-c836472353e736424c9bb87868508c9e633b892d.tar.gz
SERVER-20143 Strand NetworkInterfaceASIO
Add strands (and an option for multiple I/O workers) in NetworkInterfaceASIO. Strands are an ASIO-specific mechanism for ensuring thread safety.
Diffstat (limited to 'src/mongo/executor/network_interface_asio.cpp')
-rw-r--r--src/mongo/executor/network_interface_asio.cpp157
1 file changed, 85 insertions(+), 72 deletions(-)
diff --git a/src/mongo/executor/network_interface_asio.cpp b/src/mongo/executor/network_interface_asio.cpp
index 5f554b4fa94..489246d217b 100644
--- a/src/mongo/executor/network_interface_asio.cpp
+++ b/src/mongo/executor/network_interface_asio.cpp
@@ -52,6 +52,10 @@
namespace mongo {
namespace executor {
+namespace {
+const std::size_t kIOServiceWorkers = 1;
+} // namespace
+
#if defined(_MSC_VER) && _MSC_VER < 1900
NetworkInterfaceASIO::Options::Options(Options&& other)
: connectionPoolOptions(std::move(other.connectionPoolOptions)),
@@ -75,13 +79,13 @@ NetworkInterfaceASIO::NetworkInterfaceASIO(Options options)
_io_service(),
_metadataHook(std::move(_options.metadataHook)),
_hook(std::move(_options.networkConnectionHook)),
- _resolver(_io_service),
_state(State::kReady),
_timerFactory(std::move(_options.timerFactory)),
_streamFactory(std::move(_options.streamFactory)),
_connectionPool(stdx::make_unique<connection_pool_asio::ASIOImpl>(this),
_options.connectionPoolOptions),
- _isExecutorRunnable(false) {}
+ _isExecutorRunnable(false),
+ _strand(_io_service) {}
std::string NetworkInterfaceASIO::getDiagnosticString() {
str::stream output;
@@ -99,25 +103,31 @@ std::string NetworkInterfaceASIO::getHostName() {
}
void NetworkInterfaceASIO::startup() {
- _serviceRunner = stdx::thread([this]() {
- setThreadName("NetworkInterfaceASIO");
- try {
- LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up";
- asio::io_service::work work(_io_service);
- _io_service.run();
- } catch (...) {
- severe() << "Uncaught exception in NetworkInterfaceASIO IO worker thread of type: "
- << exceptionToStatus();
- fassertFailed(28820);
- }
- });
+ std::generate_n(std::back_inserter(_serviceRunners),
+ kIOServiceWorkers,
+ [&] {
+ return stdx::thread([this]() {
+ setThreadName("NetworkInterfaceASIO");
+ try {
+ LOG(2) << "The NetworkInterfaceASIO worker thread is spinning up";
+ asio::io_service::work work(_io_service);
+ _io_service.run();
+ } catch (...) {
+ severe() << "Uncaught exception in NetworkInterfaceASIO IO "
+ "worker thread of type: " << exceptionToStatus();
+ fassertFailed(28820);
+ }
+ });
+ });
_state.store(State::kRunning);
}
void NetworkInterfaceASIO::shutdown() {
_state.store(State::kShutdown);
_io_service.stop();
- _serviceRunner.join();
+ for (auto&& worker : _serviceRunners) {
+ worker.join();
+ }
LOG(2) << "NetworkInterfaceASIO shutdown successfully";
}
@@ -197,35 +207,35 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
AsyncOp* op = nullptr;
- {
- stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
+ stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
- const auto eraseCount = _inGetConnection.erase(cbHandle);
+ const auto eraseCount = _inGetConnection.erase(cbHandle);
- // If we didn't find the request, we've been canceled
- if (eraseCount == 0) {
- lk.unlock();
+ // If we didn't find the request, we've been canceled
+ if (eraseCount == 0) {
+ lk.unlock();
- onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"});
+ onFinish({ErrorCodes::CallbackCanceled, "Callback canceled"});
- // Though we were canceled, we know that the stream is fine, so indicate success.
- conn->indicateSuccess();
+ // Though we were canceled, we know that the stream is fine, so indicate success.
+ conn->indicateSuccess();
- signalWorkAvailable();
+ signalWorkAvailable();
- return;
- }
+ return;
+ }
- // We can't release the AsyncOp until we know we were not canceled.
- auto ownedOp = conn->releaseAsyncOp();
- op = ownedOp.get();
+ // We can't release the AsyncOp until we know we were not canceled.
+ auto ownedOp = conn->releaseAsyncOp();
+ op = ownedOp.get();
- // Sanity check that we are getting a clean AsyncOp.
- invariant(!op->canceled());
- invariant(!op->timedOut());
+ // Sanity check that we are getting a clean AsyncOp.
+ invariant(!op->canceled());
+ invariant(!op->timedOut());
- _inProgress.emplace(op, std::move(ownedOp));
- }
+ // Now that we're inProgress, an external cancel can touch our op, but
+ // not until we release the inProgressMutex.
+ _inProgress.emplace(op, std::move(ownedOp));
op->_cbHandle = std::move(cbHandle);
op->_request = std::move(request);
@@ -233,50 +243,53 @@ void NetworkInterfaceASIO::startCommand(const TaskExecutor::CallbackHandle& cbHa
op->_connectionPoolHandle = std::move(swConn.getValue());
op->_start = startTime;
- // Set timeout now that we have the correct request object
- if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) {
- op->_timeoutAlarm = op->_owner->_timerFactory->make(&_io_service, op->_request.timeout);
-
- std::shared_ptr<AsyncOp::AccessControl> access;
- std::size_t generation;
- {
- stdx::lock_guard<stdx::mutex> lk(op->_access->mutex);
- access = op->_access;
- generation = access->id;
- }
-
- op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) {
- if (!ec) {
- // We must pass a check for safe access before using op inside the
- // callback or we may attempt access on an invalid pointer.
- stdx::lock_guard<stdx::mutex> lk(access->mutex);
- if (generation != access->id) {
- // The operation has been cleaned up, do not access.
- return;
- }
-
- LOG(2) << "Operation timed out: " << op->request().toString();
+ // This ditches the lock and gets us onto the strand (so we're
+ // threadsafe)
+ op->_strand.post([this, op] {
+ // Set timeout now that we have the correct request object
+ if (op->_request.timeout != RemoteCommandRequest::kNoTimeout) {
+ op->_timeoutAlarm =
+ op->_owner->_timerFactory->make(&op->_strand, op->_request.timeout);
+
+ std::shared_ptr<AsyncOp::AccessControl> access;
+ std::size_t generation;
+ {
+ stdx::lock_guard<stdx::mutex> lk(op->_access->mutex);
+ access = op->_access;
+ generation = access->id;
+ }
- // An operation may be in mid-flight when it times out, so we
- // cancel any in-progress async calls but do not complete the operation now.
- if (op->_connection) {
- op->_connection->cancel();
+ op->_timeoutAlarm->asyncWait([this, op, access, generation](std::error_code ec) {
+ if (!ec) {
+ // We must pass a check for safe access before using op inside the
+ // callback or we may attempt access on an invalid pointer.
+ stdx::lock_guard<stdx::mutex> lk(access->mutex);
+ if (generation != access->id) {
+ // The operation has been cleaned up, do not access.
+ return;
+ }
+
+ LOG(2) << "Operation timed out: " << op->request().toString();
+
+ // An operation may be in mid-flight when it times out, so we
+ // cancel any in-progress async calls but do not complete the operation now.
+ op->_timedOut = 1;
+ if (op->_connection) {
+ op->_connection->cancel();
+ }
+ } else {
+ LOG(4) << "failed to time operation out: " << ec.message();
}
- op->_timedOut.store(1);
- } else {
- LOG(4) << "failed to time operation out: " << ec.message();
- }
- });
- }
+ });
+ }
- _beginCommunication(op);
+ _beginCommunication(op);
+ });
};
// TODO: thread some higher level timeout through, rather than 5 minutes,
// once we make timeouts pervasive in this api.
- asio::post(
- _io_service,
- [this, request, nextStep] { _connectionPool.get(request.target, Minutes(5), nextStep); });
+ _connectionPool.get(request.target, Minutes(5), nextStep);
}
void NetworkInterfaceASIO::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {