diff options
23 files changed, 135 insertions, 137 deletions
diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp index 00dae071d3a..4e17d55b346 100644 --- a/src/mongo/client/fetcher_test.cpp +++ b/src/mongo/client/fetcher_test.cpp @@ -117,7 +117,8 @@ void FetcherTest::setUp() { } void FetcherTest::tearDown() { - executor::ThreadPoolExecutorTest::tearDown(); + getExecutor().shutdown(); + getExecutor().join(); // Executor may still invoke fetcher's callback before shutting down. fetcher.reset(); } diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp index 705324f94c5..d040d72ba19 100644 --- a/src/mongo/client/remote_command_retry_scheduler_test.cpp +++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp @@ -63,7 +63,6 @@ public: protected: void setUp() override; - void tearDown() override; }; class CallbackResponseSaver { @@ -152,10 +151,6 @@ void RemoteCommandRetrySchedulerTest::setUp() { launchExecutorThread(); } -void RemoteCommandRetrySchedulerTest::tearDown() { - executor::ThreadPoolExecutorTest::tearDown(); -} - CallbackResponseSaver::CallbackResponseSaver() = default; void CallbackResponseSaver::operator()( diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp index 5c69ca67d76..9d82755bafd 100644 --- a/src/mongo/db/repl/abstract_async_component_test.cpp +++ b/src/mongo/db/repl/abstract_async_component_test.cpp @@ -177,8 +177,6 @@ void AbstractAsyncComponentTest::setUp() { void AbstractAsyncComponentTest::tearDown() { shutdownExecutorThread(); joinExecutorThread(); - - executor::ThreadPoolExecutorTest::tearDown(); } TEST_F(AbstractAsyncComponentTest, ConstructorThrowsUserAssertionOnNullTaskExecutor) { diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp index 98c1bd08957..ab818341712 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp @@ -92,10 +92,6 @@ void AbstractOplogFetcherTest::setUp() { lastFetched = {456LL, {{123, 0}, 1}}; } -void AbstractOplogFetcherTest::tearDown() { - executor::ThreadPoolExecutorTest::tearDown(); -} - executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse( executor::RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) { diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h index d33ff6174b1..2e238000e70 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h @@ -82,7 +82,6 @@ public: protected: void setUp() override; - void tearDown() override; /** * Schedules network response and instructs network interface to process response. diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp index 89676a8a7f6..e1a8131fb10 100644 --- a/src/mongo/db/repl/base_cloner_test_fixture.cpp +++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp @@ -111,14 +111,12 @@ void BaseClonerTest::setUp() { } void BaseClonerTest::tearDown() { - executor::ThreadPoolExecutorTest::shutdownExecutorThread(); - executor::ThreadPoolExecutorTest::joinExecutorThread(); + getExecutor().shutdown(); + getExecutor().join(); storageInterface.reset(); dbWorkThreadPool->join(); dbWorkThreadPool.reset(); - - executor::ThreadPoolExecutorTest::tearDown(); } void BaseClonerTest::clear() { diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index f6a50786e47..36020cea9a8 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -186,12 +186,9 @@ protected: } void tearDown() override { - executor::ThreadPoolExecutorTest::shutdownExecutorThread(); - executor::ThreadPoolExecutorTest::joinExecutorThread(); - + getExecutor().shutdown(); + getExecutor().join(); _dbWorkThreadPool.join(); - - executor::ThreadPoolExecutorTest::tearDown(); } /** diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index c549bfbe391..1a55a5d83dc 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -360,8 +360,8 @@ protected: if (_executorThreadShutdownComplete) { return; } - executor::ThreadPoolExecutorTest::shutdownExecutorThread(); - executor::ThreadPoolExecutorTest::joinExecutorThread(); + getExecutor().shutdown(); + getExecutor().join(); _executorThreadShutdownComplete = true; } @@ -372,9 +372,6 @@ protected: _dbWorkThreadPool.reset(); _replicationProcess.reset(); _storageInterface.reset(); - - // tearDown() destroys the task executor which was referenced by the initial syncer. - executor::ThreadPoolExecutorTest::tearDown(); } /** diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp index 45f387ecd0c..a031064130b 100644 --- a/src/mongo/db/repl/multiapplier_test.cpp +++ b/src/mongo/db/repl/multiapplier_test.cpp @@ -45,7 +45,6 @@ public: private: executor::ThreadPoolMock::Options makeThreadPoolMockOptions() const override; void setUp() override; - void tearDown() override; }; executor::ThreadPoolMock::Options MultiApplierTest::makeThreadPoolMockOptions() const { @@ -60,15 +59,6 @@ void MultiApplierTest::setUp() { launchExecutorThread(); } -void MultiApplierTest::tearDown() { - executor::ThreadPoolExecutorTest::shutdownExecutorThread(); - executor::ThreadPoolExecutorTest::joinExecutorThread(); - - // Local tear down steps here. - - executor::ThreadPoolExecutorTest::tearDown(); -} - Status applyOperation(MultiApplier::OperationPtrs*) { return Status::OK(); }; diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index f4d91c6baed..01e85bc355e 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -54,7 +54,6 @@ using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard; class OplogFetcherTest : public AbstractOplogFetcherTest { protected: void setUp() override; - void tearDown() override; /** * Starts an oplog fetcher. Processes a single batch of results from @@ -117,10 +116,6 @@ void OplogFetcherTest::setUp() { }; } -void OplogFetcherTest::tearDown() { - AbstractOplogFetcherTest::tearDown(); -} - BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, int rbid, int primaryIndex, diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp index 383b2cf8aca..f0261d64f5f 100644 --- a/src/mongo/db/repl/reporter_test.cpp +++ b/src/mongo/db/repl/reporter_test.cpp @@ -182,7 +182,8 @@ void ReporterTest::setUp() { } void ReporterTest::tearDown() { - executor::ThreadPoolExecutorTest::tearDown(); + getExecutor().shutdown(); + getExecutor().join(); // Executor may still invoke reporter's callback before shutting down. reporter.reset(); posUpdater.reset(); diff --git a/src/mongo/db/repl/rollback_common_point_resolver_test.cpp b/src/mongo/db/repl/rollback_common_point_resolver_test.cpp index 4ed949ae1ce..d537e200362 100644 --- a/src/mongo/db/repl/rollback_common_point_resolver_test.cpp +++ b/src/mongo/db/repl/rollback_common_point_resolver_test.cpp @@ -99,9 +99,6 @@ private: class RollbackCommonPointResolverTest : public AbstractOplogFetcherTest { protected: - void setUp() override; - void tearDown() override; - /** * Starts a rollback common point resolver. Processes a single batch of results from * the oplog query and shuts down. @@ -152,16 +149,6 @@ protected: std::unique_ptr<ShutdownState> shutdownState; }; -void RollbackCommonPointResolverTest::setUp() { - AbstractOplogFetcherTest::setUp(); -} - -void RollbackCommonPointResolverTest::tearDown() { - AbstractOplogFetcherTest::tearDown(); - resolver.reset(); - shutdownState.reset(); -} - HostAndPort source("localhost:12345"); NamespaceString nss("local.oplog.rs"); diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 791cbf0e2e7..cd77a41ba95 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -90,7 +90,6 @@ void RollbackTest::setUp() { void RollbackTest::tearDown() { _coordinator = nullptr; _opCtx.reset(); - _threadPoolExecutorTest.tearDown(); // We cannot unset the global replication coordinator because ServiceContextMongoD::tearDown() // calls dropAllDatabasesExceptLocal() which requires the replication coordinator to clear all diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index da410b6082a..48bdf70f13c 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -108,14 +108,12 @@ void SyncSourceResolverTest::setUp() { } void SyncSourceResolverTest::tearDown() { - executor::ThreadPoolExecutorTest::shutdownExecutorThread(); - executor::ThreadPoolExecutorTest::joinExecutorThread(); + getExecutor().shutdown(); + getExecutor().join(); _resolver.reset(); _selector.reset(); _executorProxy.reset(); - - executor::ThreadPoolExecutorTest::tearDown(); } std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver( diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp index 81198e74ee2..bf45981490b 100644 --- a/src/mongo/executor/network_interface_mock.cpp +++ b/src/mongo/executor/network_interface_mock.cpp @@ -196,6 +196,10 @@ bool NetworkInterfaceMock::onNetworkThread() { void NetworkInterfaceMock::startup() { stdx::lock_guard<stdx::mutex> lk(_mutex); + _startup_inlock(); +} + +void NetworkInterfaceMock::_startup_inlock() { invariant(!_hasStarted); _hasStarted = true; _inShutdown.store(false); @@ -207,7 +211,9 @@ void NetworkInterfaceMock::shutdown() { invariant(!inShutdown()); stdx::unique_lock<stdx::mutex> lk(_mutex); - invariant(_hasStarted); + if (!_hasStarted) { + _startup_inlock(); + } _inShutdown.store(true); NetworkOperationList todo; todo.splice(todo.end(), _scheduled); diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h index 91b04cb01bd..edc40326d26 100644 --- a/src/mongo/executor/network_interface_mock.h +++ b/src/mongo/executor/network_interface_mock.h @@ -288,6 +288,11 @@ private: enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 }; /** + * Implementation of startup behavior. + */ + void _startup_inlock(); + + /** * Returns information about the state of this mock for diagnostic purposes. */ std::string _getDiagnosticString_inlock() const; diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index 0c7a676a358..22146268788 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -52,7 +52,7 @@ namespace executor { struct ConnectionPoolStats; /** - * Generic event loop with notions of events and callbacks. + * Executor with notions of events and callbacks. * * Callbacks represent work to be performed by the executor. * They may be scheduled by client threads or by other callbacks. Methods that @@ -68,9 +68,6 @@ struct ConnectionPoolStats; * * If an event is unsignaled when shutdown is called, the executor will ensure that any threads * blocked in waitForEvent() eventually return. - * - * Logically, Callbacks and Events exist for the life of the executor. That means that while - * the executor is in scope, no CallbackHandle or EventHandle is stale. */ class TaskExecutor { MONGO_DISALLOW_COPYING(TaskExecutor); @@ -105,6 +102,10 @@ public: */ using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>; + /** + * Destroys the task executor. Implicitly performs the equivalent of shutdown() and join() + * before returning, if necessary. + */ virtual ~TaskExecutor(); /** @@ -115,21 +116,21 @@ public: virtual void startup() = 0; /** - * Signals to the executor that it should shut down. This method should not block. After - * calling shutdown, it is illegal to schedule more tasks on the executor and join should be - * called to wait for shutdown to complete. + * Signals to the executor that it should shut down. This method may be called from within a + * callback. As such, this method must not block. After shutdown returns, attempts to schedule + * more tasks on the executor will return errors. * - * It is legal to call this method multiple times, but it should only be called after startup - * has been called. + * It is legal to call this method multiple times. If the task executor goes out of scope + * before this method is called, the destructor performs this activity. */ virtual void shutdown() = 0; /** - * Waits for the shutdown sequence initiated by an earlier call to shutdown to complete. It is - * only legal to call this method if startup has been called earlier. + * Waits for the shutdown sequence initiated by a call to shutdown() to complete. Must not be + * called from within a callback. * - * If startup is ever called, the code must ensure that join is eventually called once and only - * once. + * Unlike stdx::thread::join, this method may be called from any thread that wishes to wait for + * shutdown to complete. */ virtual void join() = 0; diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp index a97c3559e7e..f58657bacbb 100644 --- a/src/mongo/executor/task_executor_test_fixture.cpp +++ b/src/mongo/executor/task_executor_test_fixture.cpp @@ -63,42 +63,20 @@ void TaskExecutorTest::setUp() { auto net = stdx::make_unique<NetworkInterfaceMock>(); _net = net.get(); _executor = makeTaskExecutor(std::move(net)); - _executorState = LifecycleState::kPreStart; -} - -void TaskExecutorTest::tearDown() { - if (_executorState == LifecycleState::kRunning) { - shutdownExecutorThread(); - } - if (_executorState == LifecycleState::kJoinRequired) { - joinExecutorThread(); - } - invariant(_executorState == LifecycleState::kPreStart || - _executorState == LifecycleState::kShutdownComplete); - _executor.reset(); } void TaskExecutorTest::launchExecutorThread() { - invariant(_executorState == LifecycleState::kPreStart); _executor->startup(); - _executorState = LifecycleState::kRunning; postExecutorThreadLaunch(); } void TaskExecutorTest::shutdownExecutorThread() { - invariant(_executorState == LifecycleState::kRunning); _executor->shutdown(); - _executorState = LifecycleState::kJoinRequired; } void TaskExecutorTest::joinExecutorThread() { - // Tests may call shutdown() directly, bypassing the state change in shutdownExecutorThread(). - invariant(_executorState == LifecycleState::kRunning || - _executorState == LifecycleState::kJoinRequired); _net->exitNetwork(); - _executorState = LifecycleState::kJoining; _executor->join(); - _executorState = LifecycleState::kShutdownComplete; } void TaskExecutorTest::_doTest() { diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h index e90cbaec413..ff9904125d4 100644 --- a/src/mongo/executor/task_executor_test_fixture.h +++ b/src/mongo/executor/task_executor_test_fixture.h @@ -73,13 +73,6 @@ public: */ void setUp() override; - /** - * Destroys the replication executor. - * - * Shuts down and joins the running executor. - */ - void tearDown() override; - void launchExecutorThread(); void shutdownExecutorThread(); void joinExecutorThread(); @@ -102,12 +95,6 @@ private: NetworkInterfaceMock* _net; std::unique_ptr<TaskExecutor> _executor; - - /** - * kPreStart -> kRunning -> kJoinRequired -> kJoining -> kShutdownComplete - */ - enum LifecycleState { kPreStart, kRunning, kJoinRequired, kJoining, kShutdownComplete }; - LifecycleState _executorState = LifecycleState::kPreStart; }; } // namespace executor diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp index b3e1b84a836..e09c28e2d5a 100644 --- a/src/mongo/executor/thread_pool_mock.cpp +++ b/src/mongo/executor/thread_pool_mock.cpp @@ -45,6 +45,7 @@ ThreadPoolMock::~ThreadPoolMock() { stdx::unique_lock<stdx::mutex> lk(_mutex); _inShutdown = true; _net->signalWorkAvailable(); + _net->exitNetwork(); if (_started) { if (_worker.joinable()) { lk.unlock(); @@ -82,6 +83,7 @@ void ThreadPoolMock::join() { if (_started) { stdx::thread toJoin = std::move(_worker); _net->signalWorkAvailable(); + _net->exitNetwork(); lk.unlock(); toJoin.join(); lk.lock(); diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 7d111ae6f95..dbcf465f8d8 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -127,20 +127,31 @@ ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterfa std::unique_ptr<NetworkInterface> net) : _net(std::move(net)), _pool(std::move(pool)) {} -ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {} +ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() { + shutdown(); + auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex)); + invariant(_state == shutdownComplete); +} void ThreadPoolTaskExecutor::startup() { _net->startup(); stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return; } + invariant(_state == preStart); + _setState_inlock(running); _pool->startup(); } void ThreadPoolTaskExecutor::shutdown() { stdx::unique_lock<stdx::mutex> lk(_mutex); - _inShutdown = true; + if (_inShutdown_inlock()) { + invariant(_networkInProgressQueue.empty()); + invariant(_sleepersQueue.empty()); + return; + } + _setState_inlock(joinRequired); WorkQueue pending; pending.splice(pending.end(), _networkInProgressQueue); pending.splice(pending.end(), _sleepersQueue); @@ -158,21 +169,45 @@ void ThreadPoolTaskExecutor::shutdown() { } void ThreadPoolTaskExecutor::join() { - _pool->join(); - { - stdx::unique_lock<stdx::mutex> lk(_mutex); - while (!_unsignaledEvents.empty()) { - auto eventState = _unsignaledEvents.front(); - invariant(eventState->waiters.empty()); - EventHandle event; - setEventForHandle(&event, std::move(eventState)); - signalEvent_inlock(event, std::move(lk)); - lk = stdx::unique_lock<stdx::mutex>(_mutex); + _join(stdx::unique_lock<stdx::mutex>(_mutex)); +} + +stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) { + _stateChange.wait(lk, [this] { + switch (_state) { + case preStart: + return false; + case running: + return false; + case joinRequired: + return true; + case joining: + return false; + case shutdownComplete: + return true; } + MONGO_UNREACHABLE; + }); + if (_state == shutdownComplete) { + return lk; } + invariant(_state == joinRequired); + _setState_inlock(joining); + lk.unlock(); + _pool->join(); + lk.lock(); + while (!_unsignaledEvents.empty()) { + auto eventState = _unsignaledEvents.front(); + invariant(eventState->waiters.empty()); + EventHandle event; + setEventForHandle(&event, std::move(eventState)); + signalEvent_inlock(event, std::move(lk)); + lk = stdx::unique_lock<stdx::mutex>(_mutex); + } + lk.unlock(); _net->shutdown(); - stdx::unique_lock<stdx::mutex> lk(_mutex); + lk.lock(); // The _poolInProgressQueue may not be empty if the network interface attempted to schedule work // into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any // items left in _poolInProgressQueue will never be processed by another thread, so we process @@ -186,6 +221,8 @@ void ThreadPoolTaskExecutor::join() { invariant(_networkInProgressQueue.empty()); invariant(_sleepersQueue.empty()); invariant(_unsignaledEvents.empty()); + _setState_inlock(shutdownComplete); + return lk; } void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const { @@ -204,7 +241,7 @@ void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const { queues.done(); b->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size()); - b->append("shuttingDown", _inShutdown); + b->append("shuttingDown", _inShutdown_inlock()); b->append("networkInterface", _net->getDiagnosticString()); } @@ -217,7 +254,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() { EventHandle event; setEventForHandle(&event, el.front()); stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } _unsignaledEvents.splice(_unsignaledEvents.end(), el); @@ -355,7 +392,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC remoteCommandFinished(cbData, cb, scheduledRequest, response); }; stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return; } LOG(3) << "Received remote response: " @@ -370,7 +407,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) { invariant(cbHandle.isValid()); auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle)); stdx::unique_lock<stdx::mutex> lk(_mutex); - if (_inShutdown) { + if (_inShutdown_inlock()) { return; } cbState->canceled.store(1); @@ -415,7 +452,7 @@ void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) c StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock( WorkQueue* queue, WorkQueue* wq) { - if (_inShutdown) { + if (_inShutdown_inlock()) { return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"}; } invariant(!wq->empty()); @@ -515,6 +552,18 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA } } +bool ThreadPoolTaskExecutor::_inShutdown_inlock() const { + return _state >= joinRequired; +} + +void ThreadPoolTaskExecutor::_setState_inlock(State newState) { + if (newState == _state) { + return; + } + _state = newState; + _stateChange.notify_all(); +} + void ThreadPoolTaskExecutor::dropConnections(const HostAndPort& hostAndPort) { _net->dropConnections(hostAndPort); } diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 9d6a6d940b9..4a02e3bb73a 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -95,6 +95,25 @@ private: using EventList = stdx::list<std::shared_ptr<EventState>>; /** + * Representation of the stage of life of a thread pool. + * + * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work + * may only be scheduled in the preStart and running states. Threads may only be started in the + * running state. In shutdownComplete, there are no remaining threads or pending tasks to + * execute. + * + * Diagram of legal transitions: + * + * preStart -> running -> joinRequired -> joining -> shutdownComplete + * \ ^ + * \_____________/ + * + * NOTE: The enumeration values below are compared using operator<, etc, with the expectation + * that a -> b in the diagram above implies that a < b in the enum below. + */ + enum State { preStart, running, joinRequired, joining, shutdownComplete }; + + /** * Returns an EventList containing one unsignaled EventState. This is a helper function for * performing allocations outside of _mutex, and should only be called by makeSingletonWork and * makeEvent(). @@ -147,6 +166,10 @@ private: */ void runCallback(std::shared_ptr<CallbackState> cbState); + bool _inShutdown_inlock() const; + void _setState_inlock(State newState); + stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk); + // The network interface used for remote command execution and waiting. std::unique_ptr<NetworkInterface> _net; @@ -168,8 +191,9 @@ private: // List of all events that have yet to be signaled. EventList _unsignaledEvents; - // Indicates whether or not the executor is shutting down. - bool _inShutdown = false; + // Lifecycle state of this executor. + stdx::condition_variable _stateChange; + State _state = preStart; }; } // namespace executor diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp index db0cc594f42..2c4d29d8436 100644 --- a/src/mongo/executor/thread_pool_task_executor_test.cpp +++ b/src/mongo/executor/thread_pool_task_executor_test.cpp @@ -75,8 +75,6 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) { executor.wait(cb1); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1); ASSERT_EQUALS(startTime + Milliseconds(200), net->now()); - executor.shutdown(); - joinExecutorThread(); } bool sharedCallbackStateDestroyed = false; @@ -120,9 +118,6 @@ TEST_F(ThreadPoolExecutorTest, // to ThreadPoolTaskExecutor::CallbackState). ASSERT_TRUE(callbackInvoked); ASSERT_TRUE(sharedCallbackStateDestroyed); - - executor.shutdown(); - joinExecutorThread(); } TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { @@ -162,7 +157,7 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) { barrier.countDownAndWait(); MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1)); executor.shutdown(); - joinExecutorThread(); + executor.join(); ASSERT_OK(status1); ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2); } |