diff options
author | Jason Carey <jcarey@argv.me> | 2018-03-21 00:15:35 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2018-04-27 19:49:28 -0400 |
commit | 4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch) | |
tree | 438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/client | |
parent | 91eaa878c4feeebd9397c49180631fc719238aaf (diff) | |
download | mongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz |
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to 1 connection pool in mongos.
This change involves the introduction of a transport layer baton, which
improves perf for a particular transport layer when doing local
scatter/gather operations.
Diffstat (limited to 'src/mongo/client')
-rw-r--r-- | src/mongo/client/async_client.cpp | 24 | ||||
-rw-r--r-- | src/mongo/client/async_client.h | 10 | ||||
-rw-r--r-- | src/mongo/client/remote_command_retry_scheduler_test.cpp | 6 |
3 files changed, 23 insertions, 17 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index c1499fe394d..44b3d91e521 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName, }); } -Future<Message> AsyncDBClient::_call(Message request) { +Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) { auto swm = _compressorManager.compressMessage(request); if (!swm.isOK()) { return swm.getStatus(); @@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) { request.header().setId(msgId); request.header().setResponseToMsgId(0); - return _session->asyncSinkMessage(request) - .then([this] { return _session->asyncSourceMessage(); }) + return _session->asyncSinkMessage(request, baton) + .then([this, baton] { return _session->asyncSourceMessage(baton); }) .then([this, msgId](Message response) -> StatusWith<Message> { uassert(50787, "ResponseId did not match sent message ID.", @@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) { }); } -Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) { +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, + const transport::BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); - return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> { - return rpc::UniqueReply(response, rpc::makeReply(&response)); - }); + return _call(std::move(requestMsg), baton) + .then([this](Message response) -> Future<rpc::UniqueReply> { + return rpc::UniqueReply(response, rpc::makeReply(&response)); + }); } Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( - executor::RemoteCommandRequest request) { + executor::RemoteCommandRequest request, const transport::BatonHandle& baton) { auto clkSource = _svcCtx->getPreciseClockSource(); auto start = clkSource->now(); auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runCommand(std::move(opMsgRequest)) + return runCommand(std::move(opMsgRequest), baton) .then([start, clkSource, this](rpc::UniqueReply response) { auto duration = duration_cast<Milliseconds>(clkSource->now() - start); return executor::RemoteCommandResponse(*response, duration); @@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -void AsyncDBClient::cancel() { - _session->cancelAsyncOperations(); +void AsyncDBClient::cancel(const transport::BatonHandle& baton) { + _session->cancelAsyncOperations(baton); } bool AsyncDBClient::isStillConnected() { diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h index ee7239a1d03..ec34dca021e 100644 --- a/src/mongo/client/async_client.h +++ b/src/mongo/client/async_client.h @@ -36,6 +36,7 @@ #include "mongo/executor/remote_command_response.h" #include "mongo/rpc/protocol.h" #include "mongo/rpc/unique_message.h" +#include "mongo/transport/baton.h" #include "mongo/transport/message_compressor_manager.h" #include "mongo/transport/transport_layer.h" #include "mongo/util/future.h" @@ -57,15 +58,16 @@ public: transport::ReactorHandle reactor); Future<executor::RemoteCommandResponse> runCommandRequest( - executor::RemoteCommandRequest request); - Future<rpc::UniqueReply> runCommand(OpMsgRequest request); + executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr); + Future<rpc::UniqueReply> runCommand(OpMsgRequest request, + const transport::BatonHandle& baton = nullptr); Future<void> authenticate(const BSONObj& params); Future<void> initWireVersion(const std::string& appName, executor::NetworkConnectionHook* const hook); - void cancel(); + void cancel(const transport::BatonHandle& baton = nullptr); bool isStillConnected(); @@ -75,7 +77,7 @@ public: const HostAndPort& local() const; private: - Future<Message> _call(Message request); + Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr); BSONObj _buildIsMasterRequest(const std::string& appName); void _parseIsMasterResponse(BSONObj request, const std::unique_ptr<rpc::ReplyInterface>& response); diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp index dca50732e93..a3c23c6b05d 100644 --- a/src/mongo/client/remote_command_retry_scheduler_test.cpp +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp @@ -90,12 +90,14 @@ public: TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor) : unittest::TaskExecutorProxy(executor) {} virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand( - const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override { + const executor::RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const transport::BatonHandle& baton = nullptr) override { if (scheduleRemoteCommandFailPoint) { return Status(ErrorCodes::ShutdownInProgress, "failed to send remote command - shutdown in progress"); } - return getExecutor()->scheduleRemoteCommand(request, cb); + return getExecutor()->scheduleRemoteCommand(request, cb, baton); } bool scheduleRemoteCommandFailPoint = false; |