From 4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa Mon Sep 17 00:00:00 2001
From: Jason Carey
Date: Wed, 21 Mar 2018 00:15:35 -0400
Subject: SERVER-34739 Migrate to 1 connpool in ARS

Migrate to 1 connection pool in mongos. This change involves the
introduction of a transport layer baton, which improves perf for a
particular transport layer when doing local scatter/gather operations.
---
 src/mongo/executor/network_interface_tl.cpp | 227 ++++++++++++++++++----------
 1 file changed, 143 insertions(+), 84 deletions(-)

diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index eef2299e623..15947af2c0b 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -85,7 +85,7 @@ void NetworkInterfaceTL::startup() {
     auto typeFactory = std::make_unique<connection_pool_tl::TLTypeFactory>(
         _reactor, _tl, std::move(_onConnectHook));
     _pool = std::make_unique<ConnectionPool>(
-        std::move(typeFactory), "NetworkInterfaceTL", _connPoolOpts);
+        std::move(typeFactory), std::string("NetworkInterfaceTL-") + _instanceName, _connPoolOpts);
     _ioThread = stdx::thread([this] {
         setThreadName(_instanceName);
         LOG(2) << "The NetworkInterfaceTL reactor thread is spinning up";
@@ -97,6 +97,7 @@ void NetworkInterfaceTL::shutdown() {
     _inShutdown.store(true);
     _reactor->stop();
     _ioThread.join();
+    _pool.reset();
     LOG(2) << "NetworkInterfaceTL shutdown successfully";
 }
 
@@ -136,7 +137,8 @@ Date_t NetworkInterfaceTL::now() {
 
 Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHandle,
                                         RemoteCommandRequest& request,
-                                        const RemoteCommandCompletionFn& onFinish) {
+                                        const RemoteCommandCompletionFn& onFinish,
+                                        const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
@@ -166,40 +168,86 @@ Status NetworkInterfaceTL::startCommand(const TaskExecutor::CallbackHandle& cbHa
         state->deadline = state->start + state->request.timeout;
     }
 
-    _pool->get(request.target, request.timeout)
-        .tapError([state](Status error) {
-            LOG(2) << "Failed to get connection from pool for request " << state->request.id << ": "
-                   << error;
-        })
-        .then([this, state](ConnectionPool::ConnectionHandle conn) mutable {
-            return _onAcquireConn(state, std::move(conn));
-        })
-        .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
-            // The TransportLayer has, for historical reasons returned SocketException for
-            // network errors, but sharding assumes HostUnreachable on network errors.
-            if (error == ErrorCodes::SocketException) {
-                error = Status(ErrorCodes::HostUnreachable, error.reason());
-            }
-            return error;
-        })
-        .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
-            auto duration = now() - state->start;
-            if (!response.isOK()) {
-                onFinish(RemoteCommandResponse(response.getStatus(), duration));
-            } else {
-                auto rs = std::move(response.getValue());
-                LOG(2) << "Request " << state->request.id << " finished with response: "
-                       << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
-                onFinish(rs);
-            }
+    // Interacting with the connection pool can involve more work than just getting a connection
+    // out. In particular, we can end up having to spin up new connections, and fulfilling promises
+    // for other requesters. Returning connections has the same issue.
+    //
+    // To work around it, we make sure to hop onto the reactor thread before getting a connection,
+    // then making sure to get back to the client thread to do the work (if on a baton). And we
+    // hook up a connection returning unique_ptr that ensures that however we exit, we always do the
+    // return on the reactor thread.
+    //
+    // TODO: get rid of this cruft once we have a connection pool that's executor aware.
+    auto connFuture = _reactor->execute([this, state, request, baton] {
+        return makeReadyFutureWith(
+                   [this, request] { return _pool->get(request.target, request.timeout); })
+            .tapError([state](Status error) {
+                LOG(2) << "Failed to get connection from pool for request " << state->request.id
+                       << ": " << error;
+            })
+            .then([this, baton](ConnectionPool::ConnectionHandle conn) {
+                auto deleter = conn.get_deleter();
+
+                // TODO: drop out this shared_ptr once we have a unique_function capable future
+                return std::make_shared<CommandState::ConnHandle>(
+                    conn.release(), CommandState::Deleter{deleter, _reactor});
+            });
+    });
+
+    auto remainingWork = [this, state, baton, onFinish](
+        StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) {
+        makeReadyFutureWith(
+            [&] { return _onAcquireConn(state, std::move(*uassertStatusOK(swConn)), baton); })
+            .onError([](Status error) -> StatusWith<RemoteCommandResponse> {
+                // The TransportLayer has, for historical reasons returned SocketException for
+                // network errors, but sharding assumes HostUnreachable on network errors.
+                if (error == ErrorCodes::SocketException) {
+                    error = Status(ErrorCodes::HostUnreachable, error.reason());
+                }
+                return error;
+            })
+            .getAsync([this, state, onFinish](StatusWith<RemoteCommandResponse> response) {
+                auto duration = now() - state->start;
+                if (!response.isOK()) {
+                    onFinish(RemoteCommandResponse(response.getStatus(), duration));
+                } else {
+                    const auto& rs = response.getValue();
+                    LOG(2) << "Request " << state->request.id << " finished with response: "
+                           << redact(rs.isOK() ? rs.data.toString() : rs.status.toString());
+                    onFinish(rs);
+                }
+            });
+    };
+
+    if (baton) {
+        // If we have a baton, we want to get back to the baton thread immediately after we get a
+        // connection
+        std::move(connFuture).getAsync([
+            baton,
+            rw = std::move(remainingWork)
+        ](StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+            baton->schedule([ rw = std::move(rw), swConn = std::move(swConn) ]() mutable {
+                std::move(rw)(std::move(swConn));
+            });
         });
+    } else {
+        // otherwise we're happy to run inline
+        std::move(connFuture)
+            .getAsync([rw = std::move(remainingWork)](
+                StatusWith<std::shared_ptr<CommandState::ConnHandle>> swConn) mutable {
+                std::move(rw)(std::move(swConn));
+            });
+    }
+
     return Status::OK();
 }
 
 // This is only called from within a then() callback on a future, so throwing is equivalent to
 // returning a ready Future with a not-OK status.
 Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
-    std::shared_ptr<CommandState> state, ConnectionPool::ConnectionHandle conn) {
+    std::shared_ptr<CommandState> state,
+    CommandState::ConnHandle conn,
+    const transport::BatonHandle& baton) {
     if (state->done.load()) {
         conn->indicateSuccess();
         uasserted(ErrorCodes::CallbackCanceled, "Command was canceled");
@@ -222,44 +270,42 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
         }
 
         state->timer = _reactor->makeTimer();
-        state->timer->waitUntil(state->deadline).getAsync([client, state](Status status) {
-            if (status == ErrorCodes::CallbackCanceled) {
-                invariant(state->done.load());
-                return;
-            }
+        state->timer->waitUntil(state->deadline, baton)
+            .getAsync([client, state, baton](Status status) {
+                if (status == ErrorCodes::CallbackCanceled) {
+                    invariant(state->done.load());
+                    return;
+                }
 
-            if (state->done.swap(true)) {
-                return;
-            }
+                if (state->done.swap(true)) {
+                    return;
+                }
 
-            LOG(2) << "Request " << state->request.id << " timed out"
-                   << ", deadline was " << state->deadline << ", op was "
-                   << redact(state->request.toString());
-            state->promise.setError(
-                Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
+                LOG(2) << "Request " << state->request.id << " timed out"
+                       << ", deadline was " << state->deadline << ", op was "
+                       << redact(state->request.toString());
+                state->promise.setError(
+                    Status(ErrorCodes::NetworkInterfaceExceededTimeLimit, "timed out"));
 
-            client->cancel();
-        });
+                client->cancel(baton);
+            });
     }
 
-    client->runCommandRequest(state->request)
+    client->runCommandRequest(state->request, baton)
         .then([this, state](RemoteCommandResponse response) {
             if (state->done.load()) {
                 uasserted(ErrorCodes::CallbackCanceled, "Callback was canceled");
             }
 
-            // TODO Investigate whether this is necessary here.
-            return _reactor->execute([ this, state, response = std::move(response) ]() mutable {
-                if (_metadataHook && response.status.isOK()) {
-                    auto target = state->conn->getHostAndPort().toString();
-                    response.status = _metadataHook->readReplyMetadata(
-                        nullptr, std::move(target), response.metadata);
-                }
+            if (_metadataHook && response.status.isOK()) {
+                auto target = state->conn->getHostAndPort().toString();
+                response.status =
+                    _metadataHook->readReplyMetadata(nullptr, std::move(target), response.metadata);
+            }
 
-                return std::move(response);
-            });
+            return RemoteCommandResponse(std::move(response));
         })
-        .getAsync([this, state](StatusWith<RemoteCommandResponse> swr) {
+        .getAsync([this, state, baton](StatusWith<RemoteCommandResponse> swr) {
             _eraseInUseConn(state->cbHandle);
             if (!swr.isOK()) {
                 state->conn->indicateFailure(swr.getStatus());
@@ -273,10 +319,10 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::_onAcquireConn(
                 return;
 
             if (state->timer) {
-                state->timer->cancel();
+                state->timer->cancel(baton);
             }
 
-            state->promise.setWith([&] { return std::move(swr); });
+            state->promise.setFromStatusWith(std::move(swr));
         });
 
     return std::move(state->mergedFuture);
@@ -287,7 +333,8 @@ void NetworkInterfaceTL::_eraseInUseConn(const TaskExecutor::CallbackHandle& cbH
     _inProgress.erase(cbHandle);
 }
 
-void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle) {
+void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHandle& cbHandle,
+                                       const transport::BatonHandle& baton) {
     stdx::unique_lock<stdx::mutex> lk(_inProgressMutex);
     auto it = _inProgress.find(cbHandle);
     if (it == _inProgress.end()) {
@@ -307,17 +354,23 @@ void NetworkInterfaceTL::cancelCommand(const TaskExecutor::CallbackHan
                                            << redact(state->request.toString())});
 
     if (state->conn) {
         auto client = checked_cast<connection_pool_tl::TLConnection*>(state->conn.get());
-        client->client()->cancel();
+        client->client()->cancel(baton);
     }
 }
 
-Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& action) {
+Status NetworkInterfaceTL::setAlarm(Date_t when,
+                                    const stdx::function<void()>& action,
+                                    const transport::BatonHandle& baton) {
     if (inShutdown()) {
         return {ErrorCodes::ShutdownInProgress, "NetworkInterface shutdown in progress"};
     }
 
     if (when <= now()) {
-        _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        if (baton) {
+            baton->schedule(std::move(action));
+        } else {
+            _reactor->schedule(transport::Reactor::kPost, std::move(action));
+        }
         return Status::OK();
     }
@@ -329,32 +382,38 @@ Status NetworkInterfaceTL::setAlarm(Date_t when, const stdx::function<void()>& a
         _inProgressAlarms.insert(alarmTimer);
     }
 
-    alarmTimer->waitUntil(when).getAsync([this, weakTimer, action, when](Status status) {
-        auto alarmTimer = weakTimer.lock();
-        if (!alarmTimer) {
-            return;
-        } else {
-            stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
-            _inProgressAlarms.erase(alarmTimer);
-        }
-
-        auto nowVal = now();
-        if (nowVal < when) {
-            warning() << "Alarm returned early. Expected at: " << when << ", fired at: " << nowVal;
-            const auto status = setAlarm(when, std::move(action));
-            if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
-                fassertFailedWithStatus(50785, status);
+    alarmTimer->waitUntil(when, baton)
+        .getAsync([this, weakTimer, action, when, baton](Status status) {
+            auto alarmTimer = weakTimer.lock();
+            if (!alarmTimer) {
+                return;
+            } else {
+                stdx::lock_guard<stdx::mutex> lk(_inProgressMutex);
+                _inProgressAlarms.erase(alarmTimer);
             }
 
-            return;
-        }
+            auto nowVal = now();
+            if (nowVal < when) {
+                warning() << "Alarm returned early. Expected at: " << when
+                          << ", fired at: " << nowVal;
+                const auto status = setAlarm(when, std::move(action), baton);
+                if ((!status.isOK()) && (status != ErrorCodes::ShutdownInProgress)) {
+                    fassertFailedWithStatus(50785, status);
+                }
 
-        if (status.isOK()) {
-            _reactor->schedule(transport::Reactor::kPost, std::move(action));
-        } else if (status != ErrorCodes::CallbackCanceled) {
-            warning() << "setAlarm() received an error: " << status;
-        }
-    });
+                return;
+            }
+
+            if (status.isOK()) {
+                if (baton) {
+                    baton->schedule(std::move(action));
+                } else {
+                    _reactor->schedule(transport::Reactor::kPost, std::move(action));
+                }
+            } else if (status != ErrorCodes::CallbackCanceled) {
+                warning() << "setAlarm() received an error: " << status;
+            }
+        });
 
     return Status::OK();
 }
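
A note on the scheduling pattern: the patch applies the same rule in startCommand() and
setAlarm() -- if the caller supplied a baton, schedule the continuation on it so the work runs
on the requesting client thread; otherwise post the work to the reactor thread. The sketch
below illustrates that dispatch rule in isolation; TaskQueue, BatonHandle, and scheduleWork
are simplified stand-ins invented for this note, not types or helpers from the patch or from
the mongo::transport library.

// Minimal sketch (not from the patch): prefer the caller's baton, fall back to the reactor.
#include <functional>
#include <iostream>
#include <memory>
#include <queue>

// A tiny single-threaded task queue standing in for both the reactor and the baton.
class TaskQueue {
public:
    void schedule(std::function<void()> task) {
        _tasks.push(std::move(task));
    }

    // Drained by whichever thread owns this queue.
    void runPending() {
        while (!_tasks.empty()) {
            auto task = std::move(_tasks.front());
            _tasks.pop();
            task();
        }
    }

private:
    std::queue<std::function<void()>> _tasks;
};

using BatonHandle = std::shared_ptr<TaskQueue>;

// Mirrors the dispatch used in setAlarm() and after connection acquisition in startCommand():
// run on the baton when one was supplied, otherwise post to the reactor thread.
void scheduleWork(const BatonHandle& baton, TaskQueue* reactor, std::function<void()> task) {
    if (baton) {
        baton->schedule(std::move(task));
    } else {
        reactor->schedule(std::move(task));
    }
}

int main() {
    TaskQueue reactor;                           // normally drained by the io (reactor) thread
    auto baton = std::make_shared<TaskQueue>();  // normally drained by the requesting client thread

    scheduleWork(baton, &reactor, [] { std::cout << "ran via the baton\n"; });
    scheduleWork(nullptr, &reactor, [] { std::cout << "ran via the reactor\n"; });

    baton->runPending();
    reactor.runPending();
    return 0;
}

Keeping the continuation on the baton lets locally targeted scatter/gather work run on the
thread that is already waiting for it, instead of bouncing every callback through the reactor
thread.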