diff options
author | jannaerin <golden.janna@gmail.com> | 2020-03-25 11:10:52 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-25 16:07:17 +0000 |
commit | 6078a968b5583072503b020c03e0f386c726a8f6 (patch) | |
tree | df1ffb8b013e5d1a03e21a25cfc44c0f50746961 /src/mongo/client/async_client.cpp | |
parent | 0ff2d7877ee6bb2ea1e745526e85a1a1df39a592 (diff) | |
download | mongo-6078a968b5583072503b020c03e0f386c726a8f6.tar.gz |
Revert "SERVER-44954 Streamable RSM uses exhaust isMaster"
This reverts commit 8124a8d047ce142f6d6defc089e5e71192721a5c.
Diffstat (limited to 'src/mongo/client/async_client.cpp')
-rw-r--r-- | src/mongo/client/async_client.cpp | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp index f25830de073..99d93dab68a 100644 --- a/src/mongo/client/async_client.cpp +++ b/src/mongo/client/async_client.cpp @@ -316,41 +316,55 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest( }); } -Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse( - ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) { +Future<void> AsyncDBClient::_continueReceiveExhaustResponse( + ExhaustRequestParameters&& exhaustRequestParameters, + boost::optional<int32_t> msgId, + const BatonHandle& baton) { return _waitForResponse(msgId, baton) - .then([stopwatch, msgId, baton, this](Message responseMsg) mutable { + .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this]( + Message responseMsg) mutable -> Future<void> { + // Run callback + auto now = exhaustParameters.clkSource->now(); + auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start); bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome); rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg)); - auto rcResponse = executor::RemoteCommandResponse( - *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet); - return rcResponse; - }); -} + exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration), + isMoreToComeSet); -Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand( - const BatonHandle& baton) { - return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton); + if (!isMoreToComeSet) { + return Status::OK(); + } + + exhaustParameters.start = now; + return _continueReceiveExhaustResponse( + std::move(exhaustParameters), boost::none, baton); + }); } -Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request, - const BatonHandle& baton) { +Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request, + RemoteCommandCallbackFn&& cb, + const BatonHandle& baton) { invariant(_negotiatedProtocol); auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request)); OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported); + auto clkSource = _svcCtx->getPreciseClockSource(); + auto start = clkSource->now(); auto msgId = nextMessageId(); - return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable { - return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton); - }); + return _call(std::move(requestMsg), msgId, baton) + .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable { + ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start}; + return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton); + }); } -Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest( - executor::RemoteCommandRequest request, const BatonHandle& baton) { +Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request, + RemoteCommandCallbackFn&& cb, + const BatonHandle& baton) { auto opMsgRequest = OpMsgRequest::fromDBAndBody( std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata)); - return runExhaustCommand(std::move(opMsgRequest), baton); + return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton); } void AsyncDBClient::cancel(const BatonHandle& baton) { |