diff options
author | Jason Carey <jcarey@argv.me> | 2018-03-21 00:15:35 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-04-27 19:49:28 -0400 |
commit | 4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch) | |
tree | 438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/s/async_requests_sender.cpp | |
parent | 91eaa878c4feeebd9397c49180631fc719238aaf (diff) | |
download | mongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz |
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to 1 connection pool in mongos.
This change involves the introduction of a transport layer baton, which
improves perf for a particular transport layer when doing local
scatter/gather operations.
Diffstat (limited to 'src/mongo/s/async_requests_sender.cpp')
-rw-r--r-- | src/mongo/s/async_requests_sender.cpp | 109 |
1 file changed, 102 insertions, 7 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index 5e70f3630ff..d63429ff8e9 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -33,16 +33,22 @@ #include "mongo/s/async_requests_sender.h" #include "mongo/client/remote_command_targeter.h" +#include "mongo/db/server_parameters.h" #include "mongo/executor/remote_command_request.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/stdx/memory.h" +#include "mongo/transport/baton.h" +#include "mongo/transport/transport_layer.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { + +MONGO_EXPORT_SERVER_PARAMETER(AsyncRequestsSenderUseBaton, bool, true); + namespace { // Maximum number of retries for network and replication notMaster errors (per host). @@ -58,6 +64,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, Shard::RetryPolicy retryPolicy) : _opCtx(opCtx), _executor(executor), + _baton(opCtx), _db(dbName.toString()), _readPreference(readPreference), _retryPolicy(retryPolicy) { @@ -71,6 +78,7 @@ AsyncRequestsSender::AsyncRequestsSender(OperationContext* opCtx, // Schedule the requests immediately. _scheduleRequests(); } + AsyncRequestsSender::~AsyncRequestsSender() { _cancelPendingRequests(); @@ -90,7 +98,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { // Otherwise, wait for some response to be received. if (_interruptStatus.isOK()) { try { - _handleResponse(_responseQueue.pop(_opCtx)); + _makeProgress(_opCtx); } catch (const AssertionException& ex) { // If the operation is interrupted, we cancel outstanding requests and switch to // waiting for the (canceled) callbacks to finish without checking for interrupts. 
@@ -99,7 +107,7 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() { continue; } } else { - _handleResponse(_responseQueue.pop()); + _makeProgress(nullptr); } } return *readyResponse; @@ -130,6 +138,11 @@ boost::optional<AsyncRequestsSender::Response> AsyncRequestsSender::_ready() { _scheduleRequests(); } + // If we have baton requests, we want to process those before proceeding + if (_batonRequests) { + return boost::none; + } + // Check if any remote is ready. invariant(!_remotes.empty()); for (auto& remote : _remotes) { @@ -200,6 +213,12 @@ void AsyncRequestsSender::_scheduleRequests() { auto scheduleStatus = _scheduleRequest(i); if (!scheduleStatus.isOK()) { remote.swResponse = std::move(scheduleStatus); + + if (_baton) { + _batonRequests++; + _baton->schedule([this] { _batonRequests--; }); + } + // Push a noop response to the queue to indicate that a remote is ready for // re-processing due to failure. _responseQueue.push(boost::none); @@ -214,7 +233,7 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { invariant(!remote.cbHandle.isValid()); invariant(!remote.swResponse); - Status resolveStatus = remote.resolveShardIdToHostAndPort(_readPreference); + Status resolveStatus = remote.resolveShardIdToHostAndPort(this, _readPreference); if (!resolveStatus.isOK()) { return resolveStatus; } @@ -225,8 +244,14 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { auto callbackStatus = _executor->scheduleRemoteCommand( request, [remoteIndex, this](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) { + if (_baton) { + _batonRequests++; + _baton->schedule([this] { _batonRequests--; }); + } + _responseQueue.push(Job{cbData, remoteIndex}); - }); + }, + _baton); if (!callbackStatus.isOK()) { return callbackStatus.getStatus(); } @@ -235,7 +260,21 @@ Status AsyncRequestsSender::_scheduleRequest(size_t remoteIndex) { return Status::OK(); } -void AsyncRequestsSender::_handleResponse(boost::optional<Job> job) { +void 
AsyncRequestsSender::_makeProgress(OperationContext* opCtx) { + boost::optional<Job> job; + + if (_baton) { + // If we're using a baton, we peek the queue, and block on the baton if it's empty + if (boost::optional<boost::optional<Job>> tryJob = _responseQueue.tryPop()) { + job = std::move(*tryJob); + } else { + _baton->run(_opCtx, boost::none); + } + } else { + // Otherwise we block on the queue + job = _opCtx ? _responseQueue.pop(_opCtx) : _responseQueue.pop(); + } + if (!job) { return; } @@ -274,14 +313,57 @@ AsyncRequestsSender::RemoteData::RemoteData(ShardId shardId, BSONObj cmdObj) : shardId(std::move(shardId)), cmdObj(std::move(cmdObj)) {} Status AsyncRequestsSender::RemoteData::resolveShardIdToHostAndPort( - const ReadPreferenceSetting& readPref) { + AsyncRequestsSender* ars, const ReadPreferenceSetting& readPref) { const auto shard = getShard(); if (!shard) { return Status(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << shardId); } - auto findHostStatus = shard->getTargeter()->findHostWithMaxWait(readPref, Seconds{20}); + auto clock = ars->_opCtx->getServiceContext()->getFastClockSource(); + + auto deadline = clock->now() + Seconds(20); + + auto targeter = shard->getTargeter(); + + auto findHostStatus = [&] { + // If we don't have a baton, just go ahead and block in targeting + if (!ars->_baton) { + return targeter->findHostWithMaxWait(readPref, Seconds{20}); + } + + // If we do have a baton, and we can target quickly, just do that + { + auto findHostStatus = targeter->findHostNoWait(readPref); + if (findHostStatus.isOK()) { + return findHostStatus; + } + } + + // If it's going to take a while to target, we spin up a background thread to do our + // targeting, while running the baton on the calling thread. This allows us to make forward + // progress on previous requests. 
+ Promise<HostAndPort> promise; + auto future = promise.getFuture(); + + ars->_batonRequests++; + stdx::thread bgChecker([&] { + promise.setWith( + [&] { return targeter->findHostWithMaxWait(readPref, deadline - clock->now()); }); + + ars->_baton->schedule([ars] { ars->_batonRequests--; }); + }); + const auto guard = MakeGuard([&] { bgChecker.join(); }); + + while (!future.isReady()) { + if (!ars->_baton->run(nullptr, deadline)) { + break; + } + } + + return future.getNoThrow(); + }(); + if (!findHostStatus.isOK()) { return findHostStatus.getStatus(); } @@ -296,4 +378,17 @@ std::shared_ptr<Shard> AsyncRequestsSender::RemoteData::getShard() { return grid.shardRegistry()->getShardNoReload(shardId); } +AsyncRequestsSender::BatonDetacher::BatonDetacher(OperationContext* opCtx) + : _baton(AsyncRequestsSenderUseBaton.load() + ? (opCtx->getServiceContext()->getTransportLayer() + ? opCtx->getServiceContext()->getTransportLayer()->makeBaton(opCtx) + : nullptr) + : nullptr) {} + +AsyncRequestsSender::BatonDetacher::~BatonDetacher() { + if (_baton) { + _baton->detach(); + } +} + } // namespace mongo |