-rw-r--r--  src/mongo/db/repl/task_executor_mock.cpp                            15
-rw-r--r--  src/mongo/db/repl/task_executor_mock.h                               5
-rw-r--r--  src/mongo/executor/SConscript                                        1
-rw-r--r--  src/mongo/executor/scoped_task_executor.cpp                         15
-rw-r--r--  src/mongo/executor/task_executor.cpp                                19
-rw-r--r--  src/mongo/executor/task_executor.h                                  27
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp                   162
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.h                      21
-rw-r--r--  src/mongo/executor/thread_pool_task_executor_integration_test.cpp  258
-rw-r--r--  src/mongo/s/sharding_task_executor.cpp                              11
-rw-r--r--  src/mongo/s/sharding_task_executor.h                                 5
-rw-r--r--  src/mongo/unittest/task_executor_proxy.cpp                          12
-rw-r--r--  src/mongo/unittest/task_executor_proxy.h                             5
13 files changed, 550 insertions, 6 deletions
diff --git a/src/mongo/db/repl/task_executor_mock.cpp b/src/mongo/db/repl/task_executor_mock.cpp
index d3eb7fdefa4..6909e354633 100644
--- a/src/mongo/db/repl/task_executor_mock.cpp
+++ b/src/mongo/db/repl/task_executor_mock.cpp
@@ -68,5 +68,20 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorMock::scheduleRem
return getExecutor()->scheduleRemoteCommandOnAny(request, cb, baton);
}
+StatusWith<executor::TaskExecutor::CallbackHandle>
+TaskExecutorMock::scheduleExhaustRemoteCommandOnAny(
+ const executor::RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton) {
+ if (shouldFailScheduleRemoteCommandRequest(request)) {
+ return Status(ErrorCodes::OperationFailed, "failed to schedule remote command");
+ }
+ return getExecutor()->scheduleExhaustRemoteCommandOnAny(request, cb, baton);
+}
+
+bool TaskExecutorMock::hasTasks() {
+ return getExecutor()->hasTasks();
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/task_executor_mock.h b/src/mongo/db/repl/task_executor_mock.h
index daae7c198b1..6f4ec02546c 100644
--- a/src/mongo/db/repl/task_executor_mock.h
+++ b/src/mongo/db/repl/task_executor_mock.h
@@ -52,6 +52,11 @@ public:
const executor::RemoteCommandRequestOnAny& request,
const RemoteCommandOnAnyCallbackFn& cb,
const BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny(
+ const executor::RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
+ bool hasTasks() override;
// Override to make scheduleWork() fail during testing.
ShouldFailScheduleWorkRequestFn shouldFailScheduleWorkRequest = []() { return false; };
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index ea00837604e..27d49b0f0ae 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -275,6 +275,7 @@ env.CppIntegrationTest(
'network_interface_integration_test.cpp',
'task_executor_cursor_integration_test.cpp',
'non_auth_task_executor_integration_test.cpp',
+ 'thread_pool_task_executor_integration_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/clientdriver_network',
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp
index 057ff4aa971..0a8fd531479 100644
--- a/src/mongo/executor/scoped_task_executor.cpp
+++ b/src/mongo/executor/scoped_task_executor.cpp
@@ -142,6 +142,21 @@ public:
cb);
}
+ StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny(
+ const RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
+ return _wrapCallback(
+ [&](auto&& x) {
+ return _executor->scheduleExhaustRemoteCommandOnAny(request, std::move(x), baton);
+ },
+ cb);
+ }
+
+ bool hasTasks() override {
+ return _executor->hasTasks();
+ }
+
void cancel(const CallbackHandle& cbHandle) override {
return _executor->cancel(cbHandle);
}
diff --git a/src/mongo/executor/task_executor.cpp b/src/mongo/executor/task_executor.cpp
index 83d69b525fd..e2937372613 100644
--- a/src/mongo/executor/task_executor.cpp
+++ b/src/mongo/executor/task_executor.cpp
@@ -115,9 +115,22 @@ StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleRemoteCommand(
const RemoteCommandRequest& request,
const RemoteCommandCallbackFn& cb,
const BatonHandle& baton) {
- return scheduleRemoteCommandOnAny(request, [cb](const RemoteCommandOnAnyCallbackArgs& args) {
- cb({args, 0});
- });
+ return scheduleRemoteCommandOnAny(request,
+ [cb](const RemoteCommandOnAnyCallbackArgs& args) {
+ cb({args, 0});
+ },
+ baton);
+}
+
+StatusWith<TaskExecutor::CallbackHandle> TaskExecutor::scheduleExhaustRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton) {
+ return scheduleExhaustRemoteCommandOnAny(request,
+ [cb](const RemoteCommandOnAnyCallbackArgs& args) {
+ cb({args, 0});
+ },
+ baton);
}
} // namespace executor
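
Aside: the single-host variants above are thin adapters over the "OnAny" overloads. They wrap the caller's callback so the on-any result is pinned to host index 0, and (fixed in this change for scheduleRemoteCommand) forward the baton instead of dropping it. A minimal standalone sketch of that adapter pattern, using plain std::function stand-ins — OnAnyArgs and SingleArgs below are illustrative assumptions, not the MongoDB callback types:

    #include <cstddef>
    #include <functional>
    #include <iostream>

    // Illustrative stand-ins for RemoteCommandOnAnyCallbackArgs and
    // RemoteCommandCallbackArgs.
    struct OnAnyArgs {
        int status;
    };
    struct SingleArgs {
        SingleArgs(const OnAnyArgs& a, std::size_t hostIndex)
            : status(a.status), host(hostIndex) {}
        int status;
        std::size_t host;
    };

    using OnAnyFn = std::function<void(const OnAnyArgs&)>;
    using SingleFn = std::function<void(const SingleArgs&)>;

    // Pin a single-host callback to host index 0, mirroring what
    // scheduleRemoteCommand and scheduleExhaustRemoteCommand do above.
    OnAnyFn adaptToOnAny(SingleFn cb) {
        return [cb](const OnAnyArgs& args) { cb({args, 0}); };
    }

    int main() {
        auto onAny = adaptToOnAny([](const SingleArgs& a) {
            std::cout << "status=" << a.status << " host=" << a.host << "\n";
        });
        onAny({0});  // prints: status=0 host=0
    }
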
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index d36f5c9bac6..d4c65e500b6 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -267,6 +267,33 @@ public:
const BatonHandle& baton = nullptr) = 0;
/**
+ * Schedules "cb" to be run by the executor on each reply recevied from executing the exhaust
+ * remote command described by "request".
+ *
+ * Returns a handle for waiting on or canceling the callback, or
+ * ErrorCodes::ShutdownInProgress.
+ *
+ * May be called by client threads or callbacks running in the executor.
+ *
+ * Contract: implementations should guarantee that the callback is invoked *after* any
+ * processing related to the callback has completed.
+ */
+ virtual StatusWith<CallbackHandle> scheduleExhaustRemoteCommand(
+ const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr);
+
+ virtual StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny(
+ const RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton = nullptr) = 0;
+
+ /**
+ * Returns true if there are any tasks scheduled on the executor.
+ */
+ virtual bool hasTasks() = 0;
+
+ /**
* If the callback referenced by "cbHandle" hasn't already executed, marks it as
* canceled and runnable.
*
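
For orientation, a caller-side sketch of the new exhaust API (modeled on the integration test added later in this change; it assumes a started TaskExecutor* named executor, an OperationContext* named opCtx, and a reachable HostAndPort named server — those names are illustrative):

    // The server streams one isMaster reply per maxAwaitTimeMS window until
    // the command fails or the callback handle is canceled.
    executor::RemoteCommandRequest rcr(
        server,
        "admin",
        BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion"
                        << TopologyVersion(OID::max(), 0).toBSON()),
        opCtx);

    auto swCbHandle = executor->scheduleExhaustRemoteCommand(
        rcr, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
            // Runs once per streamed reply; after cancel() it runs a final
            // time with a non-OK (CallbackCanceled) response status.
            if (!args.response.isOK()) {
                // Stream is finished.
            }
        });
    uassertStatusOK(swCbHandle.getStatus());
    executor->cancel(swCbHandle.getValue());  // stop the stream

    // hasTasks() can then be polled until the canceled exhaust task drains.
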
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 16391afb13b..520a6b12e04 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -94,6 +94,7 @@ public:
CallbackFn callback;
AtomicWord<unsigned> canceled{0U};
WorkQueue::iterator iter;
+ boost::optional<WorkQueue::iterator> exhaustIter; // Position in _poolInProgressQueue; used only in the exhaust path
Date_t readyDate;
bool isNetworkOperation = false;
bool isTimerOperation = false;
@@ -178,8 +179,8 @@ void ThreadPoolTaskExecutor::join() {
stdx::unique_lock<Latch> ThreadPoolTaskExecutor::_join(stdx::unique_lock<Latch> lk) {
_stateChange.wait(lk, [this] {
- // All tasks are spliced into the _poolInProgressQueue immediately after we accept them.
- // This occurs in scheduleIntoPool_inlock.
+ // All non-exhaust tasks are spliced into the _poolInProgressQueue immediately after we
+ // accept them. This occurs in scheduleIntoPool_inlock.
//
// On the other side, all tasks are spliced out of the _poolInProgressQueue in runCallback,
// which removes them from this list after executing the user's callback.
@@ -595,7 +596,11 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
return;
}
- cbState->canceled.store(1);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ cbState->canceled.store(1);
+ }
+
_pool->schedule([this, cbState](auto status) {
invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
@@ -648,6 +653,157 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
}
}
+StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleExhaustRemoteCommandOnAny(
+ const RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton) {
+ RemoteCommandRequestOnAny scheduledRequest = request;
+ if (request.timeout == RemoteCommandRequest::kNoTimeout) {
+ scheduledRequest.expirationDate = RemoteCommandRequest::kNoExpirationDate;
+ } else {
+ scheduledRequest.expirationDate = _net->now() + scheduledRequest.timeout;
+ }
+
+ // In case the request fails to even get a connection from the pool,
+ // we wrap the callback in a method that prepares its input parameters.
+ auto wq = makeSingletonWorkQueue(
+ [scheduledRequest, cb](const CallbackArgs& cbData) {
+ remoteCommandFailedEarly(cbData, cb, scheduledRequest);
+ },
+ baton);
+ wq.front()->isNetworkOperation = true;
+ stdx::unique_lock<Latch> lk(_mutex);
+ auto swCbHandle = enqueueCallbackState_inlock(&_networkInProgressQueue, &wq);
+ if (!swCbHandle.isOK())
+ return swCbHandle;
+ std::shared_ptr<CallbackState> cbState = _networkInProgressQueue.back();
+ lk.unlock();
+ LOGV2_DEBUG(4495133,
+ 3,
+ "Scheduling exhaust remote command request: {scheduledRequest}",
+ "scheduledRequest"_attr = redact(scheduledRequest.toString()));
+
+ auto commandStatus = _net->startExhaustCommand(
+ swCbHandle.getValue(),
+ scheduledRequest,
+ [this, scheduledRequest, cbState, cb, baton](const ResponseOnAnyStatus& response,
+ bool isMoreToComeSet) {
+ using std::swap;
+
+ LOGV2_DEBUG(
+ 4495134,
+ 3,
+ "Received remote response: {response_isOK_response_response_status_toString}",
+ "response_isOK_response_response_status_toString"_attr =
+ redact(response.isOK() ? response.toString() : response.status.toString()));
+
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (_inShutdown_inlock()) {
+ return;
+ }
+
+ // Swap the callback function with the new one
+ CallbackFn newCb = [cb, scheduledRequest, response](const CallbackArgs& cbData) {
+ remoteCommandFinished(cbData, cb, scheduledRequest, response);
+ };
+ swap(cbState->callback, newCb);
+
+ // If this is the last response, invoke the non-exhaust path. This will mark cbState as
+ // finished and remove the task from _networkInProgressQueue
+ if (!isMoreToComeSet) {
+ scheduleIntoPool_inlock(&_networkInProgressQueue, cbState->iter, std::move(lk));
+ return;
+ }
+
+ scheduleExhaustIntoPool_inlock(cbState, std::move(lk));
+ },
+ baton);
+
+ if (!commandStatus.isOK())
+ return commandStatus;
+
+ return swCbHandle;
+}
+
+void ThreadPoolTaskExecutor::scheduleExhaustIntoPool_inlock(std::shared_ptr<CallbackState> cbState,
+ stdx::unique_lock<Latch> lk) {
+ _poolInProgressQueue.push_back(cbState);
+ cbState->exhaustIter = --_poolInProgressQueue.end();
+ lk.unlock();
+
+ if (cbState->baton) {
+ cbState->baton->schedule([this, cbState](Status status) {
+ if (status.isOK()) {
+ runCallbackExhaust(cbState);
+ return;
+ }
+
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ cbState->canceled.store(1);
+ }
+
+ _pool->schedule([this, cbState](auto status) {
+ invariant(status.isOK() || ErrorCodes::isCancelationError(status.code()));
+
+ runCallbackExhaust(cbState);
+ });
+ });
+ } else {
+ _pool->schedule([this, cbState](auto status) {
+ if (ErrorCodes::isCancelationError(status.code())) {
+ stdx::lock_guard<Latch> lk(_mutex);
+
+ cbState->canceled.store(1);
+ } else {
+ fassert(4615617, status);
+ }
+
+ runCallbackExhaust(cbState);
+ });
+ }
+
+ _net->signalWorkAvailable();
+}
+
+void ThreadPoolTaskExecutor::runCallbackExhaust(std::shared_ptr<CallbackState> cbState) {
+ CallbackHandle cbHandle;
+ setCallbackForHandle(&cbHandle, cbState);
+ CallbackArgs args(this,
+ std::move(cbHandle),
+ cbState->canceled.load()
+ ? Status({ErrorCodes::CallbackCanceled, "Callback canceled"})
+ : Status::OK());
+ invariant(!cbState->isFinished.load());
+ {
+ // After running the callback function, clear 'cbState->callback' to release any resources
+ // that might be held by this function object.
+ // Swap 'cbState->callback' with a temporary copy before running the callback for exception
+ // safety.
+ TaskExecutor::CallbackFn callback;
+ std::swap(cbState->callback, callback);
+ callback(std::move(args));
+ }
+
+ // Do not mark cbState as finished. It will be marked as finished on the last reply.
+ stdx::lock_guard<Latch> lk(_mutex);
+ invariant(cbState->exhaustIter);
+ _poolInProgressQueue.erase(cbState->exhaustIter.get());
+ if (_inShutdown_inlock() && _poolInProgressQueue.empty()) {
+ _stateChange.notify_all();
+ }
+}
+
+bool ThreadPoolTaskExecutor::hasTasks() {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (!_poolInProgressQueue.empty() || !_networkInProgressQueue.empty() ||
+ !_sleepersQueue.empty()) {
+ return true;
+ }
+
+ return false;
+}
+
bool ThreadPoolTaskExecutor::_inShutdown_inlock() const {
return _state >= joinRequired;
}
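
To make the queue bookkeeping above concrete: a callback state stays parked in _networkInProgressQueue for the whole life of an exhaust command, and every streamed reply links the same state into _poolInProgressQueue through exhaustIter; runCallbackExhaust then erases only that pool entry and deliberately leaves the state unfinished so the next reply can reuse it. A self-contained toy model of that per-reply re-enqueue, with std::list and std::function standing in for the executor's queues and callbacks (not the MongoDB types):

    #include <functional>
    #include <iostream>
    #include <list>
    #include <memory>

    // Toy stand-in for CallbackState: parked in the "network" list for the
    // command's lifetime; each reply links it into the "pool" list.
    struct State {
        std::function<void()> callback;
        std::list<std::shared_ptr<State>>::iterator exhaustIter;
        bool finished = false;
    };

    int main() {
        std::list<std::shared_ptr<State>> networkInProgress, poolInProgress;
        auto state = std::make_shared<State>();
        networkInProgress.push_back(state);  // parked until the last reply

        for (int reply = 0; reply < 3; ++reply) {
            bool moreToCome = reply < 2;  // the last reply clears moreToCome

            // Per reply: swap in a callback bound to this response (the real
            // code swaps in a remoteCommandFinished wrapper), then enqueue
            // the same state into the pool and remember its position.
            state->callback = [reply] { std::cout << "reply " << reply << "\n"; };
            poolInProgress.push_back(state);
            state->exhaustIter = --poolInProgress.end();

            // "runCallbackExhaust": run the callback, erase only the pool
            // entry, and leave the state unfinished while more is to come.
            state->callback();
            poolInProgress.erase(state->exhaustIter);
            if (!moreToCome) {  // final reply takes the ordinary finishing path
                state->finished = true;
                networkInProgress.remove(state);
            }
        }
        std::cout << "network drained: " << networkInProgress.empty() << "\n";
    }
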
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 1bcfe806a39..16bf459f6f0 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -87,6 +87,10 @@ public:
const RemoteCommandRequestOnAny& request,
const RemoteCommandOnAnyCallbackFn& cb,
const BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny(
+ const RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) override;
@@ -98,6 +102,12 @@ public:
*/
void dropConnections(const HostAndPort& hostAndPort);
+ /**
+ * Returns true if there are any tasks in any of _poolInProgressQueue, _networkInProgressQueue,
+ * or _sleepersQueue.
+ */
+ bool hasTasks() override;
+
private:
class CallbackState;
class EventState;
@@ -174,10 +184,21 @@ private:
stdx::unique_lock<Latch> lk);
/**
+ * Schedules cbState into the thread pool and places it into _poolInProgressQueue. Does not
+ * remove the entry from the original queue.
+ */
+ void scheduleExhaustIntoPool_inlock(std::shared_ptr<CallbackState> cbState,
+ stdx::unique_lock<Latch> lk);
+ /**
* Executes the callback specified by "cbState".
*/
void runCallback(std::shared_ptr<CallbackState> cbState);
+ /**
+ * Executes the callback specified by "cbState". Will not mark cbState as finished.
+ */
+ void runCallbackExhaust(std::shared_ptr<CallbackState> cbState);
+
bool _inShutdown_inlock() const;
void _setState_inlock(State newState);
stdx::unique_lock<Latch> _join(stdx::unique_lock<Latch> lk);
diff --git a/src/mongo/executor/thread_pool_task_executor_integration_test.cpp b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
new file mode 100644
index 00000000000..b41d9648f52
--- /dev/null
+++ b/src/mongo/executor/thread_pool_task_executor_integration_test.cpp
@@ -0,0 +1,258 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/task_executor.h"
+
+#include "mongo/db/namespace_string.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/network_interface_thread_pool.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/rpc/topology_version_gen.h"
+#include "mongo/unittest/integration_test.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+class TaskExecutorFixture : public mongo::unittest::Test {
+public:
+ void setUp() override {
+ std::shared_ptr<NetworkInterface> net = makeNetworkInterface("TaskExecutorTest");
+ auto tp = std::make_unique<NetworkInterfaceThreadPool>(net.get());
+
+ _executor = std::make_unique<ThreadPoolTaskExecutor>(std::move(tp), std::move(net));
+ _executor->startup();
+ }
+
+ void tearDown() override {
+ _executor->shutdown();
+ _executor.reset();
+ }
+
+ TaskExecutor* executor() {
+ return _executor.get();
+ }
+
+ bool waitUntilNoTasksOrDeadline(Date_t deadline) {
+ while (Date_t::now() <= deadline) {
+ if (!_executor->hasTasks()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ ServiceContext::UniqueServiceContext _serviceCtx = ServiceContext::make();
+ std::unique_ptr<ThreadPoolTaskExecutor> _executor;
+};
+
+class RequestHandlerUtil {
+public:
+ struct responseOutcomeCount {
+ int _success = 0;
+ int _failed = 0;
+ };
+
+ std::function<void(const executor::TaskExecutor::RemoteCommandCallbackArgs&)>&&
+ getRequestCallbackFn() {
+ return std::move(_callbackFn);
+ }
+
+ RequestHandlerUtil::responseOutcomeCount getCountersWhenReady() {
+ stdx::unique_lock<Latch> lk(_mutex);
+ _cv.wait(lk, [&] { return _replyUpdated; });
+ _replyUpdated = false;
+ return _responseOutcomeCount;
+ }
+
+private:
+ // Set to true once '_responseOutcomeCount' has been updated. Used to indicate that a new
+ // response has been received.
+ bool _replyUpdated = false;
+
+ // Counters of how many successful and failed responses were received.
+ responseOutcomeCount _responseOutcomeCount;
+
+ Mutex _mutex = MONGO_MAKE_LATCH("ExhaustRequestHandlerUtil::_mutex");
+ stdx::condition_variable _cv;
+
+ // Called when the server sends a new isMaster exhaust response. Updates '_responseOutcomeCount'
+ // and '_replyUpdated'.
+ std::function<void(const executor::TaskExecutor::RemoteCommandCallbackArgs&)> _callbackFn =
+ [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& result) {
+ {
+ stdx::unique_lock<Latch> lk(_mutex);
+ if (result.response.isOK()) {
+ _responseOutcomeCount._success++;
+ } else {
+ _responseOutcomeCount._failed++;
+ }
+ _replyUpdated = true;
+ }
+
+ _cv.notify_all();
+ };
+};
+
+TEST_F(TaskExecutorFixture, RunExhaustShouldReceiveMultipleResponses) {
+ auto client = _serviceCtx->makeClient("TaskExecutorExhaustTest");
+ auto opCtx = client->makeOperationContext();
+
+ RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(),
+ "admin",
+ BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000 << "topologyVersion"
+ << TopologyVersion(OID::max(), 0).toBSON()),
+ opCtx.get());
+
+ RequestHandlerUtil exhaustRequestHandler;
+ auto swCbHandle = executor()->scheduleExhaustRemoteCommand(
+ std::move(rcr), exhaustRequestHandler.getRequestCallbackFn());
+ ASSERT_OK(swCbHandle.getStatus());
+ auto cbHandle = swCbHandle.getValue();
+
+ {
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+ ASSERT(cbHandle.isValid());
+
+ // The first response should be successful
+ ASSERT_EQ(counters._success, 1);
+ ASSERT_EQ(counters._failed, 0);
+ }
+
+ {
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+ ASSERT(cbHandle.isValid());
+
+ // The second response should also be successful
+ ASSERT_EQ(counters._success, 2);
+ ASSERT_EQ(counters._failed, 0);
+ }
+
+ // Cancel the callback
+ ASSERT(cbHandle.isValid());
+ executor()->cancel(cbHandle);
+ ASSERT(cbHandle.isCanceled());
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+
+ // The command was canceled, so the '_failed' counter should be incremented
+ ASSERT_EQ(counters._success, 2);
+ ASSERT_EQ(counters._failed, 1);
+
+ // The tasks should be removed after 'isMaster' fails
+ ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5)));
+}
+
+TEST_F(TaskExecutorFixture, RunExhaustShouldStopOnFailure) {
+ // Turn on the failCommand failpoint for 'isMaster' on the server that we will schedule
+ // 'isMaster' on below
+ auto failCmdClient = _serviceCtx->makeClient("TaskExecutorExhaustTest");
+ auto opCtx = failCmdClient->makeOperationContext();
+
+ auto configureFailpointCmd = BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("errorCode" << ErrorCodes::CommandFailed
+ << "failCommands"
+ << BSON_ARRAY("isMaster")));
+ RemoteCommandRequest failCmd(unittest::getFixtureConnectionString().getServers().front(),
+ "admin",
+ configureFailpointCmd,
+ opCtx.get());
+ RequestHandlerUtil failCmdRequestHandler;
+ auto swCbHandle = _executor->scheduleRemoteCommand(
+ std::move(failCmd), failCmdRequestHandler.getRequestCallbackFn());
+ ASSERT_OK(swCbHandle.getStatus());
+ auto cbHandle = swCbHandle.getValue();
+
+ // Assert 'configureFailPoint' was successful
+ auto counters = failCmdRequestHandler.getCountersWhenReady();
+ ASSERT(cbHandle.isValid());
+ ASSERT_EQ(counters._success, 1);
+ ASSERT_EQ(counters._failed, 0);
+
+ ON_BLOCK_EXIT([&] {
+ auto stopFpCmd = BSON("configureFailPoint"
+ << "failCommand"
+ << "mode"
+ << "off");
+ RemoteCommandRequest stopFpRequest(
+ unittest::getFixtureConnectionString().getServers().front(),
+ "admin",
+ stopFpCmd,
+ opCtx.get());
+ auto swCbHandle = _executor->scheduleRemoteCommand(
+ std::move(stopFpRequest), failCmdRequestHandler.getRequestCallbackFn());
+
+ // Assert the failpoint is correctly turned off
+ auto counters = failCmdRequestHandler.getCountersWhenReady();
+ ASSERT(cbHandle.isValid());
+
+ ASSERT_EQ(counters._success, 2);
+ ASSERT_EQ(counters._failed, 0);
+ });
+
+ {
+ auto client = _serviceCtx->makeClient("TaskExecutorExhaustTest");
+ auto opCtx = client->makeOperationContext();
+
+ RemoteCommandRequest rcr(unittest::getFixtureConnectionString().getServers().front(),
+ "admin",
+ BSON("isMaster" << 1 << "maxAwaitTimeMS" << 1000
+ << "topologyVersion"
+ << TopologyVersion(OID::max(), 0).toBSON()),
+ opCtx.get());
+
+ RequestHandlerUtil exhaustRequestHandler;
+ auto swCbHandle = executor()->scheduleExhaustRemoteCommand(
+ std::move(rcr), exhaustRequestHandler.getRequestCallbackFn());
+ ASSERT_OK(swCbHandle.getStatus());
+ auto cbHandle = swCbHandle.getValue();
+
+ auto counters = exhaustRequestHandler.getCountersWhenReady();
+
+ // The response should be marked as failed
+ ASSERT_EQ(counters._success, 0);
+ ASSERT_EQ(counters._failed, 1);
+
+ // The tasks should be removed after 'isMaster' fails
+ ASSERT_TRUE(waitUntilNoTasksOrDeadline(Date_t::now() + Seconds(5)));
+ }
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo
diff --git a/src/mongo/s/sharding_task_executor.cpp b/src/mongo/s/sharding_task_executor.cpp
index c9c47f9c3f9..8c114162fe5 100644
--- a/src/mongo/s/sharding_task_executor.cpp
+++ b/src/mongo/s/sharding_task_executor.cpp
@@ -257,6 +257,17 @@ StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleRemoteCom
requestWithFixedLsid ? *requestWithFixedLsid : request, shardingCb, baton);
}
+StatusWith<TaskExecutor::CallbackHandle> ShardingTaskExecutor::scheduleExhaustRemoteCommandOnAny(
+ const RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton) {
+ MONGO_UNREACHABLE;
+}
+
+bool ShardingTaskExecutor::hasTasks() {
+ MONGO_UNREACHABLE;
+}
+
void ShardingTaskExecutor::cancel(const CallbackHandle& cbHandle) {
_executor->cancel(cbHandle);
}
diff --git a/src/mongo/s/sharding_task_executor.h b/src/mongo/s/sharding_task_executor.h
index 3f763d120cd..bf5b1db067c 100644
--- a/src/mongo/s/sharding_task_executor.h
+++ b/src/mongo/s/sharding_task_executor.h
@@ -72,6 +72,11 @@ public:
const RemoteCommandRequestOnAny& request,
const RemoteCommandOnAnyCallbackFn& cb,
const BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny(
+ const RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
+ bool hasTasks() override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) override;
diff --git a/src/mongo/unittest/task_executor_proxy.cpp b/src/mongo/unittest/task_executor_proxy.cpp
index dd583c17056..911123d375c 100644
--- a/src/mongo/unittest/task_executor_proxy.cpp
+++ b/src/mongo/unittest/task_executor_proxy.cpp
@@ -106,6 +106,18 @@ StatusWith<executor::TaskExecutor::CallbackHandle> TaskExecutorProxy::scheduleRe
return _executor->scheduleRemoteCommandOnAny(request, cb, baton);
}
+StatusWith<executor::TaskExecutor::CallbackHandle>
+TaskExecutorProxy::scheduleExhaustRemoteCommandOnAny(
+ const executor::RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton) {
+ return _executor->scheduleExhaustRemoteCommandOnAny(request, cb, baton);
+}
+
+bool TaskExecutorProxy::hasTasks() {
+ return _executor->hasTasks();
+}
+
void TaskExecutorProxy::cancel(const CallbackHandle& cbHandle) {
_executor->cancel(cbHandle);
}
diff --git a/src/mongo/unittest/task_executor_proxy.h b/src/mongo/unittest/task_executor_proxy.h
index 3d37f837e81..fa27e0cde05 100644
--- a/src/mongo/unittest/task_executor_proxy.h
+++ b/src/mongo/unittest/task_executor_proxy.h
@@ -69,6 +69,11 @@ public:
const executor::RemoteCommandRequestOnAny& request,
const RemoteCommandOnAnyCallbackFn& cb,
const BatonHandle& baton = nullptr) override;
+ StatusWith<CallbackHandle> scheduleExhaustRemoteCommandOnAny(
+ const executor::RemoteCommandRequestOnAny& request,
+ const RemoteCommandOnAnyCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override;
+ bool hasTasks() override;
void cancel(const CallbackHandle& cbHandle) override;
void wait(const CallbackHandle& cbHandle,
Interruptible* interruptible = Interruptible::notInterruptible()) override;