diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2017-06-09 14:06:26 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2017-06-09 14:06:26 -0400 |
commit | f9c36696a25d3837f512421755952736236bbed0 (patch) | |
tree | 9becbcdb1ade14d930d3b2da3c731ebfe20f8a75 /src/mongo/executor | |
parent | db55e668d87d66b171c310241bdbcabfa790e2cf (diff) | |
download | mongo-f9c36696a25d3837f512421755952736236bbed0.tar.gz |
SERVER-29493 Make ThreadPoolTaskExecutor's destructor execute shutdown and join.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/network_interface_mock.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/network_interface_mock.h | 5 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 27 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_fixture.cpp | 22 | ||||
-rw-r--r-- | src/mongo/executor/task_executor_test_fixture.h | 13 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 87 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 28 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor_test.cpp | 7 |
9 files changed, 123 insertions, 76 deletions
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 81198e74ee2..bf45981490b 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -196,6 +196,10 @@ bool NetworkInterfaceMock::onNetworkThread() { void NetworkInterfaceMock::startup() { stdx::lock_guard<stdx::mutex> lk(_mutex); + _startup_inlock(); +} + +void NetworkInterfaceMock::_startup_inlock() { invariant(!_hasStarted); _hasStarted = true; _inShutdown.store(false); @@ -207,7 +211,9 @@ void NetworkInterfaceMock::shutdown() { invariant(!inShutdown()); stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_hasStarted); + if (!_hasStarted) { + _startup_inlock(); + } _inShutdown.store(true); NetworkOperationList todo; todo.splice(todo.end(), _scheduled); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 91b04cb01bd..edc40326d26 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -288,6 +288,11 @@ private: enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 }; /** + * Implementation of startup behavior. + */ + void _startup_inlock(); + + /** * Returns information about the state of this mock for diagnostic purposes. */ std::string _getDiagnosticString_inlock() const; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 0c7a676a358..22146268788 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -52,7 +52,7 @@ namespace executor { struct ConnectionPoolStats; /** - * Generic event loop with notions of events and callbacks. + * Executor with notions of events and callbacks. * * Callbacks represent work to be performed by the executor. * They may be scheduled by client threads or by other callbacks. Methods that @@ -68,9 +68,6 @@ struct ConnectionPoolStats; * * If an event is unsignaled when shutdown is called, the executor will ensure that any threads * blocked in waitForEvent() eventually return. - * - * Logically, Callbacks and Events exist for the life of the executor. That means that while - * the executor is in scope, no CallbackHandle or EventHandle is stale. */ class TaskExecutor { MONGO_DISALLOW_COPYING(TaskExecutor); @@ -105,6 +102,10 @@ public: */ using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>; + /** + * Destroys the task executor. Implicitly performs the equivalent of shutdown() and join() + * before returning, if necessary. + */ virtual ~TaskExecutor(); /** @@ -115,21 +116,21 @@ public: virtual void startup() = 0; /** - * Signals to the executor that it should shut down. This method should not block. After - * calling shutdown, it is illegal to schedule more tasks on the executor and join should be - * called to wait for shutdown to complete. + * Signals to the executor that it should shut down. This method may be called from within a + * callback. As such, this method must not block. After shutdown returns, attempts to schedule + * more tasks on the executor will return errors. * - * It is legal to call this method multiple times, but it should only be called after startup - * has been called. + * It is legal to call this method multiple times. If the task executor goes out of scope + * before this method is called, the destructor performs this activity. */ virtual void shutdown() = 0; /** - * Waits for the shutdown sequence initiated by an earlier call to shutdown to complete. It is - * only legal to call this method if startup has been called earlier. + * Waits for the shutdown sequence initiated by a call to shutdown() to complete. Must not be + * called from within a callback. * - * If startup is ever called, the code must ensure that join is eventually called once and only - * once. + * Unlike stdx::thread::join, this method may be called from any thread that wishes to wait for + * shutdown to complete. */ virtual void join() = 0; diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp index a97c3559e7e..f58657bacbb 100644 --- a/src/mongo/executor/task_executor_test_fixture.cpp +++ b/src/mongo/executor/task_executor_test_fixture.cpp @@ -63,42 +63,20 @@ void TaskExecutorTest::setUp() { auto net = stdx::make_unique<NetworkInterfaceMock>(); _net = net.get(); _executor = makeTaskExecutor(std::move(net)); - _executorState = LifecycleState::kPreStart; -} - -void TaskExecutorTest::tearDown() { - if (_executorState == LifecycleState::kRunning) { - shutdownExecutorThread(); - } - if (_executorState == LifecycleState::kJoinRequired) { - joinExecutorThread(); - } - invariant(_executorState == LifecycleState::kPreStart || - _executorState == LifecycleState::kShutdownComplete); - _executor.reset(); } void TaskExecutorTest::launchExecutorThread() { - invariant(_executorState == LifecycleState::kPreStart); _executor->startup(); - _executorState = LifecycleState::kRunning; postExecutorThreadLaunch(); } void TaskExecutorTest::shutdownExecutorThread() { - invariant(_executorState == LifecycleState::kRunning); _executor->shutdown(); - _executorState = LifecycleState::kJoinRequired; } void TaskExecutorTest::joinExecutorThread() { - // Tests may call shutdown() directly, bypassing the state change in shutdownExecutorThread(). - invariant(_executorState == LifecycleState::kRunning || - _executorState == LifecycleState::kJoinRequired); _net->exitNetwork(); - _executorState = LifecycleState::kJoining; _executor->join(); - _executorState = LifecycleState::kShutdownComplete; } void TaskExecutorTest::_doTest() { diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h index e90cbaec413..ff9904125d4 100644 --- a/src/mongo/executor/task_executor_test_fixture.h +++ b/src/mongo/executor/task_executor_test_fixture.h @@ -73,13 +73,6 @@ public: */ void setUp() override; - /** - * Destroys the replication executor. - * - * Shuts down and joins the running executor. - */ - void tearDown() override; - void launchExecutorThread(); void shutdownExecutorThread(); void joinExecutorThread(); @@ -102,12 +95,6 @@ private: NetworkInterfaceMock* _net; std::unique_ptr<TaskExecutor> _executor; - - /** - * kPreStart -> kRunning -> kJoinRequired -> kJoining -> kShutdownComplete - */ - enum LifecycleState { kPreStart, kRunning, kJoinRequired, kJoining, kShutdownComplete }; - LifecycleState _executorState = LifecycleState::kPreStart; }; } // namespace executor diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index b3e1b84a836..e09c28e2d5a 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -45,6 +45,7 @@ ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock<stdx::mutex> lk(_mutex); _inShutdown = true; _net->signalWorkAvailable(); + _net->exitNetwork(); if (_started) { if (_worker.joinable()) { lk.unlock(); @@ -82,6 +83,7 @@ void ThreadPoolMock::join() { if (_started) { stdx::thread toJoin = std::move(_worker); _net->signalWorkAvailable(); + _net->exitNetwork(); lk.unlock(); toJoin.join(); lk.lock(); diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 7d111ae6f95..dbcf465f8d8 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -127,20 +127,31 @@ ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterfa std::unique_ptr<NetworkInterface> net) : _net(std::move(net)), _pool(std::move(pool)) {} -ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {} +ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() { + shutdown(); + auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex)); + invariant(_state == shutdownComplete); +} void ThreadPoolTaskExecutor::startup() { _net->startup(); stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return; } + invariant(_state == preStart); + _setState_inlock(running); _pool->startup(); } void ThreadPoolTaskExecutor::shutdown() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _inShutdown = true; + if (_inShutdown_inlock()) { + invariant(_networkInProgressQueue.empty()); + invariant(_sleepersQueue.empty()); + return; + } + _setState_inlock(joinRequired); WorkQueue pending; pending.splice(pending.end(), _networkInProgressQueue); pending.splice(pending.end(), _sleepersQueue); @@ -158,21 +169,45 @@ void ThreadPoolTaskExecutor::shutdown() { } void ThreadPoolTaskExecutor::join() { - _pool->join(); - { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_unsignaledEvents.empty()) { - auto eventState = _unsignaledEvents.front(); - invariant(eventState->waiters.empty()); - EventHandle event; - setEventForHandle(&event, std::move(eventState)); - signalEvent_inlock(event, std::move(lk)); - lk = stdx::unique_lock<stdx::mutex>(_mutex); + _join(stdx::unique_lock<stdx::mutex>(_mutex)); +} + +stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) { + _stateChange.wait(lk, [this] { + switch (_state) { + case preStart: + return false; + case running: + return false; + case joinRequired: + return true; + case joining: + return false; + case shutdownComplete: + return true; } + MONGO_UNREACHABLE; + }); + if (_state == shutdownComplete) { + return lk; } + invariant(_state == joinRequired); + _setState_inlock(joining); + lk.unlock(); + _pool->join(); + lk.lock(); + while (!_unsignaledEvents.empty()) { + auto eventState = _unsignaledEvents.front(); + invariant(eventState->waiters.empty()); + EventHandle event; + setEventForHandle(&event, std::move(eventState)); + signalEvent_inlock(event, std::move(lk)); + lk = stdx::unique_lock<stdx::mutex>(_mutex); + } + lk.unlock(); _net->shutdown(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + lk.lock(); // The _poolInProgressQueue may not be empty if the network interface attempted to schedule work // into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any // items left in _poolInProgressQueue will never be processed by another thread, so we process @@ -186,6 +221,8 @@ void ThreadPoolTaskExecutor::join() { invariant(_networkInProgressQueue.empty()); invariant(_sleepersQueue.empty()); invariant(_unsignaledEvents.empty()); + _setState_inlock(shutdownComplete); + return lk; } void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const { @@ -204,7 +241,7 @@ void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const { queues.done(); b->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size()); - b->append("shuttingDown", _inShutdown); + b->append("shuttingDown", _inShutdown_inlock()); b->append("networkInterface", _net->getDiagnosticString()); } @@ -217,7 +254,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() { EventHandle event; setEventForHandle(&event, el.front()); stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } _unsignaledEvents.splice(_unsignaledEvents.end(), el); @@ -355,7 +392,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC remoteCommandFinished(cbData, cb, scheduledRequest, response); }; stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return; } LOG(3) << "Received remote response: " @@ -370,7 +407,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) { invariant(cbHandle.isValid()); auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return; } cbState->canceled.store(1); @@ -415,7 +452,7 @@ void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) c StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock( WorkQueue* queue, WorkQueue* wq) { - if (_inShutdown) { + if (_inShutdown_inlock()) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } invariant(!wq->empty()); @@ -515,6 +552,18 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA } } +bool ThreadPoolTaskExecutor::_inShutdown_inlock() const { + return _state >= joinRequired; +} + +void ThreadPoolTaskExecutor::_setState_inlock(State newState) { + if (newState == _state) { + return; + } + _state = newState; + _stateChange.notify_all(); +} + void ThreadPoolTaskExecutor::dropConnections(const HostAndPort& hostAndPort) { _net->dropConnections(hostAndPort); } diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 9d6a6d940b9..4a02e3bb73a 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -95,6 +95,25 @@ private: using EventList = stdx::list<std::shared_ptr<EventState>>; /** + * Representation of the stage of life of a thread pool. + * + * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work + * may only be scheduled in the preStart and running states. Threads may only be started in the + * running state. In shutdownComplete, there are no remaining threads or pending tasks to + * execute. + * + * Diagram of legal transitions: + * + * preStart -> running -> joinRequired -> joining -> shutdownComplete + * \ ^ + * \_____________/ + * + * NOTE: The enumeration values below are compared using operator<, etc, with the expectation + * that a -> b in the diagram above implies that a < b in the enum below. + */ + enum State { preStart, running, joinRequired, joining, shutdownComplete }; + + /** * Returns an EventList containing one unsignaled EventState. This is a helper function for * performing allocations outside of _mutex, and should only be called by makeSingletonWork and * makeEvent(). @@ -147,6 +166,10 @@ private: */ void runCallback(std::shared_ptr<CallbackState> cbState); + bool _inShutdown_inlock() const; + void _setState_inlock(State newState); + stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk); + // The network interface used for remote command execution and waiting. std::unique_ptr<NetworkInterface> _net; @@ -168,8 +191,9 @@ private: // List of all events that have yet to be signaled. EventList _unsignaledEvents; - // Indicates whether or not the executor is shutting down. - bool _inShutdown = false; + // Lifecycle state of this executor. + stdx::condition_variable _stateChange; + State _state = preStart; }; } // namespace executor diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index db0cc594f42..2c4d29d8436 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -75,8 +75,6 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { executor.wait(cb1); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1); ASSERT_EQUALS(startTime + Milliseconds(200), net->now()); - executor.shutdown(); - joinExecutorThread(); } bool sharedCallbackStateDestroyed = false; @@ -120,9 +118,6 @@ TEST_F(ThreadPoolExecutorTest, // to ThreadPoolTaskExecutor::CallbackState). ASSERT_TRUE(callbackInvoked); ASSERT_TRUE(sharedCallbackStateDestroyed); - - executor.shutdown(); - joinExecutorThread(); } TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { @@ -162,7 +157,7 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { barrier.countDownAndWait(); MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1)); executor.shutdown(); - joinExecutorThread(); + executor.join(); ASSERT_OK(status1); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2); } |