diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-04-13 18:38:40 -0400
---|---|---
committer | Andy Schwerin <schwerin@mongodb.com> | 2016-06-09 17:09:59 -0400
commit | a0204821bb5993fc7b6f744d498244cf2f2ca757 (patch) |
tree | ccee6e756d97f048535c48aef91283697995d827 |
parent | 516d1d68896b6cc227d427d27140c2a23f7123b4 (diff) |
download | mongo-a0204821bb5993fc7b6f744d498244cf2f2ca757.tar.gz |
SERVER-23686 Handle race condition in ThreadPoolTaskExecutor shutdown logic.
Some implementations of NetworkInterface, particularly NetworkInterfaceMock,
cannot be shut down before the ThreadPool used by the ThreadPoolTaskExecutor,
leaving a race between shutting down the pool and scheduling the final completed
network operations into the pool. The workaround is to let the
ThreadPoolTaskExecutor's join() method execute any operation callbacks that get
left behind in this scenario. The safety argument is described in a comment in
the code.
-rw-r--r-- | src/mongo/executor/SConscript | 2
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 30
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test.cpp | 42
-rw-r--r-- | src/mongo/util/fail_point.h | 8
4 files changed, 76 insertions(+), 6 deletions(-)
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript index c9786c22558..c8ff0e69fd0 100644 --- a/src/mongo/executor/SConscript +++ b/src/mongo/executor/SConscript @@ -255,6 +255,7 @@ env.Library( ], LIBDEPS=[ 'task_executor_interface', + '$BUILD_DIR/mongo/util/fail_point', ] ) @@ -299,6 +300,7 @@ env.CppUnitTest( ], LIBDEPS=[ 'thread_pool_task_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/concurrency', ] ) diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index ea43b5e09eb..570ac69826e 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -42,11 +42,17 @@ #include "mongo/executor/network_interface.h" #include "mongo/platform/atomic_word.h" #include "mongo/util/concurrency/thread_pool_interface.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" +#include "mongo/util/time_support.h" namespace mongo { namespace executor { +namespace { +MONGO_FP_DECLARE(scheduleIntoPoolSpinsUntilThreadPoolShutsDown); +} + class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState { MONGO_DISALLOW_COPYING(CallbackState); @@ -147,7 +153,6 @@ void ThreadPoolTaskExecutor::shutdown() { cbState->canceled.store(1); } scheduleIntoPool_inlock(&pending, std::move(lk)); - _net->signalWorkAvailable(); _pool->shutdown(); } @@ -167,7 +172,16 @@ void ThreadPoolTaskExecutor::join() { _net->shutdown(); stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_poolInProgressQueue.empty()); + // The _poolInProgressQueue may not be empty if the network interface attempted to schedule work + // into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any + // items left in _poolInProgressQueue will never be processed by another thread, so we process + // them now. 
+ while (!_poolInProgressQueue.empty()) { + auto cbState = _poolInProgressQueue.front(); + lk.unlock(); + runCallback(std::move(cbState)); + lk.lock(); + } invariant(_networkInProgressQueue.empty()); invariant(_sleepersQueue.empty()); invariant(_unsignaledEvents.empty()); @@ -438,8 +452,18 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue, lk.unlock(); + if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) { + scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off); + while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) { + sleepmillis(100); + } + } + for (const auto& cbState : todo) { - fassert(28735, _pool->schedule([this, cbState] { runCallback(std::move(cbState)); })); + const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }); + if (status == ErrorCodes::ShutdownInProgress) + break; + fassert(28735, status); } _net->signalWorkAvailable(); } diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index af1f4fb53b5..b2d43e669a0 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -39,7 +39,9 @@ #include "mongo/executor/thread_pool_task_executor.h" #include "mongo/executor/thread_pool_task_executor_test_fixture.h" #include "mongo/stdx/memory.h" +#include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point_service.h" namespace mongo { namespace executor { @@ -78,6 +80,46 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { joinExecutorThread(); } +TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { + // This is a regression test for SERVER-23686. It works by scheduling a work item in the + // ThreadPoolTaskExecutor that blocks waiting to be signaled by this thread. 
Once that work item + // is scheduled, this thread enables a FailPoint that causes future calls of + // ThreadPoolTaskExecutor::scheduleIntoPool_inlock to spin until its underlying thread pool + // shuts down, forcing the race condition described in the aforementioned server ticket. The + // failpoint ensures that this thread spins until the task executor thread begins spinning on + // the underlying pool shutting down, then this thread tells the task executor to shut + // down. Once the pool shuts down, the previously blocked scheduleIntoPool_inlock unblocks, and + // discovers the pool to be shut down. The correct behavior is for all scheduled callbacks to + // execute, and for this last callback at least to execute with CallbackCanceled as its status. + // Before the fix for SERVER-23686, this test causes an fassert failure. + + unittest::Barrier barrier{2}; + auto status1 = getDetectableErrorStatus(); + StatusWith<TaskExecutor::CallbackHandle> cb2 = getDetectableErrorStatus(); + auto status2 = getDetectableErrorStatus(); + auto& executor = getExecutor(); + launchExecutorThread(); + + ASSERT_OK(executor.scheduleWork([&](const TaskExecutor::CallbackArgs& cbData) { + status1 = cbData.status; + if (!status1.isOK()) + return; + barrier.countDownAndWait(); + cb2 = cbData.executor->scheduleWork( + [&status2](const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; }); + }).getStatus()); + + auto fpTPTE1 = + getGlobalFailPointRegistry()->getFailPoint("scheduleIntoPoolSpinsUntilThreadPoolShutsDown"); + fpTPTE1->setMode(FailPoint::alwaysOn); + barrier.countDownAndWait(); + MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1)); + executor.shutdown(); + joinExecutorThread(); + ASSERT_OK(status1); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2); +} + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/util/fail_point.h b/src/mongo/util/fail_point.h index 6896796a031..faf0818151c 100644 --- a/src/mongo/util/fail_point.h +++ 
b/src/mongo/util/fail_point.h @@ -225,9 +225,11 @@ private: #define MONGO_FAIL_POINT(symbol) MONGO_unlikely(symbol.shouldFail()) #define MONGO_FAIL_POINT_PAUSE_WHILE_SET(symbol) \ - while (MONGO_FAIL_POINT(symbol)) { \ - sleepmillis(100); \ - } + do { \ + while (MONGO_FAIL_POINT(symbol)) { \ + sleepmillis(100); \ + } \ + } while (false) /** * Macro for creating a fail point with block context. Also use this when |