author     Jason Carey <jcarey@argv.me>   2019-04-18 17:05:41 -0400
committer  Jason Carey <jcarey@argv.me>   2019-04-30 23:48:45 -0400
commit     cf2e2ab2d90a746340e323c19bcedebaf944a5bd (patch)
tree       d9e5b85cade3c2bb6a0f5123e07dbbd5538125ea /src/mongo/executor
parent     490d309fe77fc031cbe873d58be5d23254e15e36 (diff)
SERVER-40722 Add ScopedTaskExecutor
Add a scoped task executor, which wraps up an executor and collects its callbacks. This type, when destroyed, will cancel all outstanding callbacks and refuse further work.
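For orientation, a minimal usage sketch of the new type (it assumes an already-started TaskExecutor* named "underlying"; that surrounding setup is not part of this commit):

    // "underlying" is an assumed, already-started TaskExecutor*.
    {
        executor::ScopedTaskExecutor scoped(underlying);

        // Work goes through the wrapper so that its callback handle is tracked.
        auto swHandle = scoped->scheduleWork([](const executor::TaskExecutor::CallbackArgs& args) {
            if (!args.status.isOK()) {
                return;  // observes ShutdownInProgress if the wrapper shut down first
            }
            // ... normal work ...
        });
    }  // dtor calls shutdown(): outstanding work is cancelled and further work is refused

Note that the dtor does not join; call join() through the wrapper if you need all callbacks to have finished.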
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--  src/mongo/executor/SConscript                       21
-rw-r--r--  src/mongo/executor/scoped_task_executor.cpp        323
-rw-r--r--  src/mongo/executor/scoped_task_executor.h          116
-rw-r--r--  src/mongo/executor/scoped_task_executor_test.cpp   320
4 files changed, 780 insertions, 0 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index 2da4b4e18a6..7c015cb39f3 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -51,6 +51,15 @@ env.Library(target='task_executor_interface',
'remote_command',
])
+env.Library(target='scoped_task_executor',
+ source=[
+ 'scoped_task_executor.cpp',
+ ],
+ LIBDEPS=[
+ 'task_executor_interface',
+ '$BUILD_DIR/mongo/util/fail_point',
+ ])
+
env.Library(target='network_interface',
source=['network_interface.cpp',],
LIBDEPS=[
@@ -327,3 +336,15 @@ env.CppIntegrationTest(
'$BUILD_DIR/mongo/util/version_impl',
],
)
+
+env.CppUnitTest(
+ target='scoped_task_executor_test',
+ source=[
+ 'scoped_task_executor_test.cpp',
+ ],
+ LIBDEPS=[
+ 'scoped_task_executor',
+ 'network_interface_mock',
+ 'thread_pool_task_executor',
+ ],
+)
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp
new file mode 100644
index 00000000000..c8663025e12
--- /dev/null
+++ b/src/mongo/executor/scoped_task_executor.cpp
@@ -0,0 +1,323 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/scoped_task_executor.h"
+
+namespace mongo {
+namespace executor {
+
+MONGO_FAIL_POINT_DEFINE(ScopedTaskExecutorHangBeforeSchedule);
+MONGO_FAIL_POINT_DEFINE(ScopedTaskExecutorHangExitBeforeSchedule);
+MONGO_FAIL_POINT_DEFINE(ScopedTaskExecutorHangAfterSchedule);
+
+/**
+ * Implements the wrapping indirection needed to satisfy the ScopedTaskExecutor contract. Note
+ * that at least shutdown() must be called on this type before destruction.
+ */
+class ScopedTaskExecutor::Impl : public std::enable_shared_from_this<ScopedTaskExecutor::Impl>,
+ public TaskExecutor {
+
+ static const inline auto kShutdownStatus =
+ Status(ErrorCodes::ShutdownInProgress, "Shutting down ScopedTaskExecutor::Impl");
+
+public:
+ explicit Impl(TaskExecutor* executor) : _executor(executor) {}
+
+ ~Impl() {
+ // The ScopedTaskExecutor dtor calls shutdown, so this is guaranteed.
+ invariant(_inShutdown);
+ }
+
+ void startup() override {
+ MONGO_UNREACHABLE;
+ }
+
+ void shutdown() override {
+ auto handles = [&] {
+ stdx::lock_guard lk(_mutex);
+ _inShutdown = true;
+
+ return _cbHandles;
+ }();
+
+ for (auto & [ id, handle ] : handles) {
+ // If we don't have a handle yet, it means there's a scheduling thread that's
+ // dropped the lock but hasn't yet stashed it (or failed to schedule it on the
+ // underlying executor).
+ //
+ // See _wrapCallback for how the scheduling thread handles those cases.
+ if (handle) {
+ _executor->cancel(handle);
+ }
+ }
+ }
+
+ void join() override {
+ stdx::unique_lock lk(_mutex);
+ _cv.wait(lk, [&] { return _inShutdown && _cbHandles.empty(); });
+ }
+
+ void appendDiagnosticBSON(BSONObjBuilder* b) const override {
+ MONGO_UNREACHABLE;
+ }
+
+ Date_t now() override {
+ return _executor->now();
+ }
+
+ StatusWith<EventHandle> makeEvent() override {
+ if (stdx::lock_guard lk(_mutex); _inShutdown) {
+ return kShutdownStatus;
+ }
+
+ return _executor->makeEvent();
+ }
+
+ void signalEvent(const EventHandle& event) override {
+ return _executor->signalEvent(event);
+ }
+
+ StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override {
+ return _wrapCallback([&](auto&& x) { return _executor->onEvent(event, std::move(x)); },
+ std::move(work));
+ }
+
+ void waitForEvent(const EventHandle& event) override {
+ return _executor->waitForEvent(event);
+ }
+
+ StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx,
+ const EventHandle& event,
+ Date_t deadline = Date_t::max()) override {
+ return _executor->waitForEvent(opCtx, event, deadline);
+ }
+
+ StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override {
+ return _wrapCallback([&](auto&& x) { return _executor->scheduleWork(std::move(x)); },
+ std::move(work));
+ }
+
+ StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override {
+ return _wrapCallback(
+ [&](auto&& x) { return _executor->scheduleWorkAt(when, std::move(x)); },
+ std::move(work));
+ }
+
+ StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request,
+ const RemoteCommandCallbackFn& cb,
+ const BatonHandle& baton = nullptr) override {
+ return _wrapCallback(
+ [&](auto&& x) {
+ return _executor->scheduleRemoteCommand(request, std::move(x), baton);
+ },
+ cb);
+ }
+
+ void cancel(const CallbackHandle& cbHandle) override {
+ return _executor->cancel(cbHandle);
+ }
+
+ void wait(const CallbackHandle& cbHandle,
+ Interruptible* interruptible = Interruptible::notInterruptible()) override {
+ return _executor->wait(cbHandle, interruptible);
+ }
+
+ void appendConnectionStats(ConnectionPoolStats* stats) const override {
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ /**
+ * Wraps a scheduling call, along with its callback, so that:
+ *
+ * 1. If the callback is run, it is invoked with a not-okay argument if this task executor or
+ * the underlying one has been shutdown.
+ * 2. The callback handle that is returned from the call to schedule is collected and
+ * canceled, if this object is shutdown before the callback is invoked.
+ *
+ * Theory of operation for shutdown/join
+ *
+ * All callbacks that are wrapped by this method are in 1 of 5 states:
+ *
+ * 1. Haven't yet acquired the first lock, no recorded state.
+ *
+ * 2. Have stashed an entry in the _cbHandles table, but with an unset callback handle.
+ * 2.a. We successfully schedule and record the callback handle after the fact.
+ * 2.b. We fail to schedule, requiring us to erase recorded state directly from
+ * _wrapCallback.
+ *
+ * 3. Acquired the lock after calling schedule before the callback ran. Callback handle is
+ * in the _cbHandles table and the task is cancellable.
+ *
+ * 4. Ran the callback before stashing the callback handle. No entry in the table and we
+ * won't stash the handle on exit.
+ *
+ * What happens in shutdown (i.e. when _inShutdown is set):
+ *
+ * 1. Nothing. We never record any values and return immediately with a not-ok status
+ * without running the task
+ *
+ * 2. We have an entry in the table, but no callback handle.
+ * 2.a. The scheduling thread will notice _inShutdown after calling schedule and will cancel
+ * it on the way out. The execution of the task will remove the entry and notify.
+ * 2.b. The scheduling thread will see that the underlying executor failed and
+ * remove/notify.
+ *
+ * 3. We'll call cancel in shutdown. The task will remove/notify.
+ *
+ * 4. The task has already completed and removed itself from the table.
+ */
+ template <typename ScheduleCall, typename Work>
+ StatusWith<CallbackHandle> _wrapCallback(ScheduleCall&& schedule, Work&& work) {
+ size_t id;
+
+ // State 1 - No Data
+ {
+ stdx::lock_guard lk(_mutex);
+
+ // No clean up needed because we never ran or recorded anything
+ if (_inShutdown) {
+ return kShutdownStatus;
+ }
+
+ id = _id++;
+
+ _cbHandles.emplace(
+ std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple());
+ };
+
+ if (MONGO_FAIL_POINT(ScopedTaskExecutorHangBeforeSchedule)) {
+ ScopedTaskExecutorHangBeforeSchedule.setMode(FailPoint::off);
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(ScopedTaskExecutorHangExitBeforeSchedule);
+ }
+
+ // State 2 - Indeterminate state. We don't know yet if the task will get scheduled.
+ auto swCbHandle = std::forward<ScheduleCall>(schedule)(
+ [ id, work = std::forward<Work>(work), self = shared_from_this() ](const auto& cargs) {
+ using ArgsT = std::decay_t<decltype(cargs)>;
+
+ stdx::unique_lock<stdx::mutex> lk(self->_mutex);
+
+ auto doWorkAndNotify = [&](const ArgsT& x) noexcept {
+ lk.unlock();
+ work(x);
+ lk.lock();
+
+ // After we've run the task, we erase and notify. Sometimes that happens
+ // before we stash the cbHandle.
+ self->_eraseAndNotifyIfNeeded(lk, id);
+ };
+
+ if (!self->_inShutdown) {
+ doWorkAndNotify(cargs);
+ return;
+ }
+
+ // Have to copy args because we get the arguments by const& and need to
+ // modify the status field.
+ auto args = cargs;
+
+ IF_CONSTEXPR(std::is_same_v<ArgsT, CallbackArgs>) {
+ args.status = kShutdownStatus;
+ }
+ else {
+ static_assert(std::is_same_v<ArgsT, RemoteCommandCallbackArgs>,
+ "_wrapCallback only supports CallbackArgs and "
+ "RemoteCommandCallbackArgs");
+ args.response.status = kShutdownStatus;
+ }
+
+ doWorkAndNotify(args);
+ });
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(ScopedTaskExecutorHangAfterSchedule);
+
+ stdx::unique_lock lk(_mutex);
+
+ if (!swCbHandle.isOK()) {
+ // State 2.b - Failed to schedule
+ _eraseAndNotifyIfNeeded(lk, id);
+ return swCbHandle;
+ }
+
+ // State 2.a - Scheduled, but haven't stashed the cbHandle
+
+ if (_inShutdown) {
+ // If we're in shutdown, the caller of shutdown has cancelled all the handles it had
+ // available (which doesn't include this one). So we're responsible for calling
+ // cancel().
+ //
+ // Note that the task will handle remove/notify
+ lk.unlock();
+ _executor->cancel(swCbHandle.getValue());
+
+ return swCbHandle;
+ }
+
+ if (auto iter = _cbHandles.find(id); iter != _cbHandles.end()) {
+ // State 3 - Handle stashed
+ iter->second = swCbHandle.getValue();
+ } else {
+ // State 4 - Callback ran before we got here.
+ }
+
+ return swCbHandle;
+ }
+
+ void _eraseAndNotifyIfNeeded(WithLock, size_t id) {
+ invariant(_cbHandles.erase(id) == 1);
+
+ if (_inShutdown && _cbHandles.empty()) {
+ _cv.notify_all();
+ }
+ }
+
+ stdx::mutex _mutex;
+ bool _inShutdown = false;
+ TaskExecutor* const _executor;
+ size_t _id = 0;
+ stdx::unordered_map<size_t, CallbackHandle> _cbHandles;
+
+ // condition variable that callers of join wait on and outstanding callbacks potentially
+ // notify
+ stdx::condition_variable _cv;
+};
+
+ScopedTaskExecutor::ScopedTaskExecutor(TaskExecutor* executor)
+ : _executor(std::make_shared<Impl>(executor)) {}
+
+ScopedTaskExecutor::~ScopedTaskExecutor() {
+ _executor->shutdown();
+}
+
+} // namespace executor
+} // namespace mongo
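The "Theory of operation" comment in _wrapCallback above describes a reserve/stash/cancel handshake between the scheduling thread, the wrapped callback, and shutdown(). A standalone sketch of that handshake, written against the standard library only (the Tracker type and all of its names are illustrative, not part of this commit):

    #include <cstddef>
    #include <mutex>
    #include <unordered_map>

    struct Tracker {
        std::mutex mutex;
        bool inShutdown = false;
        std::size_t nextId = 0;
        std::unordered_map<std::size_t, int> handles;  // int stands in for CallbackHandle

        // State 1: refuse new work once shutdown has started, otherwise reserve a slot.
        bool reserve(std::size_t& id) {
            std::lock_guard<std::mutex> lk(mutex);
            if (inShutdown)
                return false;
            id = nextId++;
            handles.emplace(id, 0);  // placeholder; the real handle is not known yet (state 2)
            return true;
        }

        // After the underlying schedule succeeds: stash the handle (state 3) unless the wrapped
        // callback already ran and erased the slot (state 4). Reports whether shutdown() raced in
        // between (state 2.a), in which case the scheduling thread must cancel its own handle.
        bool stash(std::size_t id, int handle, bool& callerCancels) {
            std::lock_guard<std::mutex> lk(mutex);
            callerCancels = inShutdown;
            auto it = handles.find(id);
            if (it == handles.end() || callerCancels)
                return false;
            it->second = handle;
            return true;
        }

        // The wrapped callback (or the scheduling thread, on a failed schedule) erases its slot.
        void erase(std::size_t id) {
            std::lock_guard<std::mutex> lk(mutex);
            handles.erase(id);
        }

        // shutdown() snapshots whatever handles are visible; those are the ones it cancels.
        std::unordered_map<std::size_t, int> shutdown() {
            std::lock_guard<std::mutex> lk(mutex);
            inShutdown = true;
            return handles;
        }
    };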
diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h
new file mode 100644
index 00000000000..122308e489a
--- /dev/null
+++ b/src/mongo/executor/scoped_task_executor.h
@@ -0,0 +1,116 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/optional.hpp>
+#include <memory>
+
+#include "mongo/base/status.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/if_constexpr.h"
+
+namespace mongo {
+
+class OperationContext;
+
+namespace executor {
+
+/**
+ * Implements a scoped task executor which collects all callback handles it receives as part of
+ * running operations and cancels any outstanding ones on destruction.
+ *
+ * The intent is that you can use this type with arbitrary TaskExecutor-taking functions and allow
+ * this object's destruction to cancel any callback handles those calls returned.
+ *
+ * Note that while this type provides access to a TaskExecutor*, certain methods are illegal to
+ * call:
+ * - startup()
+ * - appendDiagnosticBSON()
+ * - appendConnectionStats()
+ *
+ * And certain other methods only pass through this class to the underlying executor:
+ * - makeEvent()
+ * will return a not-ok status after shutdown, but does not otherwise instrument the event
+ * - signalEvent()
+ * always signals
+ * - waitForEvent()
+ * always waits for the event
+ * - cancel()
+ * always cancels the task
+ * - wait()
+ * always waits for the task
+ * - now()
+ * always returns the time
+ *
+ * This leaves the various callback handle returning methods plus join/shutdown. Rather than
+ * passing through, these methods address only tasks dispatched through this executor, not the
+ * underlying executor as a whole.
+ *
+ * Also note that this type DOES NOT call join in its dtor. I.e. all tasks are cancelled, but not
+ * all callbacks may have run when this object goes away. You must call join() if you want that
+ * guarantee.
+ */
+class ScopedTaskExecutor {
+public:
+ explicit ScopedTaskExecutor(TaskExecutor* executor);
+
+ // Delete all move/copy-ability
+ ScopedTaskExecutor(TaskExecutor&&) = delete;
+
+ ~ScopedTaskExecutor();
+
+ operator TaskExecutor*() const {
+ return _executor.get();
+ }
+
+ const std::shared_ptr<TaskExecutor>& operator*() const {
+ return _executor;
+ }
+
+ TaskExecutor* operator->() const {
+ return _executor.get();
+ }
+
+private:
+ class Impl;
+
+ std::shared_ptr<TaskExecutor> _executor;
+};
+
+MONGO_FAIL_POINT_DECLARE(ScopedTaskExecutorHangBeforeSchedule);
+MONGO_FAIL_POINT_DECLARE(ScopedTaskExecutorHangExitBeforeSchedule);
+MONGO_FAIL_POINT_DECLARE(ScopedTaskExecutorHangAfterSchedule);
+
+} // namespace executor
+} // namespace mongo
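The header comment above spells out which calls pass through and that the destructor only cancels. A hedged sketch of how a caller uses that contract (startReplication is a hypothetical consumer; only ScopedTaskExecutor, shutdown() and join() come from this commit):

    // Hypothetical consumer that accepts any TaskExecutor*.
    void startReplication(executor::TaskExecutor* executor);

    void runScoped(executor::TaskExecutor* underlying) {
        executor::ScopedTaskExecutor scoped(underlying);

        startReplication(scoped);  // implicit conversion via operator TaskExecutor*()

        // The dtor only cancels. If the caller needs "all callbacks have finished", it must
        // shut down and join explicitly before the wrapper goes out of scope.
        scoped->shutdown();
        scoped->join();
    }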
diff --git a/src/mongo/executor/scoped_task_executor_test.cpp b/src/mongo/executor/scoped_task_executor_test.cpp
new file mode 100644
index 00000000000..5ce217ecddf
--- /dev/null
+++ b/src/mongo/executor/scoped_task_executor_test.cpp
@@ -0,0 +1,320 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/executor/scoped_task_executor.h"
+
+#include "mongo/executor/network_interface_mock.h"
+#include "mongo/executor/thread_pool_mock.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/future.h"
+
+namespace mongo {
+namespace executor {
+namespace {
+
+class ScopedTaskExecutorTest : public unittest::Test {
+public:
+ void setUp() override {
+ auto net = std::make_unique<NetworkInterfaceMock>();
+ _net = net.get();
+ _tpte.emplace(std::make_unique<ThreadPoolMock>(_net, 1, ThreadPoolMock::Options{}),
+ std::move(net));
+ _tpte->startup();
+ _executor.emplace(_tpte.get_ptr());
+ }
+
+ void tearDown() override {
+ _net->exitNetwork();
+
+ if (_executor) {
+ (*_executor)->shutdown();
+ (*_executor)->join();
+ _executor.reset();
+ }
+ _tpte.reset();
+ }
+
+ static inline thread_local bool isInline = false;
+
+ void scheduleWork(Promise<void>& promise) {
+ isInline = true;
+ ASSERT(getExecutor()
+ ->scheduleWork([&](const TaskExecutor::CallbackArgs& ca) {
+ ASSERT_FALSE(isInline);
+ if (ca.status.isOK()) {
+ promise.emplaceValue();
+ } else {
+ promise.setError(ca.status);
+ }
+ })
+ .getStatus()
+ .isOK());
+ isInline = false;
+ }
+
+ void scheduleRemoteCommand(Promise<void>& promise) {
+ RemoteCommandRequest rcr(HostAndPort("localhost"), "test", BSONObj(), nullptr);
+
+ isInline = true;
+ ASSERT(getExecutor()
+ ->scheduleRemoteCommand(rcr,
+ [&](const TaskExecutor::RemoteCommandCallbackArgs& ca) {
+ ASSERT_FALSE(isInline);
+ if (ca.response.status.isOK()) {
+ promise.emplaceValue();
+ } else {
+ promise.setError(ca.response.status);
+ }
+ })
+ .getStatus()
+ .isOK());
+ isInline = false;
+ }
+
+ void resetExecutor() {
+ _executor.reset();
+ }
+
+ void shutdownUnderlying() {
+ _tpte->shutdown();
+ }
+
+ ScopedTaskExecutor& getExecutor() {
+ return *_executor;
+ }
+
+ NetworkInterfaceMock* getNet() {
+ return _net;
+ }
+
+private:
+ NetworkInterfaceMock* _net;
+ boost::optional<ThreadPoolTaskExecutor> _tpte;
+ boost::optional<ScopedTaskExecutor> _executor;
+};
+
+TEST_F(ScopedTaskExecutorTest, onEvent) {
+ auto pf = makePromiseFuture<void>();
+
+ auto event = uassertStatusOK(getExecutor()->makeEvent());
+
+ ASSERT(getExecutor()
+ ->onEvent(event,
+ [&](const TaskExecutor::CallbackArgs& ca) {
+ if (ca.status.isOK()) {
+ pf.promise.emplaceValue();
+ } else {
+ pf.promise.setError(ca.status);
+ }
+ })
+ .getStatus()
+ .isOK());
+
+ ASSERT_FALSE(pf.future.isReady());
+
+ getExecutor()->signalEvent(event);
+
+ ASSERT_OK(pf.future.getNoThrow());
+}
+
+TEST_F(ScopedTaskExecutorTest, scheduleWork) {
+ auto pf = makePromiseFuture<void>();
+
+ ASSERT(getExecutor()
+ ->scheduleWork([&](const TaskExecutor::CallbackArgs& ca) {
+ if (ca.status.isOK()) {
+ pf.promise.emplaceValue();
+ } else {
+ pf.promise.setError(ca.status);
+ }
+ })
+ .getStatus()
+ .isOK());
+
+ ASSERT_OK(pf.future.getNoThrow());
+}
+
+TEST_F(ScopedTaskExecutorTest, scheduleWorkAt) {
+ auto pf = makePromiseFuture<void>();
+
+ ASSERT(getExecutor()
+ ->scheduleWorkAt(getExecutor()->now(),
+ [&](const TaskExecutor::CallbackArgs& ca) {
+ if (ca.status.isOK()) {
+ pf.promise.emplaceValue();
+ } else {
+ pf.promise.setError(ca.status);
+ }
+ })
+ .getStatus()
+ .isOK());
+
+ ASSERT_OK(pf.future.getNoThrow());
+}
+
+TEST_F(ScopedTaskExecutorTest, scheduleRemoteCommand) {
+ auto pf = makePromiseFuture<void>();
+
+ scheduleRemoteCommand(pf.promise);
+ {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+ ASSERT(getNet()->hasReadyRequests());
+ getNet()->scheduleSuccessfulResponse(BSONObj());
+ getNet()->runReadyNetworkOperations();
+ }
+
+ ASSERT_OK(pf.future.getNoThrow());
+}
+
+// Fully run the callback before capturing the callback handle
+TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithSuccess) {
+ auto resultPf = makePromiseFuture<void>();
+ auto schedulePf = makePromiseFuture<void>();
+
+ auto& fp = ScopedTaskExecutorHangAfterSchedule;
+ fp.setMode(FailPoint::alwaysOn);
+
+ stdx::thread scheduler([&] {
+ scheduleWork(resultPf.promise);
+ schedulePf.promise.emplaceValue();
+ });
+
+ ASSERT_OK(resultPf.future.getNoThrow());
+ ASSERT_FALSE(schedulePf.future.isReady());
+
+ fp.setMode(FailPoint::off);
+ ASSERT_OK(schedulePf.future.getNoThrow());
+
+ scheduler.join();
+}
+
+// Stash the handle before running the callback
+TEST_F(ScopedTaskExecutorTest, scheduleWinRaceWithSuccess) {
+ auto resultPf = makePromiseFuture<void>();
+ auto schedulePf = makePromiseFuture<void>();
+
+ getNet()->enterNetwork();
+
+ stdx::thread scheduler([&] {
+ scheduleWork(resultPf.promise);
+ schedulePf.promise.emplaceValue();
+ });
+
+ ASSERT_OK(schedulePf.future.getNoThrow());
+
+ ASSERT_FALSE(resultPf.future.isReady());
+
+ getNet()->exitNetwork();
+
+ ASSERT_OK(resultPf.future.getNoThrow());
+
+ scheduler.join();
+}
+
+// We schedule on the underlying executor, but are already shut down by the time our wrapping
+// callback runs
+TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithShutdown) {
+ auto resultPf = makePromiseFuture<void>();
+ auto schedulePf = makePromiseFuture<void>();
+
+ getNet()->enterNetwork();
+
+ scheduleWork(resultPf.promise);
+
+ ASSERT_FALSE(resultPf.future.isReady());
+
+ getExecutor()->shutdown();
+ getNet()->exitNetwork();
+
+ ASSERT_EQUALS(resultPf.future.getNoThrow(), ErrorCodes::ShutdownInProgress);
+}
+
+// We scheduleRemoteCommand on the underlying executor, but are already shut down by the time our
+// wrapping callback runs
+TEST_F(ScopedTaskExecutorTest, scheduleRemoteCommandLoseRaceWithShutdown) {
+ auto pf = makePromiseFuture<void>();
+
+ scheduleRemoteCommand(pf.promise);
+ {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+ ASSERT(getNet()->hasReadyRequests());
+ getNet()->scheduleSuccessfulResponse(BSONObj());
+ getExecutor()->shutdown();
+ getNet()->runReadyNetworkOperations();
+ }
+
+ ASSERT_EQUALS(pf.future.getNoThrow(), ErrorCodes::ShutdownInProgress);
+}
+
+// Fail to schedule on the underlying
+TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithShutdownOfUnderlying) {
+ auto& bfp = ScopedTaskExecutorHangBeforeSchedule;
+ auto& efp = ScopedTaskExecutorHangExitBeforeSchedule;
+ bfp.setMode(FailPoint::alwaysOn);
+ efp.setMode(FailPoint::alwaysOn);
+
+ stdx::thread scheduler([&] {
+ ASSERT_FALSE(
+ getExecutor()
+ ->scheduleWork([&](const TaskExecutor::CallbackArgs& ca) { MONGO_UNREACHABLE; })
+ .getStatus()
+ .isOK());
+ });
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET((bfp));
+
+ shutdownUnderlying();
+
+ efp.setMode(FailPoint::off);
+
+ scheduler.join();
+}
+
+TEST_F(ScopedTaskExecutorTest, DestructionShutsDown) {
+ auto pf = makePromiseFuture<void>();
+
+ {
+ NetworkInterfaceMock::InNetworkGuard ing(getNet());
+
+ scheduleWork(pf.promise);
+
+ ASSERT_FALSE(pf.future.isReady());
+
+ resetExecutor();
+
+ ASSERT_FALSE(pf.future.isReady());
+ }
+
+ ASSERT_EQUALS(pf.future.getNoThrow(), ErrorCodes::ShutdownInProgress);
+}
+
+} // namespace
+} // namespace executor
+} // namespace mongo