diff options
author | Jason Carey <jcarey@argv.me> | 2019-04-18 17:05:41 -0400 |
---|---|---|
committer | Jason Carey <jcarey@argv.me> | 2019-04-30 23:48:45 -0400 |
commit | cf2e2ab2d90a746340e323c19bcedebaf944a5bd (patch) | |
tree | d9e5b85cade3c2bb6a0f5123e07dbbd5538125ea /src/mongo/executor | |
parent | 490d309fe77fc031cbe873d58be5d23254e15e36 (diff) | |
download | mongo-cf2e2ab2d90a746340e323c19bcedebaf944a5bd.tar.gz |
SERVER-40722 Add ScopedTaskExecutor
Add a scoped task executor, which wraps up an executor and collects its
callbacks.
This type, when destroyed, will cancel all outstanding callbacks and
refuse further work.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/SConscript | 21 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor.cpp | 323 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor.h | 116 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor_test.cpp | 320 |
4 files changed, 780 insertions, 0 deletions
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index 2da4b4e18a6..7c015cb39f3 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -51,6 +51,15 @@ env.Library(target='task_executor_interface', 'remote_command', ]) +env.Library(target='scoped_task_executor', + source=[ + 'scoped_task_executor.cpp', + ], + LIBDEPS=[ + 'task_executor_interface', + '$BUILD_DIR/mongo/util/fail_point', + ]) + env.Library(target='network_interface', source=['network_interface.cpp',], LIBDEPS=[ @@ -327,3 +336,15 @@ env.CppIntegrationTest( '$BUILD_DIR/mongo/util/version_impl', ], ) + +env.CppUnitTest( + target='scoped_task_executor_test', + source=[ + 'scoped_task_executor_test.cpp', + ], + LIBDEPS=[ + 'scoped_task_executor', + 'network_interface_mock', + 'thread_pool_task_executor', + ], +) diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp new file mode 100644 index 00000000000..c8663025e12 --- /dev/null +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -0,0 +1,323 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/scoped_task_executor.h" + +namespace mongo { +namespace executor { + +MONGO_FAIL_POINT_DEFINE(ScopedTaskExecutorHangBeforeSchedule); +MONGO_FAIL_POINT_DEFINE(ScopedTaskExecutorHangExitBeforeSchedule); +MONGO_FAIL_POINT_DEFINE(ScopedTaskExecutorHangAfterSchedule); + +/** + * Implements the wrapping indirection needed to satisfy the ScopedTaskExecutor contract. Note + * that at least shutdown() must be called on this type before destruction. + */ +class ScopedTaskExecutor::Impl : public std::enable_shared_from_this<ScopedTaskExecutor::Impl>, + public TaskExecutor { + + static const inline auto kShutdownStatus = + Status(ErrorCodes::ShutdownInProgress, "Shutting down ScopedTaskExecutor::Impl"); + +public: + explicit Impl(TaskExecutor* executor) : _executor(executor) {} + + ~Impl() { + // The ScopedTaskExecutor dtor calls shutdown, so this is guaranteed. + invariant(_inShutdown); + } + + void startup() override { + MONGO_UNREACHABLE; + } + + void shutdown() override { + auto handles = [&] { + stdx::lock_guard lk(_mutex); + _inShutdown = true; + + return _cbHandles; + }(); + + for (auto & [ id, handle ] : handles) { + // If we don't have a handle yet, it means there's a scheduling thread that's + // dropped the lock but hasn't yet stashed it (or failed to schedule it on the + // underlying executor). + // + // See _wrapCallback for how the scheduling thread handles those cases. + if (handle) { + _executor->cancel(handle); + } + } + } + + void join() override { + stdx::unique_lock lk(_mutex); + _cv.wait(lk, [&] { return _inShutdown && _cbHandles.empty(); }); + } + + void appendDiagnosticBSON(BSONObjBuilder* b) const override { + MONGO_UNREACHABLE; + } + + Date_t now() override { + return _executor->now(); + } + + StatusWith<EventHandle> makeEvent() override { + if (stdx::lock_guard lk(_mutex); _inShutdown) { + return kShutdownStatus; + } + + return _executor->makeEvent(); + } + + void signalEvent(const EventHandle& event) override { + return _executor->signalEvent(event); + } + + StatusWith<CallbackHandle> onEvent(const EventHandle& event, CallbackFn work) override { + return _wrapCallback([&](auto&& x) { return _executor->onEvent(event, std::move(x)); }, + std::move(work)); + } + + void waitForEvent(const EventHandle& event) override { + return _executor->waitForEvent(event); + } + + StatusWith<stdx::cv_status> waitForEvent(OperationContext* opCtx, + const EventHandle& event, + Date_t deadline = Date_t::max()) override { + return _executor->waitForEvent(opCtx, event, deadline); + } + + StatusWith<CallbackHandle> scheduleWork(CallbackFn work) override { + return _wrapCallback([&](auto&& x) { return _executor->scheduleWork(std::move(x)); }, + std::move(work)); + } + + StatusWith<CallbackHandle> scheduleWorkAt(Date_t when, CallbackFn work) override { + return _wrapCallback( + [&](auto&& x) { return _executor->scheduleWorkAt(when, std::move(x)); }, + std::move(work)); + } + + StatusWith<CallbackHandle> scheduleRemoteCommand(const RemoteCommandRequest& request, + const RemoteCommandCallbackFn& cb, + const BatonHandle& baton = nullptr) override { + return _wrapCallback( + [&](auto&& x) { + return _executor->scheduleRemoteCommand(request, std::move(x), baton); + }, + cb); + } + + void cancel(const CallbackHandle& cbHandle) override { + return _executor->cancel(cbHandle); + } + + void wait(const CallbackHandle& cbHandle, + Interruptible* interruptible = Interruptible::notInterruptible()) override { + return _executor->wait(cbHandle, interruptible); + } + + void appendConnectionStats(ConnectionPoolStats* stats) const override { + MONGO_UNREACHABLE; + } + +private: + /** + * Wraps a scheduling call, along with its callback, so that: + * + * 1. If the callback is run, it is invoked with a not-okay argument if this task executor or + * the underlying one has been shutdown. + * 2. The callback handle that is returned from the call to schedule is collected and + * canceled, if this object is shutdown before the callback is invoked. + * + * Theory of operation for shutdown/join + * + * All callbacks that are wrapped by this method are in 1 of 5 states: + * + * 1. Haven't yet acquired the first lock, no recorded state. + * + * 2. Have stashed an entry in the _cbHandles table, but with an unset callback handle. + * 2.a. We successfully schedule and record the callback handle after the fact. + * 2.b. We fail to schedule, requiring us to erase recorded state directly from + * _wrapCallback. + * + * 3. Acquired the lock after calling schedule before the callback ran. Callback handle is + * in the _cbHandles table and the task is cancellable. + * + * 4. Ran the callback before stashing the callback handle. No entry in the table and we + * won't stash the handle on exit. + * + * What happens in shutdown (I.e. when _inShutdown is set): + * + * 1. Nothing. We never record any values and return immediately with a not-ok status + * without running the task + * + * 2. We have an entry in the table, but no callback handle. + * 2.a. The scheduling thread will notice _inShutdown after calling schedule and will cancel + * it on the way out. The execution of the task will remove the entry and notify. + * 2.b. The scheduling thread will see that the underlying executor failed and + * remove/notify. + * + * 3. We'll call cancel in shutdown. The task will remote/notify. + * + * 4. The task has already completed and removed itself from the table. + */ + template <typename ScheduleCall, typename Work> + StatusWith<CallbackHandle> _wrapCallback(ScheduleCall&& schedule, Work&& work) { + size_t id; + + // State 1 - No Data + { + stdx::lock_guard lk(_mutex); + + // No clean up needed because we never ran or recorded anything + if (_inShutdown) { + return kShutdownStatus; + } + + id = _id++; + + _cbHandles.emplace( + std::piecewise_construct, std::forward_as_tuple(id), std::forward_as_tuple()); + }; + + if (MONGO_FAIL_POINT(ScopedTaskExecutorHangBeforeSchedule)) { + ScopedTaskExecutorHangBeforeSchedule.setMode(FailPoint::off); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(ScopedTaskExecutorHangExitBeforeSchedule); + } + + // State 2 - Indeterminate state. We don't know yet if the task will get scheduled. + auto swCbHandle = std::forward<ScheduleCall>(schedule)( + [ id, work = std::forward<Work>(work), self = shared_from_this() ](const auto& cargs) { + using ArgsT = std::decay_t<decltype(cargs)>; + + stdx::unique_lock<stdx::mutex> lk(self->_mutex); + + auto doWorkAndNotify = [&](const ArgsT& x) noexcept { + lk.unlock(); + work(x); + lk.lock(); + + // After we've run the task, we erase and notify. Sometimes that happens + // before we stash the cbHandle. + self->_eraseAndNotifyIfNeeded(lk, id); + }; + + if (!self->_inShutdown) { + doWorkAndNotify(cargs); + return; + } + + // Have to copy args because we get the arguments by const& and need to + // modify the status field. + auto args = cargs; + + IF_CONSTEXPR(std::is_same_v<ArgsT, CallbackArgs>) { + args.status = kShutdownStatus; + } + else { + static_assert(std::is_same_v<ArgsT, RemoteCommandCallbackArgs>, + "_wrapCallback only supports CallbackArgs and " + "RemoteCommandCallbackArgs"); + args.response.status = kShutdownStatus; + } + + doWorkAndNotify(args); + }); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET(ScopedTaskExecutorHangAfterSchedule); + + stdx::unique_lock lk(_mutex); + + if (!swCbHandle.isOK()) { + // State 2.b - Failed to schedule + _eraseAndNotifyIfNeeded(lk, id); + return swCbHandle; + } + + // State 2.a - Scheduled, but haven't stashed the cbHandle + + if (_inShutdown) { + // If we're in shutdown, the caller of shutdown has cancelled all the handles it had + // available (which doesn't include this one). So we're responsible for calling + // cancel(). + // + // Note that the task will handle remove/notify + lk.unlock(); + _executor->cancel(swCbHandle.getValue()); + + return swCbHandle; + } + + if (auto iter = _cbHandles.find(id); iter != _cbHandles.end()) { + // State 3 - Handle stashed + iter->second = swCbHandle.getValue(); + } else { + // State 4 - Callback ran before we got here. + } + + return swCbHandle; + } + + void _eraseAndNotifyIfNeeded(WithLock, size_t id) { + invariant(_cbHandles.erase(id) == 1); + + if (_inShutdown && _cbHandles.empty()) { + _cv.notify_all(); + } + } + + stdx::mutex _mutex; + bool _inShutdown = false; + TaskExecutor* const _executor; + size_t _id = 0; + stdx::unordered_map<size_t, CallbackHandle> _cbHandles; + + // condition variable that callers of join wait on and outstanding callbacks potentially + // notify + stdx::condition_variable _cv; +}; + +ScopedTaskExecutor::ScopedTaskExecutor(TaskExecutor* executor) + : _executor(std::make_shared<Impl>(executor)) {} + +ScopedTaskExecutor::~ScopedTaskExecutor() { + _executor->shutdown(); +} + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h new file mode 100644 index 00000000000..122308e489a --- /dev/null +++ b/src/mongo/executor/scoped_task_executor.h @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/optional.hpp> +#include <memory> + +#include "mongo/base/status.h" +#include "mongo/executor/task_executor.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/stdx/unordered_map.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/if_constexpr.h" + +namespace mongo { + +class OperationContext; + +namespace executor { + +/** + * Implements a scoped task executor which collects all callback handles it receives as part of + * running operations and cancels any outstanding ones on destruction. + * + * The intent is that you can use this type with arbitrary task executor taking functions and allow + * this object's destruction to clean up any methods which returned callback handles. + * + * Note that while this type provides access to a TaskExecutor*, certain methods are illegal to + * call: + * - startup() + * - appendDiagnosticBSON() + * - appendConnectionStats() + * + * And certain other methods only pass through this class to the underlying executor: + * - makeEvent() + * will return a not-ok status after shutdown, but does not otherwise instrument the event + * - signalEvent() + * always signals + * - waitForEvent() + * always waits for the event + * - cancel() + * always cancels the task + * - wait() + * always waits for the task + * - now() + * always returns the time + * + * This leaves the various callback handle returning methods + join/shutdown. These methods, rather + * than performing a passthrough, address only tasks dispatched through this executor, rather than + * passing through to the underlying executor. + * + * Also note that this type DOES NOT call join in its dtor. I.e. all tasks are cancelled, but all + * callbacks may not have run when this object goes away. You must call join() if you want that + * guarantee. + */ +class ScopedTaskExecutor { +public: + explicit ScopedTaskExecutor(TaskExecutor* executor); + + // Delete all move/copy-ability + ScopedTaskExecutor(TaskExecutor&&) = delete; + + ~ScopedTaskExecutor(); + + operator TaskExecutor*() const { + return _executor.get(); + } + + const std::shared_ptr<TaskExecutor>& operator*() const { + return _executor; + } + + TaskExecutor* operator->() const { + return _executor.get(); + } + +private: + class Impl; + + std::shared_ptr<TaskExecutor> _executor; +}; + +MONGO_FAIL_POINT_DECLARE(ScopedTaskExecutorHangBeforeSchedule); +MONGO_FAIL_POINT_DECLARE(ScopedTaskExecutorHangExitBeforeSchedule); +MONGO_FAIL_POINT_DECLARE(ScopedTaskExecutorHangAfterSchedule); + +} // namespace executor +} // namespace mongo diff --git a/src/mongo/executor/scoped_task_executor_test.cpp b/src/mongo/executor/scoped_task_executor_test.cpp new file mode 100644 index 00000000000..5ce217ecddf --- /dev/null +++ b/src/mongo/executor/scoped_task_executor_test.cpp @@ -0,0 +1,320 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/executor/scoped_task_executor.h" + +#include "mongo/executor/network_interface_mock.h" +#include "mongo/executor/thread_pool_mock.h" +#include "mongo/executor/thread_pool_task_executor.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/future.h" + +namespace mongo { +namespace executor { +namespace { + +class ScopedTaskExecutorTest : public unittest::Test { +public: + void setUp() override { + auto net = std::make_unique<NetworkInterfaceMock>(); + _net = net.get(); + _tpte.emplace(std::make_unique<ThreadPoolMock>(_net, 1, ThreadPoolMock::Options{}), + std::move(net)); + _tpte->startup(); + _executor.emplace(_tpte.get_ptr()); + } + + void tearDown() override { + _net->exitNetwork(); + + if (_executor) { + (*_executor)->shutdown(); + (*_executor)->join(); + _executor.reset(); + } + _tpte.reset(); + } + + static inline thread_local bool isInline = false; + + void scheduleWork(Promise<void>& promise) { + isInline = true; + ASSERT(getExecutor() + ->scheduleWork([&](const TaskExecutor::CallbackArgs& ca) { + ASSERT_FALSE(isInline); + if (ca.status.isOK()) { + promise.emplaceValue(); + } else { + promise.setError(ca.status); + } + }) + .getStatus() + .isOK()); + isInline = false; + } + + void scheduleRemoteCommand(Promise<void>& promise) { + RemoteCommandRequest rcr(HostAndPort("localhost"), "test", BSONObj(), nullptr); + + isInline = true; + ASSERT(getExecutor() + ->scheduleRemoteCommand(rcr, + [&](const TaskExecutor::RemoteCommandCallbackArgs& ca) { + ASSERT_FALSE(isInline); + if (ca.response.status.isOK()) { + promise.emplaceValue(); + } else { + promise.setError(ca.response.status); + } + }) + .getStatus() + .isOK()); + isInline = false; + } + + void resetExecutor() { + _executor.reset(); + } + + void shutdownUnderlying() { + _tpte->shutdown(); + } + + ScopedTaskExecutor& getExecutor() { + return *_executor; + } + + NetworkInterfaceMock* getNet() { + return _net; + } + +private: + NetworkInterfaceMock* _net; + boost::optional<ThreadPoolTaskExecutor> _tpte; + boost::optional<ScopedTaskExecutor> _executor; +}; + +TEST_F(ScopedTaskExecutorTest, onEvent) { + auto pf = makePromiseFuture<void>(); + + auto event = uassertStatusOK(getExecutor()->makeEvent()); + + ASSERT(getExecutor() + ->onEvent(event, + [&](const TaskExecutor::CallbackArgs& ca) { + if (ca.status.isOK()) { + pf.promise.emplaceValue(); + } else { + pf.promise.setError(ca.status); + } + }) + .getStatus() + .isOK()); + + ASSERT_FALSE(pf.future.isReady()); + + getExecutor()->signalEvent(event); + + ASSERT_OK(pf.future.getNoThrow()); +} + +TEST_F(ScopedTaskExecutorTest, scheduleWork) { + auto pf = makePromiseFuture<void>(); + + ASSERT(getExecutor() + ->scheduleWork([&](const TaskExecutor::CallbackArgs& ca) { + if (ca.status.isOK()) { + pf.promise.emplaceValue(); + } else { + pf.promise.setError(ca.status); + } + }) + .getStatus() + .isOK()); + + ASSERT_OK(pf.future.getNoThrow()); +} + +TEST_F(ScopedTaskExecutorTest, scheduleWorkAt) { + auto pf = makePromiseFuture<void>(); + + ASSERT(getExecutor() + ->scheduleWorkAt(getExecutor()->now(), + [&](const TaskExecutor::CallbackArgs& ca) { + if (ca.status.isOK()) { + pf.promise.emplaceValue(); + } else { + pf.promise.setError(ca.status); + } + }) + .getStatus() + .isOK()); + + ASSERT_OK(pf.future.getNoThrow()); +} + +TEST_F(ScopedTaskExecutorTest, scheduleRemoteCommand) { + auto pf = makePromiseFuture<void>(); + + scheduleRemoteCommand(pf.promise); + { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + ASSERT(getNet()->hasReadyRequests()); + getNet()->scheduleSuccessfulResponse(BSONObj()); + getNet()->runReadyNetworkOperations(); + } + + ASSERT_OK(pf.future.getNoThrow()); +} + +// Fully run the callback before capturing the callback handle +TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithSuccess) { + auto resultPf = makePromiseFuture<void>(); + auto schedulePf = makePromiseFuture<void>(); + + auto& fp = ScopedTaskExecutorHangAfterSchedule; + fp.setMode(FailPoint::alwaysOn); + + stdx::thread scheduler([&] { + scheduleWork(resultPf.promise); + schedulePf.promise.emplaceValue(); + }); + + ASSERT_OK(resultPf.future.getNoThrow()); + ASSERT_FALSE(schedulePf.future.isReady()); + + fp.setMode(FailPoint::off); + ASSERT_OK(schedulePf.future.getNoThrow()); + + scheduler.join(); +} + +// Stash the handle before running the callback +TEST_F(ScopedTaskExecutorTest, scheduleWinRaceWithSuccess) { + auto resultPf = makePromiseFuture<void>(); + auto schedulePf = makePromiseFuture<void>(); + + getNet()->enterNetwork(); + + stdx::thread scheduler([&] { + scheduleWork(resultPf.promise); + schedulePf.promise.emplaceValue(); + }); + + ASSERT_OK(schedulePf.future.getNoThrow()); + + ASSERT_FALSE(resultPf.future.isReady()); + + getNet()->exitNetwork(); + + ASSERT_OK(resultPf.future.getNoThrow()); + + scheduler.join(); +} + +// Schedule on the underlying, but are shut down when we execute our wrapping callback +TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithShutdown) { + auto resultPf = makePromiseFuture<void>(); + auto schedulePf = makePromiseFuture<void>(); + + getNet()->enterNetwork(); + + scheduleWork(resultPf.promise); + + ASSERT_FALSE(resultPf.future.isReady()); + + getExecutor()->shutdown(); + getNet()->exitNetwork(); + + ASSERT_EQUALS(resultPf.future.getNoThrow(), ErrorCodes::ShutdownInProgress); +} + +// ScheduleRemoteCommand on the underlying, but are shut down when we execute our wrapping callback +TEST_F(ScopedTaskExecutorTest, scheduleRemoteCommandLoseRaceWithShutdown) { + auto pf = makePromiseFuture<void>(); + + scheduleRemoteCommand(pf.promise); + { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + ASSERT(getNet()->hasReadyRequests()); + getNet()->scheduleSuccessfulResponse(BSONObj()); + getExecutor()->shutdown(); + getNet()->runReadyNetworkOperations(); + } + + ASSERT_EQUALS(pf.future.getNoThrow(), ErrorCodes::ShutdownInProgress); +} + +// Fail to schedule on the underlying +TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithShutdownOfUnderlying) { + auto& bfp = ScopedTaskExecutorHangBeforeSchedule; + auto& efp = ScopedTaskExecutorHangExitBeforeSchedule; + bfp.setMode(FailPoint::alwaysOn); + efp.setMode(FailPoint::alwaysOn); + + stdx::thread scheduler([&] { + ASSERT_FALSE( + getExecutor() + ->scheduleWork([&](const TaskExecutor::CallbackArgs& ca) { MONGO_UNREACHABLE; }) + .getStatus() + .isOK()); + }); + + MONGO_FAIL_POINT_PAUSE_WHILE_SET((bfp)); + + shutdownUnderlying(); + + efp.setMode(FailPoint::off); + + scheduler.join(); +} + +TEST_F(ScopedTaskExecutorTest, DestructionShutsDown) { + auto pf = makePromiseFuture<void>(); + + { + NetworkInterfaceMock::InNetworkGuard ing(getNet()); + + scheduleWork(pf.promise); + + ASSERT_FALSE(pf.future.isReady()); + + resetExecutor(); + + ASSERT_FALSE(pf.future.isReady()); + } + + ASSERT_EQUALS(pf.future.getNoThrow(), ErrorCodes::ShutdownInProgress); +} + +} // namespace +} // namespace executor +} // namespace mongo |