diff options
-rw-r--r-- | src/mongo/db/repl/task_executor_mock.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/task_executor_mock.h | 5 | ||||
-rw-r--r-- | src/mongo/executor/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor.cpp | 15 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.cpp | 19 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 27 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 162 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 21 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_integration_test.cpp | 258 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/sharding_task_executor.h | 5 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.cpp | 12 | ||||
-rw-r--r-- | src/mongo/unittest/task_executor_proxy.h | 5 |
13 files changed, 550 insertions, 6 deletions
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp index d3eb7fdefa4..6909e354633 100644 --- a/src/mongo/db/repl/task_executor_mock.cpp +++ b/src/mongo/db/repl/task_executor_mock.cpp @@ -68,5 +68,20 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRem return getExecutor()->scheduleRemoteCommandOnAny(request, cb, baton); } +StatusWith<executor::TaskExecutor::CallbackHandle> +TaskExecutorMock::scheduleExhaustRemoteCommandOnAny( + const executor::RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton) { + if (shouldFailScheduleRemoteCommandRequest(request)) { + return Status(ErrorCodes::OperationFailed, "failed to schedule remote command"); + } + return getExecutor()->scheduleExhaustRemoteCommandOnAny(request, cb, baton); +} + +bool TaskExecutorMock::hasTasks() { + return getExecutor()->hasTasks(); +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h index daae7c198b1..6f4ec02546c 100644 --- a/src/mongo/db/repl/task_executor_mock.h +++ b/src/mongo/db/repl/task_executor_mock.h @@ -52,6 +52,11 @@ public: const executor::RemoteCommandRequestOnAny& request, const RemoteCommandOnAnyCallbackFn& cb, const BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny( + const executor::RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton = nullptr) override; + bool hasTasks() override; // Override to make scheduleWork() fail during testing. ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; }; diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index ea00837604e..27d49b0f0ae 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -275,6 +275,7 @@ env.CppIntegrationTest( 'network_interface_integration_test.cpp', 'task_executor_cursor_integration_test.cpp', 'non_auth_task_executor_integration_test.cpp', + 'thread_pool_task_executor_integration_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/clientdriver_network', diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index 057ff4aa971..0a8fd531479 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -142,6 +142,21 @@ public: cb); } + StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton = nullptr) override { + return _wrapCallback( + [&](auto&& x) { + return _executor->scheduleExhaustRemoteCommandOnAny(request, std::move(x), baton); + }, + cb); + } + + bool hasTasks() { + return _executor->hasTasks(); + } + void cancel(const CallbackHandle& cbHandle) override { return _executor->cancel(cbHandle); } diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp index 83d69b525fd..e2937372613 100644 --- a/src/mongo/executor/task_executor.cpp +++ b/src/mongo/executor/task_executor.cpp @@ -115,9 +115,22 @@ StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleRemoteCommand( const RemoteCommandRequest& request, const RemoteCommandCallbackFn& cb, const BatonHandle& baton) { - return scheduleRemoteCommandOnAny(request, [cb](const RemoteCommandOnAnyCallbackArgs& args) { - cb({args, 0}); - }); + return scheduleRemoteCommandOnAny(request, + [cb](const RemoteCommandOnAnyCallbackArgs& args) { + cb({args, 0}); + }, + baton); +} + +StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleExhaustRemoteCommand( + const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton) { + return scheduleExhaustRemoteCommandOnAny(request, + [cb](const RemoteCommandOnAnyCallbackArgs& args) { + cb({args, 0}); + }, + baton); } } // namespace executor diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index d36f5c9bac6..d4c65e500b6 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -267,6 +267,33 @@ public: const BatonHandle& baton = nullptr) = 0; /** + * Schedules "cb" to be run by the executor on each reply recevied from executing the exhaust + * remote command described by "request". + * + * Returns a handle for waiting on or canceling the callback, or + * ErrorCodes::ShutdownInProgress. + * + * May be called by client threads or callbacks running in the executor. + * + * Contract: Implementations should guarantee that callback should be called *after* doing any + * processing related to the callback. + */ + virtual StatusWith<CallbackHandle> scheduleExhaustRemoteCommand( + const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr); + + virtual StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton = nullptr) = 0; + + /** + * Returns true if there are any tasks scheduled on the executor. + */ + virtual bool hasTasks() = 0; + + /** * If the callback referenced by "cbHandle" hasn't already executed, marks it as * canceled and runnable. * diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 16391afb13b..520a6b12e04 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -94,6 +94,7 @@ public: CallbackFn callback; AtomicWord<unsigned> canceled{0U}; WorkQueue::iterator iter; + boost::optional<WorkQueue::iterator> exhaustIter; // Used only in the exhaust path Date_t readyDate; bool isNetworkOperation = false; bool isTimerOperation = false; @@ -178,8 +179,8 @@ void ThreadPoolTaskExecutor::join() { stdx::unique_lock<Latch> ThreadPoolTaskExecutor::_join(stdx::unique_lock<Latch> lk) { _stateChange.wait(lk, [this] { - // All tasks are spliced into the _poolInProgressQueue immediately after we accept them. - // This occurs in scheduleIntoPool_inlock. + // All non-exhaust tasks are spliced into the _poolInProgressQueue immediately after we + // accept them. This occurs in scheduleIntoPool_inlock. // // On the other side, all tasks are spliced out of the _poolInProgressQueue in runCallback, // which removes them from this list after executing the users callback. @@ -595,7 +596,11 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, return; } - cbState->canceled.store(1); + { + stdx::lock_guard<Latch> lk(_mutex); + cbState->canceled.store(1); + } + _pool->schedule([this, cbState](auto status) { invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); @@ -648,6 +653,157 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA } } +StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaustRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton) { + RemoteCommandRequestOnAny scheduledRequest = request; + if (request.timeout == RemoteCommandRequest::kNoTimeout) { + scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate; + } else { + scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout; + } + + // In case the request fails to even get a connection from the pool, + // we wrap the callback in a method that prepares its input parameters. + auto wq = makeSingletonWorkQueue( + [scheduledRequest, cb](const CallbackArgs& cbData) { + remoteCommandFailedEarly(cbData, cb, scheduledRequest); + }, + baton); + wq.front()->isNetworkOperation = true; + stdx::unique_lock<Latch> lk(_mutex); + auto swCbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq); + if (!swCbHandle.isOK()) + return swCbHandle; + std::shared_ptr<CallbackState> cbState = _networkInProgressQueue.back(); + lk.unlock(); + LOGV2_DEBUG(4495133, + 3, + "Scheduling exhaust remote command request: {scheduledRequest}", + "scheduledRequest"_attr = redact(scheduledRequest.toString())); + + auto commandStatus = _net->startExhaustCommand( + swCbHandle.getValue(), + scheduledRequest, + [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response, + bool isMoreToComeSet) { + using std::swap; + + LOGV2_DEBUG( + 4495134, + 3, + "Received remote response: {response_isOK_response_response_status_toString}", + "response_isOK_response_response_status_toString"_attr = + redact(response.isOK() ? response.toString() : response.status.toString())); + + stdx::unique_lock<Latch> lk(_mutex); + if (_inShutdown_inlock()) { + return; + } + + // Swap the callback function with the new one + CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) { + remoteCommandFinished(cbData, cb, scheduledRequest, response); + }; + swap(cbState->callback, newCb); + + // If this is the last response, invoke the non-exhaust path. This will mark cbState as + // finished and remove the task from _networkInProgressQueue + if (!isMoreToComeSet) { + scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk)); + return; + } + + scheduleExhaustIntoPool_inlock(cbState, std::move(lk)); + }, + baton); + + if (!commandStatus.isOK()) + return commandStatus; + + return swCbHandle; +} + +void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<CallbackState> cbState, + stdx::unique_lock<Latch> lk) { + _poolInProgressQueue.push_back(cbState); + cbState->exhaustIter = --_poolInProgressQueue.end(); + lk.unlock(); + + if (cbState->baton) { + cbState->baton->schedule([this, cbState](Status status) { + if (status.isOK()) { + runCallbackExhaust(cbState); + return; + } + + { + stdx::lock_guard<Latch> lk(_mutex); + cbState->canceled.store(1); + } + + _pool->schedule([this, cbState](auto status) { + invariant(status.isOK() || ErrorCodes::isCancelationError(status.code())); + + runCallbackExhaust(cbState); + }); + }); + } else { + _pool->schedule([this, cbState](auto status) { + if (ErrorCodes::isCancelationError(status.code())) { + stdx::lock_guard<Latch> lk(_mutex); + + cbState->canceled.store(1); + } else { + fassert(4615617, status); + } + + runCallbackExhaust(cbState); + }); + } + + _net->signalWorkAvailable(); +} + +void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) { + CallbackHandle cbHandle; + setCallbackForHandle(&cbHandle, cbState); + CallbackArgs args(this, + std::move(cbHandle), + cbState->canceled.load() + ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"}) + : Status::OK()); + invariant(!cbState->isFinished.load()); + { + // After running callback function, clear 'cbStateArg->callback' to release any resources + // that might be held by this function object. + // Swap 'cbStateArg->callback' with temporary copy before running callback for exception + // safety. + TaskExecutor::CallbackFn callback; + std::swap(cbState->callback, callback); + callback(std::move(args)); + } + + // Do not mark cbState as finished. It will be marked as finished on the last reply. + stdx::lock_guard<Latch> lk(_mutex); + invariant(cbState->exhaustIter); + _poolInProgressQueue.erase(cbState->exhaustIter.get()); + if (_inShutdown_inlock() && _poolInProgressQueue.empty()) { + _stateChange.notify_all(); + } +} + +bool ThreadPoolTaskExecutor::hasTasks() { + stdx::unique_lock<Latch> lk(_mutex); + if (!_poolInProgressQueue.empty() || !_networkInProgressQueue.empty() || + !_sleepersQueue.empty()) { + return true; + } + + return false; +} + bool ThreadPoolTaskExecutor::_inShutdown_inlock() const { return _state >= joinRequired; } diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 1bcfe806a39..16bf459f6f0 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -87,6 +87,10 @@ public: const RemoteCommandRequestOnAny& request, const RemoteCommandOnAnyCallbackFn& cb, const BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton = nullptr); void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; @@ -98,6 +102,12 @@ public: */ void dropConnections(const HostAndPort& hostAndPort); + /** + * Returns true if there are any tasks in any of _poolInProgressQueue, _networkInProgressQueue, + * or _sleepersQueue. + */ + bool hasTasks(); + private: class CallbackState; class EventState; @@ -174,10 +184,21 @@ private: stdx::unique_lock<Latch> lk); /** + * Schedules cbState into the thread pool and places it into _poolInProgressQueue. Does not + * remove the entry from the original queue. + */ + void scheduleExhaustIntoPool_inlock(std::shared_ptr<CallbackState> cbState, + stdx::unique_lock<Latch> lk); + /** * Executes the callback specified by "cbState". */ void runCallback(std::shared_ptr<CallbackState> cbState); + /** + * Executes the callback specified by "cbState". Will not mark cbState as finished. + */ + void runCallbackExhaust(std::shared_ptr<CallbackState> cbState); + bool _inShutdown_inlock() const; void _setState_inlock(State newState); stdx::unique_lock<Latch> _join(stdx::unique_lock<Latch> lk); diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp new file mode 100644 index 00000000000..b41d9648f52 --- /dev/null +++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp @@ -0,0 +1,258 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/executor/task_executor.h" + +#include "mongo/db/namespace_string.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/network_interface_thread_pool.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/rpc/topology_version_gen.h" +#include "mongo/unittest/integration_test.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace executor { +namespace { + +class TaskExecutorFixture : public mongo::unittest::Test { +public: + void setUp() override { + std::shared_ptr<NetworkInterface> net = makeNetworkInterface("TaskExecutorTest"); + auto tp = std::make_unique<NetworkInterfaceThreadPool>(net.get()); + + _executor = std::make_unique<ThreadPoolTaskExecutor>(std::move(tp), std::move(net)); + _executor->startup(); + }; + + void tearDown() override { + _executor->shutdown(); + _executor.reset(); + }; + + TaskExecutor* executor() { + return _executor.get(); + } + + bool waitUntilNoTasksOrDeadline(Date_t deadline) { + while (Date_t::now() <= deadline) { + if (!_executor->hasTasks()) { + return true; + } + } + + return false; + } + + ServiceContext::UniqueServiceContext _serviceCtx = ServiceContext::make(); + std::unique_ptr<ThreadPoolTaskExecutor> _executor; +}; + +class RequestHandlerUtil { +public: + struct responseOutcomeCount { + int _success = 0; + int _failed = 0; + }; + + std::function<void(const executor::TaskExecutor::RemoteCommandCallbackArgs&)>&& + getRequestCallbackFn() { + return std::move(_callbackFn); + } + + RequestHandlerUtil::responseOutcomeCount getCountersWhenReady() { + stdx::unique_lock<Latch> lk(_mutex); + _cv.wait(_mutex, [&] { return _replyUpdated; }); + _replyUpdated = false; + return _responseOutcomeCount; + } + +private: + // set to true once '_responseOutcomeCount' has been updated. Used to indicate that a new + // response has been sent. + bool _replyUpdated = false; + + // counter of how many successful and failed responses were received. + responseOutcomeCount _responseOutcomeCount; + + Mutex _mutex = MONGO_MAKE_LATCH("ExhaustRequestHandlerUtil::_mutex"); + stdx::condition_variable _cv; + + // called when a server sends a new isMaster exhaust response. Updates _responseOutcomeCount + // and _replyUpdated. + std::function<void(const executor::TaskExecutor::RemoteCommandCallbackArgs&)> _callbackFn = + [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) { + { + stdx::unique_lock<Latch> lk(_mutex); + if (result.response.isOK()) { + _responseOutcomeCount._success++; + } else { + _responseOutcomeCount._failed++; + } + _replyUpdated = true; + } + + _cv.notify_all(); + }; +}; + +TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) { + auto client = _serviceCtx->makeClient("TaskExecutorExhaustTest"); + auto opCtx = client->makeOperationContext(); + + RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(), + "admin", + BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion" + << TopologyVersion(OID::max(), 0).toBSON()), + opCtx.get()); + + RequestHandlerUtil exhaustRequestHandler; + auto swCbHandle = executor()->scheduleExhaustRemoteCommand( + std::move(rcr), exhaustRequestHandler.getRequestCallbackFn()); + ASSERT_OK(swCbHandle.getStatus()); + auto cbHandle = swCbHandle.getValue(); + + { + auto counters = exhaustRequestHandler.getCountersWhenReady(); + ASSERT(cbHandle.isValid()); + + // The first response should be successful + ASSERT_EQ(counters._success, 1); + ASSERT_EQ(counters._failed, 0); + } + + { + auto counters = exhaustRequestHandler.getCountersWhenReady(); + ASSERT(cbHandle.isValid()); + + // The second response should also be successful + ASSERT_EQ(counters._success, 2); + ASSERT_EQ(counters._failed, 0); + } + + // Cancel the callback + ASSERT(cbHandle.isValid()); + executor()->cancel(cbHandle); + ASSERT(cbHandle.isCanceled()); + auto counters = exhaustRequestHandler.getCountersWhenReady(); + + // The command was cancelled so the 'fail' counter should be incremented + ASSERT_EQ(counters._success, 2); + ASSERT_EQ(counters._failed, 1); + + // The tasks should be removed after 'isMaster' fails + ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5))); +} + +TEST_F(TaskExecutorFixture, RunExhaustShouldStopOnFailure) { + // Turn on the failCommand failpoint for 'isMaster' on the server that we will schedule + // 'isMaster' on below + auto failCmdClient = _serviceCtx->makeClient("TaskExecutorExhaustTest"); + auto opCtx = failCmdClient->makeOperationContext(); + + auto configureFailpointCmd = BSON("configureFailPoint" + << "failCommand" + << "mode" + << "alwaysOn" + << "data" + << BSON("errorCode" << ErrorCodes::CommandFailed + << "failCommands" + << BSON_ARRAY("isMaster"))); + RemoteCommandRequest failCmd(unittest::getFixtureConnectionString().getServers().front(), + "admin", + configureFailpointCmd, + opCtx.get()); + RequestHandlerUtil failCmdRequestHandler; + auto swCbHandle = _executor->scheduleRemoteCommand( + std::move(failCmd), failCmdRequestHandler.getRequestCallbackFn()); + ASSERT_OK(swCbHandle.getStatus()); + auto cbHandle = swCbHandle.getValue(); + + // Assert 'configureFailPoint' was successful + auto counters = failCmdRequestHandler.getCountersWhenReady(); + ASSERT(cbHandle.isValid()); + ASSERT_EQ(counters._success, 1); + ASSERT_EQ(counters._failed, 0); + + ON_BLOCK_EXIT([&] { + auto stopFpCmd = BSON("configureFailPoint" + << "failCommand" + << "mode" + << "off"); + RemoteCommandRequest stopFpRequest( + unittest::getFixtureConnectionString().getServers().front(), + "admin", + stopFpCmd, + opCtx.get()); + auto swCbHandle = _executor->scheduleRemoteCommand( + std::move(stopFpRequest), failCmdRequestHandler.getRequestCallbackFn()); + + // Assert the failpoint is correctly turned off + auto counters = failCmdRequestHandler.getCountersWhenReady(); + ASSERT(cbHandle.isValid()); + + ASSERT_EQ(counters._success, 2); + ASSERT_EQ(counters._failed, 0); + }); + + { + auto client = _serviceCtx->makeClient("TaskExecutorExhaustTest"); + auto opCtx = client->makeOperationContext(); + + RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(), + "admin", + BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 + << "topologyVersion" + << TopologyVersion(OID::max(), 0).toBSON()), + opCtx.get()); + + RequestHandlerUtil exhaustRequestHandler; + auto swCbHandle = executor()->scheduleExhaustRemoteCommand( + std::move(rcr), exhaustRequestHandler.getRequestCallbackFn()); + ASSERT_OK(swCbHandle.getStatus()); + auto cbHandle = swCbHandle.getValue(); + + auto counters = exhaustRequestHandler.getCountersWhenReady(); + + // The response should be marked as failed + ASSERT_EQ(counters._success, 0); + ASSERT_EQ(counters._failed, 1); + + // The tasks should be removed after 'isMaster' fails + ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5))); + } +} + +} // namespace +} // namespace executor +} // namespace mongo diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp index c9c47f9c3f9..8c114162fe5 100644 --- a/src/mongo/s/sharding_task_executor.cpp +++ b/src/mongo/s/sharding_task_executor.cpp @@ -257,6 +257,17 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom requestWithFixedLsid ? *requestWithFixedLsid : request, shardingCb, baton); } +StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleExhaustRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton) { + MONGO_UNREACHABLE; +} + +bool ShardingTaskExecutor::hasTasks() { + MONGO_UNREACHABLE; +} + void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) { _executor->cancel(cbHandle); } diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h index 3f763d120cd..bf5b1db067c 100644 --- a/src/mongo/s/sharding_task_executor.h +++ b/src/mongo/s/sharding_task_executor.h @@ -72,6 +72,11 @@ public: const RemoteCommandRequestOnAny& request, const RemoteCommandOnAnyCallbackFn& cb, const BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny( + const RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton = nullptr) override; + bool hasTasks() override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp index dd583c17056..911123d375c 100644 --- a/src/mongo/unittest/task_executor_proxy.cpp +++ b/src/mongo/unittest/task_executor_proxy.cpp @@ -106,6 +106,18 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRe return _executor->scheduleRemoteCommandOnAny(request, cb, baton); } +StatusWith<executor::TaskExecutor::CallbackHandle> +TaskExecutorProxy::scheduleExhaustRemoteCommandOnAny( + const executor::RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton) { + return _executor->scheduleExhaustRemoteCommandOnAny(request, cb, baton); +} + +bool TaskExecutorProxy::hasTasks() { + return _executor->hasTasks(); +} + void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) { _executor->cancel(cbHandle); } diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h index 3d37f837e81..fa27e0cde05 100644 --- a/src/mongo/unittest/task_executor_proxy.h +++ b/src/mongo/unittest/task_executor_proxy.h @@ -69,6 +69,11 @@ public: const executor::RemoteCommandRequestOnAny& request, const RemoteCommandOnAnyCallbackFn& cb, const BatonHandle& baton = nullptr) override; + StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny( + const executor::RemoteCommandRequestOnAny& request, + const RemoteCommandOnAnyCallbackFn& cb, + const BatonHandle& baton = nullptr) override; + bool hasTasks() override; void cancel(const CallbackHandle& cbHandle) override; void wait(const CallbackHandle& cbHandle, Interruptible* interruptible = Interruptible::notInterruptible()) override; |