author     Jason Carey <jcarey@argv.me>  2018-03-21 00:15:35 -0400
committer  Jason Carey <jcarey@argv.me>  2018-04-27 19:49:28 -0400
commit     4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch)
tree       438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/executor/network_interface_tl.cpp
parent     91eaa878c4feeebd9397c49180631fc719238aaf (diff)
download   mongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to one connection pool in mongos. This change introduces a transport layer baton, which improves performance for a particular transport layer when doing local scatter/gather operations.
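
For readers new to the baton mentioned above, the following is a minimal sketch of the concept (hypothetical, simplified names; the real interface is transport::Baton in the MongoDB tree): a baton is a work queue owned by the thread that is already blocked waiting on the operation, so completion callbacks can run on that thread instead of hopping to the reactor.

#include <functional>
#include <memory>
#include <mutex>
#include <vector>

// Minimal baton sketch: callers schedule work; the owning (client) thread
// drains it while it waits, so callbacks run locally rather than on the
// reactor thread.
class Baton {
public:
    using Task = std::function<void()>;

    // Queue work to run the next time the owning thread runs the baton.
    void schedule(Task task) {
        std::lock_guard<std::mutex> lk(_mutex);
        _tasks.push_back(std::move(task));
    }

    // Called by the owning thread while it blocks on the operation.
    void run() {
        std::vector<Task> tasks;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            tasks.swap(_tasks);
        }
        for (auto& task : tasks) {
            task();
        }
    }

private:
    std::mutex _mutex;
    std::vector<Task> _tasks;
};

using BatonHandle = std::shared_ptr<Baton>;
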
Diffstat (limited to 'src/mongo/executor/network_interface_tl.cpp')
-rw-r--r--  src/mongo/executor/network_interface_tl.cpp  227
1 file changed, 143 insertions, 84 deletions
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eef2299e623..15947af2c0b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -85,7 +85,7 @@ void NetworkInterfaceTL::startup() {
auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
_reactor, _tl, std::move(_onConnectHook));
_pool = std::make_unique<ConnectionPool>(
- std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+ std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
_ioThread = stdx::thread([this] {
setThreadName(_instanceName);
LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
@@ -97,6 +97,7 @@ void NetworkInterfaceTL::shutdown() {
_inShutdown.store(true);
_reactor->stop();
_ioThread.join();
+ _pool.reset();
LOG(2) << "NetworkInterfaceTL shutdown successfully";
}
@@ -136,7 +137,8 @@ Date_t NetworkInterfaceTL::now() {
Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
RemoteCommandRequest& request,
- const RemoteCommandCompletionFn& onFinish) {
+ const RemoteCommandCompletionFn& onFinish,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
@@ -166,40 +168,86 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
state->deadline = state->start + state->request.timeout;
}
- _pool->get(request.target, request.timeout)
- .tapError([state](Status error) {
- LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
- << error;
- })
- .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
- return _onAcquireConn(state, std::move(conn));
- })
- .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
- // The TransportLayer has, for historical reasons, returned SocketException for
- // network errors, but sharding assumes HostUnreachable on network errors.
- if (error == ErrorCodes::SocketException) {
- error = Status(ErrorCodes::HostUnreachable, error.reason());
- }
- return error;
- })
- .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
- auto duration = now() - state->start;
- if (!response.isOK()) {
- onFinish(RemoteCommandResponse(response.getStatus(), duration));
- } else {
- auto rs = std::move(response.getValue());
- LOG(2) << "Request " << state->request.id << " finished with response: "
- << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
- onFinish(rs);
- }
+ // Interacting with the connection pool can involve more work than just getting a connection
+ // out. In particular, we can end up spinning up new connections and fulfilling promises for
+ // other requesters. Returning connections has the same issue.
+ //
+ // To work around this, we hop onto the reactor thread before getting a connection, then hop
+ // back to the client thread to do the work (if on a baton). We also hook up a
+ // connection-returning unique_ptr that ensures that, however we exit, the connection is always
+ // returned on the reactor thread.
+ //
+ // TODO: get rid of this cruft once we have a connection pool that's executor-aware.
+ auto connFuture = _reactor->execute([this, state, request, baton] {
+ return makeReadyFutureWith(
+ [this, request] { return _pool->get(request.target, request.timeout); })
+ .tapError([state](Status error) {
+ LOG(2) << "Failed to get connection from pool for request " << state->request.id
+ << ": " << error;
+ })
+ .then([this, baton](ConnectionPool::ConnectionHandle conn) {
+ auto deleter = conn.get_deleter();
+
+ // TODO: drop out this shared_ptr once we have a unique_function capable future
+ return std::make_shared<CommandState::ConnHandle>(
+ conn.release(), CommandState::Deleter{deleter, _reactor});
+ });
+ });
+
+ auto remainingWork = [this, state, baton, onFinish](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
+ makeReadyFutureWith(
+ [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+ .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+ // The TransportLayer has, for historical reasons, returned SocketException for
+ // network errors, but sharding assumes HostUnreachable on network errors.
+ if (error == ErrorCodes::SocketException) {
+ error = Status(ErrorCodes::HostUnreachable, error.reason());
+ }
+ return error;
+ })
+ .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+ auto duration = now() - state->start;
+ if (!response.isOK()) {
+ onFinish(RemoteCommandResponse(response.getStatus(), duration));
+ } else {
+ const auto& rs = response.getValue();
+ LOG(2) << "Request " << state->request.id << " finished with response: "
+ << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+ onFinish(rs);
+ }
+ });
+ };
+
+ if (baton) {
+ // If we have a baton, we want to get back to the baton thread immediately after we get a
+ // connection.
+ std::move(connFuture).getAsync([
+ baton,
+ rw = std::move(remainingWork)
+ ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
+ std::move(rw)(std::move(swConn));
+ });
});
+ } else {
+ // Otherwise, we're happy to run inline.
+ std::move(connFuture)
+ .getAsync([rw = std::move(remainingWork)](
+ StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+ std::move(rw)(std::move(swConn));
+ });
+ }
+
return Status::OK();
}
// This is only called from within a then() callback on a future, so throwing is equivalent to
// returning a ready Future with a not-OK status.
Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
- std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+ std::shared_ptr<CommandState> state,
+ CommandState::ConnHandle conn,
+ const transport::BatonHandle& baton) {
if (state->done.load()) {
conn->indicateSuccess();
uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
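
The CommandState::ConnHandle used in the hunk above is a unique_ptr whose custom deleter posts the connection's return onto the reactor, as the comment in startCommand describes. A standalone sketch of that shape, with Connection and Reactor as hypothetical stand-ins for the real pool and reactor types:

#include <functional>
#include <memory>

// Hypothetical stand-ins for the real connection and reactor types.
struct Connection {
    void returnToPool() { /* give the connection back to its pool */ }
};

struct Reactor {
    void schedule(std::function<void()> task) { task(); /* stand-in: run inline */ }
};

// A unique_ptr whose deleter hops onto the reactor before returning the
// connection, so the return always happens on the reactor thread regardless
// of which thread drops the handle.
struct ConnDeleter {
    Reactor* reactor;
    void operator()(Connection* conn) const {
        reactor->schedule([conn] { conn->returnToPool(); });
    }
};

using ConnHandle = std::unique_ptr<Connection, ConnDeleter>;
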
@@ -222,44 +270,42 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
}
state->timer = _reactor->makeTimer();
- state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
- if (status == ErrorCodes::CallbackCanceled) {
- invariant(state->done.load());
- return;
- }
+ state->timer->waitUntil(state->deadline, baton)
+ .getAsync([client, state, baton](Status status) {
+ if (status == ErrorCodes::CallbackCanceled) {
+ invariant(state->done.load());
+ return;
+ }
- if (state->done.swap(true)) {
- return;
- }
+ if (state->done.swap(true)) {
+ return;
+ }
- LOG(2) << "Request " << state->request.id << " timed out"
- << ", deadline was " << state->deadline << ", op was "
- << redact(state->request.toString());
- state->promise.setError(
- Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+ LOG(2) << "Request " << state->request.id << " timed out"
+ << ", deadline was " << state->deadline << ", op was "
+ << redact(state->request.toString());
+ state->promise.setError(
+ Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
- client->cancel();
- });
+ client->cancel(baton);
+ });
}
- client->runCommandRequest(state->request)
+ client->runCommandRequest(state->request, baton)
.then([this, state](RemoteCommandResponse response) {
if (state->done.load()) {
uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
}
- // TODO Investigate whether this is necessary here.
- return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
- if (_metadataHook && response.status.isOK()) {
- auto target = state->conn->getHostAndPort().toString();
- response.status = _metadataHook->readReplyMetadata(
- nullptr, std::move(target), response.metadata);
- }
+ if (_metadataHook && response.status.isOK()) {
+ auto target = state->conn->getHostAndPort().toString();
+ response.status =
+ _metadataHook->readReplyMetadata(nullptr, std::move(target), response.metadata);
+ }
- return std::move(response);
- });
+ return RemoteCommandResponse(std::move(response));
})
- .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+ .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
_eraseInUseConn(state->cbHandle);
if (!swr.isOK()) {
state->conn->indicateFailure(swr.getStatus());
@@ -273,10 +319,10 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
return;
if (state->timer) {
- state->timer->cancel();
+ state->timer->cancel(baton);
}
- state->promise.setWith([&] { return std::move(swr); });
+ state->promise.setFromStatusWith(std::move(swr));
});
return std::move(state->mergedFuture);
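
Note how the timeout path and the completion path above, like cancelCommand below, all guard the promise with state->done.swap(true): whichever fires first wins and the others back off. The idiom in isolation, as a hypothetical reduction:

#include <atomic>

// First-wins guard: timeout, completion, and cancellation race to fulfill a
// promise. swap(true) returns the previous value, so exactly one caller
// observes false and gets to run its fulfillment.
struct GuardedOp {
    std::atomic<bool> done{false};

    template <typename F>
    void fulfillOnce(F&& fulfill) {
        if (done.swap(true)) {
            return;  // someone else already fulfilled or canceled the op
        }
        fulfill();
    }
};
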
@@ -287,7 +333,8 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
_inProgress.erase(cbHandle);
}
-void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+ const transport::BatonHandle& baton) {
stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
auto it = _inProgress.find(cbHandle);
if (it == _inProgress.end()) {
@@ -307,17 +354,23 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHan
<< redact(state->request.toString())});
if (state->conn) {
auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
- client->client()->cancel();
+ client->client()->cancel(baton);
}
}
-Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceTL::setAlarm(Date_t when,
+ const stdx::function<void()>& action,
+ const transport::BatonHandle& baton) {
if (inShutdown()) {
return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
}
if (when <= now()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
return Status::OK();
}
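
The baton-or-reactor branch above recurs throughout this patch (again in the alarm callback below). Reduced to a hypothetical helper, reusing the Baton and Reactor stand-ins sketched earlier:

// Run work on the baton's owning thread when one is supplied, otherwise
// post it to the reactor (here, via kPost-style scheduling).
template <typename Work>
void scheduleOn(const BatonHandle& baton, Reactor* reactor, Work work) {
    if (baton) {
        baton->schedule(std::move(work));
    } else {
        reactor->schedule(std::move(work));
    }
}
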
@@ -329,32 +382,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& a
_inProgressAlarms.insert(alarmTimer);
}
- alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
- auto alarmTimer = weakTimer.lock();
- if (!alarmTimer) {
- return;
- } else {
- stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
- _inProgressAlarms.erase(alarmTimer);
- }
-
- auto nowVal = now();
- if (nowVal < when) {
- warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
- const auto status = setAlarm(when, std::move(action));
- if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
- fassertFailedWithStatus(50785, status);
+ alarmTimer->waitUntil(when, baton)
+ .getAsync([this, weakTimer, action, when, baton](Status status) {
+ auto alarmTimer = weakTimer.lock();
+ if (!alarmTimer) {
+ return;
+ } else {
+ stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+ _inProgressAlarms.erase(alarmTimer);
}
- return;
- }
+ auto nowVal = now();
+ if (nowVal < when) {
+ warning() << "Alarm returned early. Expected at: " << when
+ << ", fired at: " << nowVal;
+ const auto status = setAlarm(when, std::move(action), baton);
+ if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+ fassertFailedWithStatus(50785, status);
+ }
- if (status.isOK()) {
- _reactor->schedule(transport::Reactor::kPost, std::move(action));
- } else if (status != ErrorCodes::CallbackCanceled) {
- warning() << "setAlarm() received an error: " << status;
- }
- });
+ return;
+ }
+
+ if (status.isOK()) {
+ if (baton) {
+ baton->schedule(std::move(action));
+ } else {
+ _reactor->schedule(transport::Reactor::kPost, std::move(action));
+ }
+ } else if (status != ErrorCodes::CallbackCanceled) {
+ warning() << "setAlarm() received an error: " << status;
+ }
+ });
return Status::OK();
}