diff options
author | jannaerin <golden.janna@gmail.com> | 2020-03-25 11:10:52 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-25 16:07:17 +0000 |
commit | 6078a968b5583072503b020c03e0f386c726a8f6 (patch) | |
tree | df1ffb8b013e5d1a03e21a25cfc44c0f50746961 /src/mongo/executor | |
parent | 0ff2d7877ee6bb2ea1e745526e85a1a1df39a592 (diff) | |
download | mongo-6078a968b5583072503b020c03e0f386c726a8f6.tar.gz |
Revert "SERVER-44954 Streamable RSM uses exhaust isMaster"
This reverts commit 8124a8d047ce142f6d6defc089e5e71192721a5c.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/network_interface.h | 3 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_integration_fixture.cpp | 6 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.cpp | 67 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_tl.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_response.cpp | 11 | ||||
-rw-r--r-- | src/mongo/executor/remote_command_response.h | 7 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 44 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_integration_test.cpp | 5 |
8 files changed, 57 insertions, 92 deletions
diff --git a/src/mongo/executor/network_interface.h b/src/mongo/executor/network_interface.h index abc02a2db61..d269fb2443f 100644 --- a/src/mongo/executor/network_interface.h +++ b/src/mongo/executor/network_interface.h @@ -58,7 +58,8 @@ public: using Response = RemoteCommandResponse; using RemoteCommandCompletionFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; - using RemoteCommandOnReplyFn = unique_function<void(const TaskExecutor::ResponseOnAnyStatus&)>; + using RemoteCommandOnReplyFn = + unique_function<void(const TaskExecutor::ResponseOnAnyStatus&, bool isMoreToComeSet)>; virtual ~NetworkInterface(); diff --git a/src/mongo/executor/network_interface_integration_fixture.cpp b/src/mongo/executor/network_interface_integration_fixture.cpp index ae22469902a..66c67b9079c 100644 --- a/src/mongo/executor/network_interface_integration_fixture.cpp +++ b/src/mongo/executor/network_interface_integration_fixture.cpp @@ -133,16 +133,16 @@ Future<void> NetworkInterfaceIntegrationFixture::startExhaustCommand( cbHandle, rcroa, [p = std::move(pf.promise), exhaustUtilCB = std::move(exhaustUtilCB)]( - const TaskExecutor::ResponseOnAnyStatus& rs) mutable { + const TaskExecutor::ResponseOnAnyStatus& rs, bool isMoreToComeSet) mutable { exhaustUtilCB(rs); if (!rs.status.isOK()) { - invariant(!rs.moreToCome); + invariant(!isMoreToComeSet); p.setError(rs.status); return; } - if (!rs.moreToCome) { + if (!isMoreToComeSet) { p.emplaceValue(); } }, diff --git a/src/mongo/executor/network_interface_tl.cpp b/src/mongo/executor/network_interface_tl.cpp index da7c3bcc11a..fcae4dee869 100644 --- a/src/mongo/executor/network_interface_tl.cpp +++ b/src/mongo/executor/network_interface_tl.cpp @@ -874,12 +874,10 @@ auto NetworkInterfaceTL::ExhaustCommandState::make(NetworkInterfaceTL* interface .onError([state](Status error) { stdx::lock_guard lk(state->_onReplyMutex); state->onReplyFn(RemoteCommandOnAnyResponse( - boost::none, std::move(error), state->stopwatch.elapsed())); + boost::none, std::move(error), state->stopwatch.elapsed()), + false); }) - .getAsync([state](Status status) { - state->tryFinish( - Status{ErrorCodes::ExhaustCommandFinished, "Exhaust command finished"}); - }); + .getAsync([state](Status status) { state->tryFinish(status); }); { stdx::lock_guard lk(interface->_inProgressMutex); @@ -893,64 +891,37 @@ Future<RemoteCommandResponse> NetworkInterfaceTL::ExhaustCommandState::sendReque auto requestState = requestManager->getRequest(reqId); invariant(requestState); - auto clientCallback = [this, requestState](const RemoteCommandResponse& response) { + auto clientCallback = [this, requestState](const RemoteCommandResponse& response, + bool isMoreToComeSet) { + // Stash this response on the command state to be used to fulfill the promise. + prevResponse = response; auto onAnyResponse = RemoteCommandOnAnyResponse(requestState->host, response); doMetadataHook(onAnyResponse); // If the command failed, we will call 'onReply' as a part of the future chain paired with // the promise. This is to be sure that all error paths will run 'onReply' only once upon // future completion. - if (!response.status.isOK() || !getStatusFromCommandResult(response.data).isOK()) { + if (!getStatusFromCommandResult(response.data).isOK()) { // The moreToCome bit should *not* be set if the command failed - invariant(!response.moreToCome); - return; - } - - stdx::lock_guard lk(_onReplyMutex); - onReplyFn(onAnyResponse); - }; - - auto& reactor = requestState->interface()->_reactor; - handleExhaustResponseFn = - [ this, requestState, reactor, clientCallback = std::move(clientCallback) ]( - StatusWith<RemoteCommandResponse> swResponse) mutable noexcept { - RemoteCommandResponse response; - if (!swResponse.isOK()) { - response = RemoteCommandResponse(std::move(swResponse.getStatus())); - } else { - response = std::move(swResponse.getValue()); - } - - clientCallback(response); - - if (!response.moreToCome) { - finalResponsePromise.emplaceValue(response); - return; - } - - if (requestState->interface()->inShutdown()) { + invariant(!isMoreToComeSet); return; } // Reset the stopwatch to measure the correct duration for the folowing reply stopwatch.restart(); - if (deadline != RemoteCommandRequest::kNoExpirationDate) { - deadline = stopwatch.start() + requestOnAny.timeout; - } setTimer(); - requestState->client()->awaitExhaustCommand(baton).thenRunOn(reactor).getAsync( - handleExhaustResponseFn); - }; - setTimer(); - requestState->client() - ->beginExhaustCommandRequest(*requestState->request, baton) - .thenRunOn(reactor) - .getAsync(handleExhaustResponseFn); + stdx::lock_guard lk(_onReplyMutex); + onReplyFn(onAnyResponse, isMoreToComeSet); + }; - auto [promise, future] = makePromiseFuture<RemoteCommandResponse>(); - finalResponsePromise = std::move(promise); - return std::move(future).then([this](const auto& finalResponse) { return finalResponse; }); + return makeReadyFutureWith( + [this, requestState, clientCallback = std::move(clientCallback)]() mutable { + setTimer(); + return requestState->client()->runExhaustCommandRequest( + *requestState->request, std::move(clientCallback), baton); + }) + .then([this, requestState] { return prevResponse; }); } void NetworkInterfaceTL::ExhaustCommandState::fulfillFinalPromise( diff --git a/src/mongo/executor/network_interface_tl.h b/src/mongo/executor/network_interface_tl.h index c35d9d30023..34bec18a411 100644 --- a/src/mongo/executor/network_interface_tl.h +++ b/src/mongo/executor/network_interface_tl.h @@ -199,10 +199,10 @@ private: void fulfillFinalPromise(StatusWith<RemoteCommandOnAnyResponse> response) override; Promise<void> promise; - Promise<RemoteCommandResponse> finalResponsePromise; - Mutex _onReplyMutex = MONGO_MAKE_LATCH("NetworkInterfaceTL::_onReplyMutex"); + RemoteCommandResponse prevResponse; + Mutex _onReplyMutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "NetworkInterfaceTL::_onReplyMutex"); RemoteCommandOnReplyFn onReplyFn; - std::function<void(StatusWith<RemoteCommandResponse>)> handleExhaustResponseFn; }; enum class ConnStatus { Unset, OK, Failed }; diff --git a/src/mongo/executor/remote_command_response.cpp b/src/mongo/executor/remote_command_response.cpp index 0b88b3d340d..8baafdb3d67 100644 --- a/src/mongo/executor/remote_command_response.cpp +++ b/src/mongo/executor/remote_command_response.cpp @@ -55,10 +55,8 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(Status s, Milliseconds mill invariant(!isOK()); }; -RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, - Milliseconds millis, - bool moreToCome) - : data(std::move(dataObj)), elapsedMillis(millis), moreToCome(moreToCome) { +RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis) + : data(std::move(dataObj)), elapsedMillis(millis) { // The buffer backing the default empty BSONObj has static duration so it is effectively // owned. invariant(data.isOwned() || data.objdata() == BSONObj().objdata()); @@ -67,9 +65,8 @@ RemoteCommandResponseBase::RemoteCommandResponseBase(BSONObj dataObj, // TODO(amidvidy): we currently discard output docs when we use this constructor. We should // have RCR hold those too, but we need more machinery before that is possible. RemoteCommandResponseBase::RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, - Milliseconds millis, - bool moreToCome) - : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis), moreToCome) {} + Milliseconds millis) + : RemoteCommandResponseBase(rpcReply.getCommandReply(), std::move(millis)) {} bool RemoteCommandResponseBase::isOK() const { return status.isOK(); diff --git a/src/mongo/executor/remote_command_response.h b/src/mongo/executor/remote_command_response.h index 7842ebdfe14..2cd90b37974 100644 --- a/src/mongo/executor/remote_command_response.h +++ b/src/mongo/executor/remote_command_response.h @@ -63,18 +63,15 @@ struct RemoteCommandResponseBase { RemoteCommandResponseBase(Status s, Milliseconds millis); - RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis, bool moreToCome = false); + RemoteCommandResponseBase(BSONObj dataObj, Milliseconds millis); - RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, - Milliseconds millis, - bool moreToCome = false); + RemoteCommandResponseBase(const rpc::ReplyInterface& rpcReply, Milliseconds millis); bool isOK() const; BSONObj data; // Always owned. May point into message. boost::optional<Milliseconds> elapsedMillis; Status status = Status::OK(); - bool moreToCome = false; // Whether or not the moreToCome bit is set on an exhaust message. protected: ~RemoteCommandResponseBase() = default; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 7b76b6853b8..10a57699045 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -690,7 +690,8 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust auto commandStatus = _net->startExhaustCommand( swCbHandle.getValue(), scheduledRequest, - [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response) { + [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response, + bool isMoreToComeSet) { using std::swap; LOGV2_DEBUG( @@ -705,26 +706,16 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaust return; } - if (cbState->canceled.load()) { - _networkInProgressQueue.erase(cbState->iter); - return; - } - // Swap the callback function with the new one CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) { remoteCommandFinished(cbData, cb, scheduledRequest, response); }; swap(cbState->callback, newCb); + // If this is the last response, invoke the non-exhaust path. This will mark cbState as // finished and remove the task from _networkInProgressQueue - if (!response.moreToCome) { - _networkInProgressQueue.erase(cbState->iter); - - WorkQueue result; - result.emplace_front(cbState); - result.front()->iter = result.begin(); - - scheduleIntoPool_inlock(&result, std::move(lk)); + if (!isMoreToComeSet) { + scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); return; } @@ -782,23 +773,26 @@ void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<Call void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) { CallbackHandle cbHandle; setCallbackForHandle(&cbHandle, cbState); - auto canceled = cbState->canceled.load(); CallbackArgs args(this, std::move(cbHandle), - canceled ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) - : Status::OK()); - if (!cbState->isFinished.load()) { - cbState->callback(std::move(args)); + cbState->canceled.load() + ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) + : Status::OK()); + invariant(!cbState->isFinished.load()); + { + // After running callback function, clear 'cbStateArg->callback' to release any resources + // that might be held by this function object. + // Swap 'cbStateArg->callback' with temporary copy before running callback for exception + // safety. + TaskExecutor::CallbackFn callback; + std::swap(cbState->callback, callback); + callback(std::move(args)); } // Do not mark cbState as finished. It will be marked as finished on the last reply. stdx::lock_guard<Latch> lk(_mutex); - - if (cbState->exhaustIter) { - _poolInProgressQueue.erase(cbState->exhaustIter.get()); - cbState->exhaustIter = boost::none; - } - + invariant(cbState->exhaustIter); + _poolInProgressQueue.erase(cbState->exhaustIter.get()); if (_inShutdown_inlock() && _poolInProgressQueue.empty()) { _stateChange.notify_all(); } diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp index f6a8ccf4f75..d5eeeac0504 100644 --- a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp @@ -163,6 +163,11 @@ TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) { ASSERT(cbHandle.isValid()); executor()->cancel(cbHandle); ASSERT(cbHandle.isCanceled()); + auto counters = exhaustRequestHandler.getCountersWhenReady(); + + // The command was cancelled so the 'fail' counter should be incremented + ASSERT_EQ(counters._success, 2); + ASSERT_EQ(counters._failed, 1); // The tasks should be removed after 'isMaster' fails ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5))); |