diff options
author | Jason Carey <jcarey@argv.me> | 2018-03-21 00:15:35 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-04-27 19:49:28 -0400 |
commit | 4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch) | |
tree | 438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/client/async_client.cpp | |
parent | 91eaa878c4feeebd9397c49180631fc719238aaf (diff) | |
download | mongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz |
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to 1 connection pool in mongos.
This change involves the introduction of a transport layer baton, which
improves perf for a particular transport layer when doing local
scatter/gather operations.
Diffstat (limited to 'src/mongo/client/async_client.cpp')
-rw-r--r-- | src/mongo/client/async_client.cpp | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index c1499fe394d..44b3d91e521 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName, }); } -Future<Message> AsyncDBClient::_call(Message request) { +Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) { auto swm = _compressorManager.compressMessage(request); if (!swm.isOK()) { return swm.getStatus(); @@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) { request.header().setId(msgId); request.header().setResponseToMsgId(0); - return _session->asyncSinkMessage(request) - .then([this] { return _session->asyncSourceMessage(); }) + return _session->asyncSinkMessage(request, baton) + .then([this, baton] { return _session->asyncSourceMessage(baton); }) .then([this, msgId](Message response) -> StatusWith<Message> { uassert(50787, "ResponseId did not match sent message ID.", @@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) { }); } -Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) { +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, + const transport::BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); - return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> { - return rpc::UniqueReply(response, rpc::makeReply(&response)); - }); + return _call(std::move(requestMsg), baton) + .then([this](Message response) -> Future<rpc::UniqueReply> { + return rpc::UniqueReply(response, rpc::makeReply(&response)); + }); } Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( - executor::RemoteCommandRequest request) { + executor::RemoteCommandRequest request, const transport::BatonHandle& baton) { auto clkSource = _svcCtx->getPreciseClockSource(); auto start = clkSource->now(); auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runCommand(std::move(opMsgRequest)) + return runCommand(std::move(opMsgRequest), baton) .then([start, clkSource, this](rpc::UniqueReply response) { auto duration = duration_cast<Milliseconds>(clkSource->now() - start); return executor::RemoteCommandResponse(*response, duration); @@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -void AsyncDBClient::cancel() { - _session->cancelAsyncOperations(); +void AsyncDBClient::cancel(const transport::BatonHandle& baton) { + _session->cancelAsyncOperations(baton); } bool AsyncDBClient::isStillConnected() { |