summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorjannaerin <golden.janna@gmail.com>2020-03-25 11:10:52 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-25 16:07:17 +0000
commit6078a968b5583072503b020c03e0f386c726a8f6 (patch)
treedf1ffb8b013e5d1a03e21a25cfc44c0f50746961 /src/mongo/executor
parent0ff2d7877ee6bb2ea1e745526e85a1a1df39a592 (diff)
downloadmongo-6078a968b5583072503b020c03e0f386c726a8f6.tar.gz
Revert "SERVER-44954 Streamable RSM uses exhaust isMaster"
This reverts commit 8124a8d047ce142f6d6defc089e5e71192721a5c.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/network_interface.h3
-rw-r--r--src/mongo/executor/network_interface_integration_fixture.cpp6
-rw-r--r--src/mongo/executor/network_interface_tl.cpp67
-rw-r--r--src/mongo/executor/network_interface_tl.h6
-rw-r--r--src/mongo/executor/remote_command_response.cpp11
-rw-r--r--src/mongo/executor/remote_command_response.h7
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp44
-rw-r--r--src/mongo/executor/thread_pool_task_executor_integration_test.cpp5
8 files changed, 57 insertions, 92 deletions
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h
index abc02a2db61..d269fb2443f 100644
--- a/src/mongo/executor/network_interface.h
+++ b/src/mongo/executor/network_interface.h
@@ -58,7 +58,8 @@ public:
using Response = RemoteCommandResponse;
using RemoteCommandCompletionFn =
unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
- using RemoteCommandOnReplyFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>;
+ using RemoteCommandOnReplyFn =
+ unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>;
virtual ~NetworkInterface();
diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp
index ae22469902a..66c67b9079c 100644
--- a/src/mongo/executor/network_interface_integration_fixture.cpp
+++ b/src/mongo/executor/network_interface_integration_fixture.cpp
@@ -133,16 +133,16 @@ Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand(
cbHandle,
rcroa,
[p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)](
- const TaskExecutor::ResponseOnAnyStatus& rs) mutable {
+ const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable {
exhaustUtilCB(rs);
if (!rs.status.isOK()) {
- invariant(!rs.moreToCome);
+ invariant(!isMoreToComeSet);
p.setError(rs.status);
return;
}
- if (!rs.moreToCome) {
+ if (!isMoreToComeSet) {
p.emplaceValue();
}
},
diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp
index da7c3bcc11a..fcae4dee869 100644
--- a/src/mongo/executor/network_interface_tl.cpp
+++ b/src/mongo/executor/network_interface_tl.cpp
@@ -874,12 +874,10 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface
.onError([state](Status error) {
stdx::lock_guard lk(state->_onReplyMutex);
state->onReplyFn(RemoteCommandOnAnyResponse(
- boost::none, std::move(error), state->stopwatch.elapsed()));
+ boost::none, std::move(error), state->stopwatch.elapsed()),
+ false);
})
- .getAsync([state](Status status) {
- state->tryFinish(
- Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"});
- });
+ .getAsync([state](Status status) { state->tryFinish(status); });
{
stdx::lock_guard lk(interface->_inProgressMutex);
@@ -893,64 +891,37 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque
auto requestState = requestManager->getRequest(reqId);
invariant(requestState);
- auto clientCallback = [this, requestState](const RemoteCommandResponse& response) {
+ auto clientCallback = [this, requestState](const RemoteCommandResponse& response,
+ bool isMoreToComeSet) {
+ // Stash this response on the command state to be used to fulfill the promise.
+ prevResponse = response;
auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response);
doMetadataHook(onAnyResponse);
// If the command failed, we will call 'onReply' as a part of the future chain paired with
// the promise. This is to be sure that all error paths will run 'onReply' only once upon
// future completion.
- if (!response.status.isOK() || !getStatusFromCommandResult(response.data).isOK()) {
+ if (!getStatusFromCommandResult(response.data).isOK()) {
// The moreToCome bit should *not* be set if the command failed
- invariant(!response.moreToCome);
- return;
- }
-
- stdx::lock_guard lk(_onReplyMutex);
- onReplyFn(onAnyResponse);
- };
-
- auto& reactor = requestState->interface()->_reactor;
- handleExhaustResponseFn =
- [ this, requestState, reactor, clientCallback = std::move(clientCallback) ](
- StatusWith<RemoteCommandResponse> swResponse) mutable noexcept {
- RemoteCommandResponse response;
- if (!swResponse.isOK()) {
- response = RemoteCommandResponse(std::move(swResponse.getStatus()));
- } else {
- response = std::move(swResponse.getValue());
- }
-
- clientCallback(response);
-
- if (!response.moreToCome) {
- finalResponsePromise.emplaceValue(response);
- return;
- }
-
- if (requestState->interface()->inShutdown()) {
+ invariant(!isMoreToComeSet);
return;
}
// Reset the stopwatch to measure the correct duration for the folowing reply
stopwatch.restart();
- if (deadline != RemoteCommandRequest::kNoExpirationDate) {
- deadline = stopwatch.start() + requestOnAny.timeout;
- }
setTimer();
- requestState->client()->awaitExhaustCommand(baton).thenRunOn(reactor).getAsync(
- handleExhaustResponseFn);
- };
- setTimer();
- requestState->client()
- ->beginExhaustCommandRequest(*requestState->request, baton)
- .thenRunOn(reactor)
- .getAsync(handleExhaustResponseFn);
+ stdx::lock_guard lk(_onReplyMutex);
+ onReplyFn(onAnyResponse, isMoreToComeSet);
+ };
- auto [promise, future] = makePromiseFuture<RemoteCommandResponse>();
- finalResponsePromise = std::move(promise);
- return std::move(future).then([this](const auto& finalResponse) { return finalResponse; });
+ return makeReadyFutureWith(
+ [this, requestState, clientCallback = std::move(clientCallback)]() mutable {
+ setTimer();
+ return requestState->client()->runExhaustCommandRequest(
+ *requestState->request, std::move(clientCallback), baton);
+ })
+ .then([this, requestState] { return prevResponse; });
}
void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise(
diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h
index c35d9d30023..34bec18a411 100644
--- a/src/mongo/executor/network_interface_tl.h
+++ b/src/mongo/executor/network_interface_tl.h
@@ -199,10 +199,10 @@ private:
void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override;
Promise<void> promise;
- Promise<RemoteCommandResponse> finalResponsePromise;
- Mutex _onReplyMutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_onReplyMutex");
+ RemoteCommandResponse prevResponse;
+ Mutex _onReplyMutex =
+ MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex");
RemoteCommandOnReplyFn onReplyFn;
- std::function<void(StatusWith<RemoteCommandResponse>)> handleExhaustResponseFn;
};
enum class ConnStatus { Unset, OK, Failed };
diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp
index 0b88b3d340d..8baafdb3d67 100644
--- a/src/mongo/executor/remote_command_response.cpp
+++ b/src/mongo/executor/remote_command_response.cpp
@@ -55,10 +55,8 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds mill
invariant(!isOK());
};
-RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj,
- Milliseconds millis,
- bool moreToCome)
- : data(std::move(dataObj)), elapsedMillis(millis), moreToCome(moreToCome) {
+RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis)
+ : data(std::move(dataObj)), elapsedMillis(millis) {
// The buffer backing the default empty BSONObj has static duration so it is effectively
// owned.
invariant(data.isOwned() || data.objdata() == BSONObj().objdata());
@@ -67,9 +65,8 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj,
// TODO(amidvidy): we currently discard output docs when we use this constructor. We should
// have RCR hold those too, but we need more machinery before that is possible.
RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
- Milliseconds millis,
- bool moreToCome)
- : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis), moreToCome) {}
+ Milliseconds millis)
+ : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {}
bool RemoteCommandResponseBase::isOK() const {
return status.isOK();
diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h
index 7842ebdfe14..2cd90b37974 100644
--- a/src/mongo/executor/remote_command_response.h
+++ b/src/mongo/executor/remote_command_response.h
@@ -63,18 +63,15 @@ struct RemoteCommandResponseBase {
RemoteCommandResponseBase(Status s, Milliseconds millis);
- RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis, bool moreToCome = false);
+ RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis);
- RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply,
- Milliseconds millis,
- bool moreToCome = false);
+ RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, Milliseconds millis);
bool isOK() const;
BSONObj data; // Always owned. May point into message.
boost::optional<Milliseconds> elapsedMillis;
Status status = Status::OK();
- bool moreToCome = false; // Whether or not the moreToCome bit is set on an exhaust message.
protected:
~RemoteCommandResponseBase() = default;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 7b76b6853b8..10a57699045 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -690,7 +690,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
auto commandStatus = _net->startExhaustCommand(
swCbHandle.getValue(),
scheduledRequest,
- [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response) {
+ [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response,
+ bool isMoreToComeSet) {
using std::swap;
LOGV2_DEBUG(
@@ -705,26 +706,16 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust
return;
}
- if (cbState->canceled.load()) {
- _networkInProgressQueue.erase(cbState->iter);
- return;
- }
-
// Swap the callback function with the new one
CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) {
remoteCommandFinished(cbData, cb, scheduledRequest, response);
};
swap(cbState->callback, newCb);
+
// If this is the last response, invoke the non-exhaust path. This will mark cbState as
// finished and remove the task from _networkInProgressQueue
- if (!response.moreToCome) {
- _networkInProgressQueue.erase(cbState->iter);
-
- WorkQueue result;
- result.emplace_front(cbState);
- result.front()->iter = result.begin();
-
- scheduleIntoPool_inlock(&result, std::move(lk));
+ if (!isMoreToComeSet) {
+ scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
return;
}
@@ -782,23 +773,26 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call
void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) {
CallbackHandle cbHandle;
setCallbackForHandle(&cbHandle, cbState);
- auto canceled = cbState->canceled.load();
CallbackArgs args(this,
std::move(cbHandle),
- canceled ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
- : Status::OK());
- if (!cbState->isFinished.load()) {
- cbState->callback(std::move(args));
+ cbState->canceled.load()
+ ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
+ : Status::OK());
+ invariant(!cbState->isFinished.load());
+ {
+ // After running callback function, clear 'cbStateArg->callback' to release any resources
+ // that might be held by this function object.
+ // Swap 'cbStateArg->callback' with temporary copy before running callback for exception
+ // safety.
+ TaskExecutor::CallbackFn callback;
+ std::swap(cbState->callback, callback);
+ callback(std::move(args));
}
// Do not mark cbState as finished. It will be marked as finished on the last reply.
stdx::lock_guard<Latch> lk(_mutex);
-
- if (cbState->exhaustIter) {
- _poolInProgressQueue.erase(cbState->exhaustIter.get());
- cbState->exhaustIter = boost::none;
- }
-
+ invariant(cbState->exhaustIter);
+ _poolInProgressQueue.erase(cbState->exhaustIter.get());
if (_inShutdown_inlock() && _poolInProgressQueue.empty()) {
_stateChange.notify_all();
}
diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
index f6a8ccf4f75..d5eeeac0504 100644
--- a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
@@ -163,6 +163,11 @@ TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) {
ASSERT(cbHandle.isValid());
executor()->cancel(cbHandle);
ASSERT(cbHandle.isCanceled());
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+
+ // The command was cancelled so the 'fail' counter should be incremented
+ ASSERT_EQ(counters._success, 2);
+ ASSERT_EQ(counters._failed, 1);
// The tasks should be removed after 'isMaster' fails
ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5)));