diff options
Diffstat (limited to 'src/mongo/client/async_client.cpp')
-rw-r--r-- | src/mongo/client/async_client.cpp | 24 |
1 files changed, 13 insertions, 11 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index c1499fe394d..44b3d91e521 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -190,7 +190,7 @@ Future<void> AsyncDBClient::initWireVersion(const std::string& appName, }); } -Future<Message> AsyncDBClient::_call(Message request) { +Future<Message> AsyncDBClient::_call(Message request, const transport::BatonHandle& baton) { auto swm = _compressorManager.compressMessage(request); if (!swm.isOK()) { return swm.getStatus(); @@ -201,8 +201,8 @@ Future<Message> AsyncDBClient::_call(Message request) { request.header().setId(msgId); request.header().setResponseToMsgId(0); - return _session->asyncSinkMessage(request) - .then([this] { return _session->asyncSourceMessage(); }) + return _session->asyncSinkMessage(request, baton) + .then([this, baton] { return _session->asyncSourceMessage(baton); }) .then([this, msgId](Message response) -> StatusWith<Message> { uassert(50787, "ResponseId did not match sent message ID.", @@ -216,21 +216,23 @@ Future<Message> AsyncDBClient::_call(Message request) { }); } -Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request) { +Future<rpc::UniqueReply> AsyncDBClient::runCommand(OpMsgRequest request, + const transport::BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); - return _call(std::move(requestMsg)).then([this](Message response) -> Future<rpc::UniqueReply> { - return rpc::UniqueReply(response, rpc::makeReply(&response)); - }); + return _call(std::move(requestMsg), baton) + .then([this](Message response) -> Future<rpc::UniqueReply> { + return rpc::UniqueReply(response, rpc::makeReply(&response)); + }); } Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( - executor::RemoteCommandRequest request) { + executor::RemoteCommandRequest request, const transport::BatonHandle& baton) { auto clkSource = _svcCtx->getPreciseClockSource(); auto start = clkSource->now(); auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runCommand(std::move(opMsgRequest)) + return runCommand(std::move(opMsgRequest), baton) .then([start, clkSource, this](rpc::UniqueReply response) { auto duration = duration_cast<Milliseconds>(clkSource->now() - start); return executor::RemoteCommandResponse(*response, duration); @@ -241,8 +243,8 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -void AsyncDBClient::cancel() { - _session->cancelAsyncOperations(); +void AsyncDBClient::cancel(const transport::BatonHandle& baton) { + _session->cancelAsyncOperations(baton); } bool AsyncDBClient::isStillConnected() { |