summaryrefslogtreecommitdiff
path: root/src/mongo/client/async_client.cpp
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-03-25 11:10:52 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-25 16:07:17 +0000
commit6078a968b5583072503b020c03e0f386c726a8f6 (patch)
treedf1ffb8b013e5d1a03e21a25cfc44c0f50746961 /src/mongo/client/async_client.cpp
parent0ff2d7877ee6bb2ea1e745526e85a1a1df39a592 (diff)
downloadmongo-6078a968b5583072503b020c03e0f386c726a8f6.tar.gz
Revert "SERVER-44954 Streamable RSM uses exhaust isMaster"
This reverts commit 8124a8d047ce142f6d6defc089e5e71192721a5c.
Diffstat (limited to 'src/mongo/client/async_client.cpp')
-rw-r--r--src/mongo/client/async_client.cpp52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/mongo/client/async_client.cpp b/src/mongo/client/async_client.cpp
index f25830de073..99d93dab68a 100644
--- a/src/mongo/client/async_client.cpp
+++ b/src/mongo/client/async_client.cpp
@@ -316,41 +316,55 @@ Future<executor::RemoteCommandResponse> AsyncDBClient::runCommandRequest(
});
}
-Future<executor::RemoteCommandResponse> AsyncDBClient::_continueReceiveExhaustResponse(
- ClockSource::StopWatch stopwatch, boost::optional<int32_t> msgId, const BatonHandle& baton) {
+Future<void> AsyncDBClient::_continueReceiveExhaustResponse(
+ ExhaustRequestParameters&& exhaustRequestParameters,
+ boost::optional<int32_t> msgId,
+ const BatonHandle& baton) {
return _waitForResponse(msgId, baton)
- .then([stopwatch, msgId, baton, this](Message responseMsg) mutable {
+ .then([exhaustParameters = std::move(exhaustRequestParameters), msgId, baton, this](
+ Message responseMsg) mutable -> Future<void> {
+ // Run callback
+ auto now = exhaustParameters.clkSource->now();
+ auto duration = duration_cast<Milliseconds>(now - exhaustParameters.start);
bool isMoreToComeSet = OpMsg::isFlagSet(responseMsg, OpMsg::kMoreToCome);
rpc::UniqueReply response = rpc::UniqueReply(responseMsg, rpc::makeReply(&responseMsg));
- auto rcResponse = executor::RemoteCommandResponse(
- *response, duration_cast<Milliseconds>(stopwatch.elapsed()), isMoreToComeSet);
- return rcResponse;
- });
-}
+ exhaustParameters.cb(executor::RemoteCommandResponse(*response, duration),
+ isMoreToComeSet);
-Future<executor::RemoteCommandResponse> AsyncDBClient::awaitExhaustCommand(
- const BatonHandle& baton) {
- return _continueReceiveExhaustResponse(ClockSource::StopWatch(), boost::none, baton);
+ if (!isMoreToComeSet) {
+ return Status::OK();
+ }
+
+ exhaustParameters.start = now;
+ return _continueReceiveExhaustResponse(
+ std::move(exhaustParameters), boost::none, baton);
+ });
}
-Future<executor::RemoteCommandResponse> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
- const BatonHandle& baton) {
+Future<void> AsyncDBClient::runExhaustCommand(OpMsgRequest request,
+ RemoteCommandCallbackFn&& cb,
+ const BatonHandle& baton) {
invariant(_negotiatedProtocol);
auto requestMsg = rpc::messageFromOpMsgRequest(*_negotiatedProtocol, std::move(request));
OpMsg::setFlag(&requestMsg, OpMsg::kExhaustSupported);
+ auto clkSource = _svcCtx->getPreciseClockSource();
+ auto start = clkSource->now();
auto msgId = nextMessageId();
- return _call(std::move(requestMsg), msgId, baton).then([msgId, baton, this]() mutable {
- return _continueReceiveExhaustResponse(ClockSource::StopWatch(), msgId, baton);
- });
+ return _call(std::move(requestMsg), msgId, baton)
+ .then([msgId, baton, cb = std::move(cb), clkSource, start, this]() mutable {
+ ExhaustRequestParameters exhaustParameters{std::move(cb), clkSource, start};
+ return _continueReceiveExhaustResponse(std::move(exhaustParameters), msgId, baton);
+ });
}
-Future<executor::RemoteCommandResponse> AsyncDBClient::beginExhaustCommandRequest(
- executor::RemoteCommandRequest request, const BatonHandle& baton) {
+Future<void> AsyncDBClient::runExhaustCommandRequest(executor::RemoteCommandRequest request,
+ RemoteCommandCallbackFn&& cb,
+ const BatonHandle& baton) {
auto opMsgRequest = OpMsgRequest::fromDBAndBody(
std::move(request.dbname), std::move(request.cmdObj), std::move(request.metadata));
- return runExhaustCommand(std::move(opMsgRequest), baton);
+ return runExhaustCommand(std::move(opMsgRequest), std::move(cb), baton);
}
void AsyncDBClient::cancel(const BatonHandle& baton) {