summaryrefslogtreecommitdiff
path: root/src/mongo/client/async_client.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-03-21 00:15:35 -0400
committerJason Carey <jcarey@argv.me>2018-04-27 19:49:28 -0400
commit4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa (patch)
tree438865c1065d0a96c427b1ed3a89e5163d85699a /src/mongo/client/async_client.cpp
parent91eaa878c4feeebd9397c49180631fc719238aaf (diff)
downloadmongo-4ddf18bcf4d517c3dc0f005f9222ffaab9a86ffa.tar.gz
SERVER-34739 Migrate to 1 connpool in ARS
Migrate to 1 connection pool in mongos. This change involves the introduction of a transport layer baton, which improves perf for a particular transport layer when doing local scatter/gather operations.
Diffstat (limited to 'src/mongo/client/async_client.cpp')
-rw-r--r--src/mongo/client/async_client.cpp24
1 files changed, 13 insertions, 11 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index c1499fe394d..44b3d91e521 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName,
});
}
-Future<Message> AsyncDBClient::_call(Message request) {
+Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) {
auto swm = _compressorManager.compressMessage(request);
if (!swm.isOK()) {
return swm.getStatus();
@@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) {
request.header().setId(msgId);
request.header().setResponseToMsgId(0);
- return _session->asyncSinkMessage(request)
- .then([this] { return _session->asyncSourceMessage(); })
+ return _session->asyncSinkMessage(request, baton)
+ .then([this, baton] { return _session->asyncSourceMessage(baton); })
.then([this, msgId](Message response) -> StatusWith<Message> {
uassert(50787,
"ResponseId did not match sent message ID.",
@@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) {
});
}
-Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) {
+Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request,
+ const transport::BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
- return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> {
- return rpc::UniqueReply(response, rpc::makeReply(&response));
- });
+ return _call(std::move(requestMsg), baton)
+ .then([this](Message response) -> Future<rpc::UniqueReply> {
+ return rpc::UniqueReply(response, rpc::makeReply(&response));
+ });
}
Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
- executor::RemoteCommandRequest request) {
+ executor::RemoteCommandRequest request, const transport::BatonHandle& baton) {
auto clkSource = _svcCtx->getPreciseClockSource();
auto start = clkSource->now();
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runCommand(std::move(opMsgRequest))
+ return runCommand(std::move(opMsgRequest), baton)
.then([start, clkSource, this](rpc::UniqueReply response) {
auto duration = duration_cast<Milliseconds>(clkSource->now() - start);
return executor::RemoteCommandResponse(*response, duration);
@@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-void AsyncDBClient::cancel() {
- _session->cancelAsyncOperations();
+void AsyncDBClient::cancel(const transport::BatonHandle& baton) {
+ _session->cancelAsyncOperations(baton);
}
bool AsyncDBClient::isStillConnected() {