author     Andy Schwerin <schwerin@mongodb.com>   2016-04-13 18:38:40 -0400
committer  Andy Schwerin <schwerin@mongodb.com>   2016-06-09 17:09:59 -0400
commit     a0204821bb5993fc7b6f744d498244cf2f2ca757 (patch)
tree       ccee6e756d97f048535c48aef91283697995d827
parent     516d1d68896b6cc227d427d27140c2a23f7123b4 (diff)
download   mongo-a0204821bb5993fc7b6f744d498244cf2f2ca757.tar.gz
SERVER-23686 Handle race condition in ThreadPoolTaskExecutor shutdown logic.
Some implementations of NetworkInterface, particularly NetworkInterfaceMock, cannot be shut down before the ThreadPool used by the ThreadPoolTaskExecutor, leaving a race between shutting down the pool and scheduling the final completed network operations into the pool. The workaround is to let the ThreadPoolTaskExecutor's join() method execute any operation callbacks that get left behind in this scenario. The safety argument is described in a comment in the code.
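
For orientation before the diff: the fix boils down to tolerating ShutdownInProgress when handing completed work to the pool, and letting join() drain anything left behind. The sketch below is a minimal, hypothetical simplification of that pattern; MiniPool and MiniExecutor are illustrative stand-ins, not the real MongoDB classes, and the real executor runs the leftover callbacks with a CallbackCanceled status.

#include <deque>
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-ins for the real ThreadPool and ThreadPoolTaskExecutor.
enum class ScheduleResult { ok, shutdownInProgress };

struct MiniPool {
    bool shutDown = false;
    ScheduleResult schedule(std::function<void()> task) {
        if (shutDown)
            return ScheduleResult::shutdownInProgress;  // the window the fix tolerates
        task();  // run inline for brevity; a real pool hands this to a worker thread
        return ScheduleResult::ok;
    }
};

struct MiniExecutor {
    std::mutex mutex;
    std::deque<std::function<void()>> poolInProgressQueue;
    MiniPool pool;

    // Analogue of scheduleIntoPool_inlock: if the pool has already shut down,
    // leave the callback queued instead of fasserting.
    void scheduleIntoPool(std::function<void()> cb) {
        {
            std::lock_guard<std::mutex> lk(mutex);
            poolInProgressQueue.push_back(std::move(cb));
        }
        if (pool.schedule([this] { runNext(); }) == ScheduleResult::shutdownInProgress) {
            return;  // join() will run whatever was left behind
        }
    }

    void runNext() {
        std::unique_lock<std::mutex> lk(mutex);
        if (poolInProgressQueue.empty())
            return;
        auto cb = std::move(poolInProgressQueue.front());
        poolInProgressQueue.pop_front();
        lk.unlock();
        cb();  // the real executor runs this with a CallbackCanceled status
    }

    bool queueEmpty() {
        std::lock_guard<std::mutex> lk(mutex);
        return poolInProgressQueue.empty();
    }

    // Analogue of the new join() logic: once the pool has joined, no worker
    // thread can still be consuming the queue, so draining it here cannot race.
    void join() {
        while (!queueEmpty())
            runNext();
    }
};

int main() {
    MiniExecutor exec;
    exec.pool.shutDown = true;  // simulate the pool winning the shutdown race
    exec.scheduleIntoPool([] { /* would see CallbackCanceled in the real executor */ });
    exec.join();  // runs the leftover callback instead of tripping an fassert
}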
-rw-r--r--  src/mongo/executor/SConscript                           |  2
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp        | 30
-rw-r--r--  src/mongo/executor/thread_pool_task_executor_test.cpp   | 42
-rw-r--r--  src/mongo/util/fail_point.h                             |  8
4 files changed, 76 insertions(+), 6 deletions(-)
diff --git a/src/mongo/executor/SConscript b/src/mongo/executor/SConscript
index c9786c22558..c8ff0e69fd0 100644
--- a/src/mongo/executor/SConscript
+++ b/src/mongo/executor/SConscript
@@ -255,6 +255,7 @@ env.Library(
],
LIBDEPS=[
'task_executor_interface',
+ '$BUILD_DIR/mongo/util/fail_point',
]
)
@@ -299,6 +300,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'thread_pool_task_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/concurrency',
]
)
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index ea43b5e09eb..570ac69826e 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -42,11 +42,17 @@
#include "mongo/executor/network_interface.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/util/concurrency/thread_pool_interface.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
+#include "mongo/util/time_support.h"
namespace mongo {
namespace executor {
+namespace {
+MONGO_FP_DECLARE(scheduleIntoPoolSpinsUntilThreadPoolShutsDown);
+}
+
class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState {
MONGO_DISALLOW_COPYING(CallbackState);
@@ -147,7 +153,6 @@ void ThreadPoolTaskExecutor::shutdown() {
cbState->canceled.store(1);
}
scheduleIntoPool_inlock(&pending, std::move(lk));
- _net->signalWorkAvailable();
_pool->shutdown();
}
@@ -167,7 +172,16 @@ void ThreadPoolTaskExecutor::join() {
_net->shutdown();
stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_poolInProgressQueue.empty());
+ // The _poolInProgressQueue may not be empty if the network interface attempted to schedule work
+ // into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any
+ // items left in _poolInProgressQueue will never be processed by another thread, so we process
+ // them now.
+ while (!_poolInProgressQueue.empty()) {
+ auto cbState = _poolInProgressQueue.front();
+ lk.unlock();
+ runCallback(std::move(cbState));
+ lk.lock();
+ }
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
invariant(_unsignaledEvents.empty());
@@ -438,8 +452,18 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
lk.unlock();
+ if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
+ scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
+ while (_pool->schedule([] {}) != ErrorCodes::ShutdownInProgress) {
+ sleepmillis(100);
+ }
+ }
+
for (const auto& cbState : todo) {
- fassert(28735, _pool->schedule([this, cbState] { runCallback(std::move(cbState)); }));
+ const auto status = _pool->schedule([this, cbState] { runCallback(std::move(cbState)); });
+ if (status == ErrorCodes::ShutdownInProgress)
+ break;
+ fassert(28735, status);
}
_net->signalWorkAvailable();
}
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
index af1f4fb53b5..b2d43e669a0 100644
--- a/src/mongo/executor/thread_pool_task_executor_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -39,7 +39,9 @@
#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/stdx/memory.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/fail_point_service.h"
namespace mongo {
namespace executor {
@@ -78,6 +80,46 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) {
joinExecutorThread();
}
+TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
+ // This is a regression test for SERVER-23686. It works by scheduling a work item in the
+ // ThreadPoolTaskExecutor that blocks waiting to be signaled by this thread. Once that work item
+ // is scheduled, this thread enables a FailPoint that causes future calls of
+ // ThreadPoolTaskExecutor::scheduleIntoPool_inlock to spin until its underlying thread pool
+ // shuts down, forcing the race condition described in the aforementioned server ticket. The
+ // failpoint ensures that this thread spins until the task executor thread begins spinning on
+ // the underlying pool shutting down, then this thread tells the task executor to shut
+ // down. Once the pool shuts down, the previously blocked scheduleIntoPool_inlock unblocks, and
+ // discovers the pool to be shut down. The correct behavior is for all scheduled callbacks to
+ // execute, and for this last callback at least to execute with CallbackCanceled as its status.
+ // Before the fix for SERVER-23686, this test causes an fassert failure.
+
+ unittest::Barrier barrier{2};
+ auto status1 = getDetectableErrorStatus();
+ StatusWith<TaskExecutor::CallbackHandle> cb2 = getDetectableErrorStatus();
+ auto status2 = getDetectableErrorStatus();
+ auto& executor = getExecutor();
+ launchExecutorThread();
+
+ ASSERT_OK(executor.scheduleWork([&](const TaskExecutor::CallbackArgs& cbData) {
+ status1 = cbData.status;
+ if (!status1.isOK())
+ return;
+ barrier.countDownAndWait();
+ cb2 = cbData.executor->scheduleWork(
+ [&status2](const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; });
+ }).getStatus());
+
+ auto fpTPTE1 =
+ getGlobalFailPointRegistry()->getFailPoint("scheduleIntoPoolSpinsUntilThreadPoolShutsDown");
+ fpTPTE1->setMode(FailPoint::alwaysOn);
+ barrier.countDownAndWait();
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1));
+ executor.shutdown();
+ joinExecutorThread();
+ ASSERT_OK(status1);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2);
+}
+
} // namespace
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/util/fail_point.h b/src/mongo/util/fail_point.h
index 6896796a031..faf0818151c 100644
--- a/src/mongo/util/fail_point.h
+++ b/src/mongo/util/fail_point.h
@@ -225,9 +225,11 @@ private:
#define MONGO_FAIL_POINT(symbol) MONGO_unlikely(symbol.shouldFail())
#define MONGO_FAIL_POINT_PAUSE_WHILE_SET(symbol) \
- while (MONGO_FAIL_POINT(symbol)) { \
- sleepmillis(100); \
- }
+ do { \
+ while (MONGO_FAIL_POINT(symbol)) { \
+ sleepmillis(100); \
+ } \
+ } while (false)
/**
* Macro for creating a fail point with block context. Also use this when
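
As context for the fail_point.h hunk: wrapping the pause loop in do { ... } while (false) makes the macro expand, together with the caller's semicolon, to a single statement, so it composes safely with if/else at call sites. A small illustration, using a hypothetical fail point name:

// With the bare while-loop expansion, the semicolon the caller writes becomes a
// separate empty statement, and a following `else` no longer attaches to the `if`
// (it fails to compile). The do { ... } while (false) form keeps the whole macro
// invocation a single statement.
if (shouldPause)
    MONGO_FAIL_POINT_PAUSE_WHILE_SET(myFailPoint);  // myFailPoint is hypothetical
else
    proceedImmediately();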