summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2019-04-23 14:52:55 -0400
committerJason Carey <jcarey@argv.me>2019-04-24 17:07:43 -0400
commit406e1642a83d8b810bec41ad293c1b1c8a59a6dd (patch)
treec2b2003dba64e627cc5b6b06ea0183d7aaeebcdf /src/mongo/executor
parente3e31d97772cca5d23a23180ceb9161fbf4e7736 (diff)
downloadmongo-406e1642a83d8b810bec41ad293c1b1c8a59a6dd.tar.gz
SERVER-40795 Always execute TPTE cbs out of line
Rather than relying on failure to schedule work onto the background thread pool as the mechanism for failing scheduled (but not run callbacks), wait to join the executor until after all jobs have cleared through. I.e. join the thread pool in task executor join(), after all pool processing jobs are gone, instead of in shutdown()
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/task_executor_test_common.cpp10
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp46
-rw-r--r--src/mongo/executor/thread_pool_task_executor_test.cpp37
3 files changed, 46 insertions, 47 deletions
diff --git a/src/mongo/executor/task_executor_test_common.cpp b/src/mongo/executor/task_executor_test_common.cpp
index 3d2324e849e..f98e71e5736 100644
--- a/src/mongo/executor/task_executor_test_common.cpp
+++ b/src/mongo/executor/task_executor_test_common.cpp
@@ -169,16 +169,6 @@ COMMON_EXECUTOR_TEST(RunOne) {
ASSERT_OK(status);
}
-COMMON_EXECUTOR_TEST(Schedule1ButShutdown) {
- TaskExecutor& executor = getExecutor();
- Status status = getDetectableErrorStatus();
- ASSERT_OK(executor.scheduleWork(makeSetStatusAndShutdownClosure(&status)).getStatus());
- executor.shutdown();
- launchExecutorThread();
- joinExecutorThread();
- ASSERT_EQUALS(status, ErrorCodes::CallbackCanceled);
-}
-
COMMON_EXECUTOR_TEST(Schedule2Cancel1) {
TaskExecutor& executor = getExecutor();
Status status1 = getDetectableErrorStatus();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index e067fc43203..022d6f6dcce 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -56,7 +56,7 @@ MONGO_FAIL_POINT_DEFINE(initialSyncFuzzerSynchronizationPoint1);
MONGO_FAIL_POINT_DEFINE(initialSyncFuzzerSynchronizationPoint2);
namespace {
-MONGO_FAIL_POINT_DEFINE(scheduleIntoPoolSpinsUntilThreadPoolShutsDown);
+MONGO_FAIL_POINT_DEFINE(scheduleIntoPoolSpinsUntilThreadPoolTaskExecutorShutsDown);
}
class ThreadPoolTaskExecutor::CallbackState : public TaskExecutor::CallbackState {
@@ -147,9 +147,6 @@ ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
void ThreadPoolTaskExecutor::startup() {
_net->startup();
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown_inlock()) {
- return;
- }
invariant(_state == preStart);
_setState_inlock(running);
_pool->startup();
@@ -176,7 +173,6 @@ void ThreadPoolTaskExecutor::shutdown() {
cbState->canceled.store(1);
}
scheduleIntoPool_inlock(&pending, std::move(lk));
- _pool->shutdown();
}
void ThreadPoolTaskExecutor::join() {
@@ -185,6 +181,18 @@ void ThreadPoolTaskExecutor::join() {
stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) {
_stateChange.wait(lk, [this] {
+ // All tasks are spliced into the _poolInProgressQueue immediately after we accept them.
+ // This occurs in scheduleIntoPool_inlock.
+ //
+ // On the other side, all tasks are spliced out of the _poolInProgressQueue in runCallback,
+ // which removes them from this list after executing the users callback.
+ //
+ // This check ensures that all work managed to enter after shutdown successfully flushes
+ // after shutdown
+ if (!_poolInProgressQueue.empty()) {
+ return false;
+ }
+
switch (_state) {
case preStart:
return false;
@@ -199,12 +207,14 @@ stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<s
}
MONGO_UNREACHABLE;
});
+
if (_state == shutdownComplete) {
return lk;
}
invariant(_state == joinRequired);
_setState_inlock(joining);
lk.unlock();
+ _pool->shutdown();
_pool->join();
lk.lock();
while (!_unsignaledEvents.empty()) {
@@ -217,18 +227,8 @@ stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<s
}
lk.unlock();
_net->shutdown();
-
lk.lock();
- // The _poolInProgressQueue may not be empty if the network interface attempted to schedule work
- // into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any
- // items left in _poolInProgressQueue will never be processed by another thread, so we process
- // them now.
- while (!_poolInProgressQueue.empty()) {
- auto cbState = _poolInProgressQueue.front();
- lk.unlock();
- runCallback(std::move(cbState));
- lk.lock();
- }
+ invariant(_poolInProgressQueue.empty());
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
invariant(_unsignaledEvents.empty());
@@ -595,13 +595,12 @@ void ThreadPoolTaskExecutor::scheduleIntoPool_inlock(WorkQueue* fromQueue,
lk.unlock();
- if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolShutsDown)) {
- scheduleIntoPoolSpinsUntilThreadPoolShutsDown.setMode(FailPoint::off);
+ if (MONGO_FAIL_POINT(scheduleIntoPoolSpinsUntilThreadPoolTaskExecutorShutsDown)) {
+ scheduleIntoPoolSpinsUntilThreadPoolTaskExecutorShutsDown.setMode(FailPoint::off);
- auto checkStatus = [&] { return ExecutorFuture(_pool).then([] {}).getNoThrow(); };
- while (!ErrorCodes::isCancelationError(checkStatus().code())) {
- sleepmillis(100);
- }
+ lk.lock();
+ _stateChange.wait(lk, [&] { return _inShutdown_inlock(); });
+ lk.unlock();
}
for (const auto& cbState : todo) {
@@ -660,6 +659,9 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
if (cbStateArg->finishedCondition) {
cbStateArg->finishedCondition->notify_all();
}
+ if (_inShutdown_inlock() && _poolInProgressQueue.empty()) {
+ _stateChange.notify_all();
+ }
}
bool ThreadPoolTaskExecutor::_inShutdown_inlock() const {
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
index 3544f72ffe4..ce63f225038 100644
--- a/src/mongo/executor/thread_pool_task_executor_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -119,18 +119,19 @@ TEST_F(ThreadPoolExecutorTest,
ASSERT_TRUE(sharedCallbackStateDestroyed);
}
+thread_local bool amRunningRecursively = false;
+
TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
- // This is a regression test for SERVER-23686. It works by scheduling a work item in the
- // ThreadPoolTaskExecutor that blocks waiting to be signaled by this thread. Once that work item
- // is scheduled, this thread enables a FailPoint that causes future calls of
- // ThreadPoolTaskExecutor::scheduleIntoPool_inlock to spin until its underlying thread pool
- // shuts down, forcing the race condition described in the aforementioned server ticket. The
- // failpoint ensures that this thread spins until the task executor thread begins spinning on
- // the underlying pool shutting down, then this thread tells the task executor to shut
- // down. Once the pool shuts down, the previously blocked scheduleIntoPool_inlock unblocks, and
- // discovers the pool to be shut down. The correct behavior is for all scheduled callbacks to
- // execute, and for this last callback at least to execute with CallbackCanceled as its status.
- // Before the fix for SERVER-23686, this test causes an fassert failure.
+ // This test works by scheduling a work item in the ThreadPoolTaskExecutor that blocks waiting
+ // to be signaled by this thread. Once that work item is scheduled, this thread enables a
+ // FailPoint that causes future calls of ThreadPoolTaskExecutor::scheduleIntoPool_inlock to spin
+ // until it is shutdown, forcing a race between the caller of schedule and the caller of
+ // shutdown. The failpoint ensures that this thread spins until the task executor thread begins
+ // spinning on the state transitioning to shutting down, then this thread tells the task
+ // executor to shut down. Once the executor shuts down, the previously blocked
+ // scheduleIntoPool_inlock unblocks, and discovers the executor to be shut down. The correct
+ // behavior is for all scheduled callbacks to execute, and for this last callback at least to
+ // execute with CallbackCanceled as its status.
unittest::Barrier barrier{2};
auto status1 = getDetectableErrorStatus();
@@ -145,13 +146,19 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
if (!status1.isOK())
return;
barrier.countDownAndWait();
- cb2 = cbData.executor->scheduleWork([&status2](
- const TaskExecutor::CallbackArgs& cbData) { status2 = cbData.status; });
+
+ amRunningRecursively = true;
+ cb2 = cbData.executor->scheduleWork(
+ [&status2](const TaskExecutor::CallbackArgs& cbData) {
+ ASSERT_FALSE(amRunningRecursively);
+ status2 = cbData.status;
+ });
+ amRunningRecursively = false;
})
.getStatus());
- auto fpTPTE1 =
- getGlobalFailPointRegistry()->getFailPoint("scheduleIntoPoolSpinsUntilThreadPoolShutsDown");
+ auto fpTPTE1 = getGlobalFailPointRegistry()->getFailPoint(
+ "scheduleIntoPoolSpinsUntilThreadPoolTaskExecutorShutsDown");
fpTPTE1->setMode(FailPoint::alwaysOn);
barrier.countDownAndWait();
MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1));