author    Jason Carey <jcarey@argv.me>    2018-03-21 00:15:35 -0400
committer Jason Carey <jcarey@argv.me>    2018-04-27 19:49:28 -0400
commit    4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch)
tree      438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/client
parent    91eaa878c4feeebd9397c49180631fc719238aaf (diff)
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to a single connection pool in mongos. This change introduces a transport layer baton, which improves performance for a particular transport layer when doing local scatter/gather operations.
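
As a rough illustration of the pattern this change applies (Baton, BatonHandle, Message, and FakeAsyncClient below are simplified stand-ins, not the real mongo types), each async entry point gains an optional baton argument that defaults to nullptr and forwards it down the call chain:

// Illustrative sketch only; simplified stand-ins, not the real mongo classes.
#include <iostream>
#include <memory>
#include <string>
#include <utility>

struct Baton {
    // A real transport-layer baton lets the caller drive network work inline;
    // this stand-in just records that it was used.
    void note(const std::string& what) { std::cout << "baton: " << what << "\n"; }
};
using BatonHandle = std::shared_ptr<Baton>;

struct Message {
    std::string payload;
};

class FakeAsyncClient {
public:
    // Public entry point: the baton defaults to nullptr, so existing call
    // sites keep compiling unchanged while new ones can pass a baton through.
    Message runCommand(Message request, const BatonHandle& baton = nullptr) {
        return _call(std::move(request), baton);
    }

private:
    // The private helper now receives the same handle instead of always
    // falling back to the reactor.
    Message _call(Message request, const BatonHandle& baton) {
        if (baton)
            baton->note("sink/source ran on the caller-provided baton");
        request.payload += " -> reply";
        return request;
    }
};

int main() {
    FakeAsyncClient client;
    client.runCommand({"ping"});                            // old-style call, no baton
    client.runCommand({"ping"}, std::make_shared<Baton>()); // new-style call with a baton
    return 0;
}
// end of illustrative sketch
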
Diffstat (limited to 'src/mongo/client')
-rw-r--r--  src/mongo/client/async_client.cpp                         24
-rw-r--r--  src/mongo/client/async_client.h                           10
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler_test.cpp   6
3 files changed, 23 insertions, 17 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index c1499fe394d..44b3d91e521 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
});
}
-Future<Message> AsyncDBClient::_call(Message request) {
+Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) {
auto swm = _compressorManager.compressMessage(request);
if (!swm.isOK()) {
return swm.getStatus();
@@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) {
request.header().setId(msgId);
request.header().setResponseToMsgId(0);
- return _session->asyncSinkMessage(request)
- .then([this] { return _session->asyncSourceMessage(); })
+ return _session->asyncSinkMessage(request, baton)
+ .then([this, baton] { return _session->asyncSourceMessage(baton); })
.then([this, msgId](Message response) -> StatusWith<Message> {
uassert(50787,
"ResponseId did not match sent message ID.",
@@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) {
});
}
-Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) {
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
+ const transport::BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
- return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> {
- return rpc::UniqueReply(response, rpc::makeReply(&response));
- });
+ return _call(std::move(requestMsg), baton)
+ .then([this](Message response) -> Future<rpc::UniqueReply> {
+ return rpc::UniqueReply(response, rpc::makeReply(&response));
+ });
}
Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
- executor::RemoteCommandRequest request) {
+ executor::RemoteCommandRequest request, const transport::BatonHandle& baton) {
auto clkSource = _svcCtx->getPreciseClockSource();
auto start = clkSource->now();
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runCommand(std::move(opMsgRequest))
+ return runCommand(std::move(opMsgRequest), baton)
.then([start, clkSource, this](rpc::UniqueReply response) {
auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
return executor::RemoteCommandResponse(*response, duration);
@@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-void AsyncDBClient::cancel() {
- _session->cancelAsyncOperations();
+void AsyncDBClient::cancel(const transport::BatonHandle& baton) {
+ _session->cancelAsyncOperations(baton);
}
bool AsyncDBClient::isStillConnected() {
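
The sink/source chain in _call above shows the key mechanics: the continuation captures the baton so the follow-up read is driven from the same execution context as the write. A stand-alone sketch of that shape, with a toy callback chain standing in for mongo's Future/then machinery:

// Illustrative sketch only; a plain std::function chain stands in for
// mongo's Future/then machinery.
#include <functional>
#include <iostream>
#include <memory>
#include <string>

struct Baton {};
using BatonHandle = std::shared_ptr<Baton>;

// A toy "async" step: performs the work, then invokes the continuation.
void asyncStep(const std::string& what,
               const BatonHandle& baton,
               const std::function<void()>& then) {
    std::cout << what << (baton ? " [baton]" : " [reactor]") << "\n";
    then();
}

int main() {
    auto baton = std::make_shared<Baton>();
    // Mirrors the sink-then-source chain: the continuation captures the baton
    // so the follow-up read happens on the same execution context.
    asyncStep("sink message", baton, [baton] {
        asyncStep("source message", baton, [] { std::cout << "reply ready\n"; });
    });
    return 0;
}
// end of illustrative sketch
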
diff --git a/src/mongo/client/async_client.h b/src/mongo/client/async_client.h
index ee7239a1d03..ec34dca021e 100644
--- a/src/mongo/client/async_client.h
+++ b/src/mongo/client/async_client.h
@@ -36,6 +36,7 @@
#include "mongo/executor/remote_command_response.h"
#include "mongo/rpc/protocol.h"
#include "mongo/rpc/unique_message.h"
+#include "mongo/transport/baton.h"
#include "mongo/transport/message_compressor_manager.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/future.h"
@@ -57,15 +58,16 @@ public:
transport::ReactorHandle reactor);
Future<executor::RemoteCommandResponse> runCommandRequest(
- executor::RemoteCommandRequest request);
- Future<rpc::UniqueReply> runCommand(OpMsgRequest request);
+ executor::RemoteCommandRequest request, const transport::BatonHandle& baton = nullptr);
+ Future<rpc::UniqueReply> runCommand(OpMsgRequest request,
+ const transport::BatonHandle& baton = nullptr);
Future<void> authenticate(const BSONObj& params);
Future<void> initWireVersion(const std::string& appName,
executor::NetworkConnectionHook* const hook);
- void cancel();
+ void cancel(const transport::BatonHandle& baton = nullptr);
bool isStillConnected();
@@ -75,7 +77,7 @@ public:
const HostAndPort& local() const;
private:
- Future<Message> _call(Message request);
+ Future<Message> _call(Message request, const transport::BatonHandle& baton = nullptr);
BSONObj _buildIsMasterRequest(const std::string& appName);
void _parseIsMasterResponse(BSONObj request,
const std::unique_ptr<rpc::ReplyInterface>& response);
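
The defaulted cancel() overload declared above is forwarded straight to the session in the .cpp hunk, so a command started on a caller-provided baton can also be torn down through it. A minimal stand-alone sketch of that forwarding, using stand-in Session and Baton types rather than the real transport API:

// Illustrative sketch only; stand-in types, not the real mongo Session API.
#include <iostream>
#include <memory>
#include <string>

struct Baton {
    void note(const std::string& what) { std::cout << "baton: " << what << "\n"; }
};
using BatonHandle = std::shared_ptr<Baton>;

struct FakeSession {
    // The cancellation path receives the same optional handle as the I/O path,
    // so work started on a baton is also cancelled through that baton.
    void cancelAsyncOperations(const BatonHandle& baton = nullptr) {
        if (baton)
            baton->note("cancelling on the caller-provided baton");
        else
            std::cout << "cancelling on the reactor\n";
    }
};

struct FakeAsyncClient {
    void cancel(const BatonHandle& baton = nullptr) {
        _session.cancelAsyncOperations(baton);
    }
    FakeSession _session;
};

int main() {
    FakeAsyncClient client;
    client.cancel();                          // no baton: reactor-driven cancel
    client.cancel(std::make_shared<Baton>()); // baton-driven cancel
    return 0;
}
// end of illustrative sketch
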
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index dca50732e93..a3c23c6b05d 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -90,12 +90,14 @@ public:
TaskExecutorWithFailureInScheduleRemoteCommand(executor::TaskExecutor* executor)
: unittest::TaskExecutorProxy(executor) {}
virtual StatusWith<executor::TaskExecutor::CallbackHandle> scheduleRemoteCommand(
- const executor::RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb) override {
+ const executor::RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const transport::BatonHandle& baton = nullptr) override {
if (scheduleRemoteCommandFailPoint) {
return Status(ErrorCodes::ShutdownInProgress,
"failed to send remote command - shutdown in progress");
}
- return getExecutor()->scheduleRemoteCommand(request, cb);
+ return getExecutor()->scheduleRemoteCommand(request, cb, baton);
}
bool scheduleRemoteCommandFailPoint = false;
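
The test-side change mirrors the production one: the proxy's scheduleRemoteCommand override grows a defaulted baton parameter and forwards it verbatim to the wrapped executor. A minimal stand-alone sketch of that forwarding-proxy shape, with stand-in types rather than mongo's TaskExecutorProxy:

// Illustrative sketch only; stand-in types, not mongo's TaskExecutorProxy.
#include <iostream>
#include <memory>
#include <string>

struct Baton {};
using BatonHandle = std::shared_ptr<Baton>;

struct Executor {
    virtual ~Executor() = default;
    virtual std::string scheduleRemoteCommand(const std::string& request,
                                              const BatonHandle& baton = nullptr) {
        return baton ? request + " scheduled on baton" : request + " scheduled on reactor";
    }
};

// A proxy that can inject a failure but otherwise forwards every argument,
// including the newly added baton, to the wrapped executor.
struct FailingExecutorProxy : Executor {
    explicit FailingExecutorProxy(Executor* wrapped) : _wrapped(wrapped) {}

    std::string scheduleRemoteCommand(const std::string& request,
                                      const BatonHandle& baton = nullptr) override {
        if (scheduleRemoteCommandFailPoint)
            return "error: shutdown in progress";
        return _wrapped->scheduleRemoteCommand(request, baton);
    }

    bool scheduleRemoteCommandFailPoint = false;
    Executor* _wrapped;
};

int main() {
    Executor real;
    FailingExecutorProxy proxy(&real);
    std::cout << proxy.scheduleRemoteCommand("find") << "\n";
    std::cout << proxy.scheduleRemoteCommand("find", std::make_shared<Baton>()) << "\n";
    proxy.scheduleRemoteCommandFailPoint = true;
    std::cout << proxy.scheduleRemoteCommand("find") << "\n";
    return 0;
}
// end of illustrative sketch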