-rw-r--r--  src/mongo/client/fetcher_test.cpp  3
-rw-r--r--  src/mongo/client/remote_command_retry_scheduler_test.cpp  5
-rw-r--r--  src/mongo/db/repl/abstract_async_component_test.cpp  2
-rw-r--r--  src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp  4
-rw-r--r--  src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h  1
-rw-r--r--  src/mongo/db/repl/base_cloner_test_fixture.cpp  6
-rw-r--r--  src/mongo/db/repl/databases_cloner_test.cpp  7
-rw-r--r--  src/mongo/db/repl/initial_syncer_test.cpp  7
-rw-r--r--  src/mongo/db/repl/multiapplier_test.cpp  10
-rw-r--r--  src/mongo/db/repl/oplog_fetcher_test.cpp  5
-rw-r--r--  src/mongo/db/repl/reporter_test.cpp  3
-rw-r--r--  src/mongo/db/repl/rollback_common_point_resolver_test.cpp  13
-rw-r--r--  src/mongo/db/repl/rollback_test_fixture.cpp  1
-rw-r--r--  src/mongo/db/repl/sync_source_resolver_test.cpp  6
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp  8
-rw-r--r--  src/mongo/executor/network_interface_mock.h  5
-rw-r--r--  src/mongo/executor/task_executor.h  27
-rw-r--r--  src/mongo/executor/task_executor_test_fixture.cpp  22
-rw-r--r--  src/mongo/executor/task_executor_test_fixture.h  13
-rw-r--r--  src/mongo/executor/thread_pool_mock.cpp  2
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp  87
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.h  28
-rw-r--r--  src/mongo/executor/thread_pool_task_executor_test.cpp  7
23 files changed, 135 insertions, 137 deletions
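
Most of the test-fixture changes below replace bespoke tearDown() overrides with a direct shutdown/join on the fixture's executor, relying on the relaxed TaskExecutor lifecycle introduced in this commit. A minimal sketch of the resulting pattern follows; MyComponentTest, MyComponent and _component are illustrative names only, while launchExecutorThread() and getExecutor() are the fixture accessors the real tests use.

// Sketch only: hypothetical fixture showing the simplified tearDown pattern.
class MyComponentTest : public executor::ThreadPoolExecutorTest {
protected:
    void setUp() override {
        executor::ThreadPoolExecutorTest::setUp();
        launchExecutorThread();  // start the executor under test
    }

    void tearDown() override {
        // Stop accepting new work and wait for outstanding callbacks to finish.
        // No call to the base-class tearDown() is required anymore.
        getExecutor().shutdown();
        getExecutor().join();
        _component.reset();  // safe: all callbacks have run by this point
    }

    std::unique_ptr<MyComponent> _component;  // hypothetical component under test
};
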
diff --git a/src/mongo/client/fetcher_test.cpp b/src/mongo/client/fetcher_test.cpp
index 00dae071d3a..4e17d55b346 100644
--- a/src/mongo/client/fetcher_test.cpp
+++ b/src/mongo/client/fetcher_test.cpp
@@ -117,7 +117,8 @@ void FetcherTest::setUp() {
}
void FetcherTest::tearDown() {
- executor::ThreadPoolExecutorTest::tearDown();
+ getExecutor().shutdown();
+ getExecutor().join();
// Executor may still invoke fetcher's callback before shutting down.
fetcher.reset();
}
diff --git a/src/mongo/client/remote_command_retry_scheduler_test.cpp b/src/mongo/client/remote_command_retry_scheduler_test.cpp
index 705324f94c5..d040d72ba19 100644
--- a/src/mongo/client/remote_command_retry_scheduler_test.cpp
+++ b/src/mongo/client/remote_command_retry_scheduler_test.cpp
@@ -63,7 +63,6 @@ public:
protected:
void setUp() override;
- void tearDown() override;
};
class CallbackResponseSaver {
@@ -152,10 +151,6 @@ void RemoteCommandRetrySchedulerTest::setUp() {
launchExecutorThread();
}
-void RemoteCommandRetrySchedulerTest::tearDown() {
- executor::ThreadPoolExecutorTest::tearDown();
-}
-
CallbackResponseSaver::CallbackResponseSaver() = default;
void CallbackResponseSaver::operator()(
diff --git a/src/mongo/db/repl/abstract_async_component_test.cpp b/src/mongo/db/repl/abstract_async_component_test.cpp
index 5c69ca67d76..9d82755bafd 100644
--- a/src/mongo/db/repl/abstract_async_component_test.cpp
+++ b/src/mongo/db/repl/abstract_async_component_test.cpp
@@ -177,8 +177,6 @@ void AbstractAsyncComponentTest::setUp() {
void AbstractAsyncComponentTest::tearDown() {
shutdownExecutorThread();
joinExecutorThread();
-
- executor::ThreadPoolExecutorTest::tearDown();
}
TEST_F(AbstractAsyncComponentTest, ConstructorThrowsUserAssertionOnNullTaskExecutor) {
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
index 98c1bd08957..ab818341712 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp
@@ -92,10 +92,6 @@ void AbstractOplogFetcherTest::setUp() {
lastFetched = {456LL, {{123, 0}, 1}};
}
-void AbstractOplogFetcherTest::tearDown() {
- executor::ThreadPoolExecutorTest::tearDown();
-}
-
executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse(
executor::RemoteCommandResponse response, bool expectReadyRequestsAfterProcessing) {
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
index d33ff6174b1..2e238000e70 100644
--- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
+++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h
@@ -82,7 +82,6 @@ public:
protected:
void setUp() override;
- void tearDown() override;
/**
* Schedules network response and instructs network interface to process response.
diff --git a/src/mongo/db/repl/base_cloner_test_fixture.cpp b/src/mongo/db/repl/base_cloner_test_fixture.cpp
index 89676a8a7f6..e1a8131fb10 100644
--- a/src/mongo/db/repl/base_cloner_test_fixture.cpp
+++ b/src/mongo/db/repl/base_cloner_test_fixture.cpp
@@ -111,14 +111,12 @@ void BaseClonerTest::setUp() {
}
void BaseClonerTest::tearDown() {
- executor::ThreadPoolExecutorTest::shutdownExecutorThread();
- executor::ThreadPoolExecutorTest::joinExecutorThread();
+ getExecutor().shutdown();
+ getExecutor().join();
storageInterface.reset();
dbWorkThreadPool->join();
dbWorkThreadPool.reset();
-
- executor::ThreadPoolExecutorTest::tearDown();
}
void BaseClonerTest::clear() {
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index f6a50786e47..36020cea9a8 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -186,12 +186,9 @@ protected:
}
void tearDown() override {
- executor::ThreadPoolExecutorTest::shutdownExecutorThread();
- executor::ThreadPoolExecutorTest::joinExecutorThread();
-
+ getExecutor().shutdown();
+ getExecutor().join();
_dbWorkThreadPool.join();
-
- executor::ThreadPoolExecutorTest::tearDown();
}
/**
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index c549bfbe391..1a55a5d83dc 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -360,8 +360,8 @@ protected:
if (_executorThreadShutdownComplete) {
return;
}
- executor::ThreadPoolExecutorTest::shutdownExecutorThread();
- executor::ThreadPoolExecutorTest::joinExecutorThread();
+ getExecutor().shutdown();
+ getExecutor().join();
_executorThreadShutdownComplete = true;
}
@@ -372,9 +372,6 @@ protected:
_dbWorkThreadPool.reset();
_replicationProcess.reset();
_storageInterface.reset();
-
- // tearDown() destroys the task executor which was referenced by the initial syncer.
- executor::ThreadPoolExecutorTest::tearDown();
}
/**
diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp
index 45f387ecd0c..a031064130b 100644
--- a/src/mongo/db/repl/multiapplier_test.cpp
+++ b/src/mongo/db/repl/multiapplier_test.cpp
@@ -45,7 +45,6 @@ public:
private:
executor::ThreadPoolMock::Options makeThreadPoolMockOptions() const override;
void setUp() override;
- void tearDown() override;
};
executor::ThreadPoolMock::Options MultiApplierTest::makeThreadPoolMockOptions() const {
@@ -60,15 +59,6 @@ void MultiApplierTest::setUp() {
launchExecutorThread();
}
-void MultiApplierTest::tearDown() {
- executor::ThreadPoolExecutorTest::shutdownExecutorThread();
- executor::ThreadPoolExecutorTest::joinExecutorThread();
-
- // Local tear down steps here.
-
- executor::ThreadPoolExecutorTest::tearDown();
-}
-
Status applyOperation(MultiApplier::OperationPtrs*) {
return Status::OK();
};
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index f4d91c6baed..01e85bc355e 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -54,7 +54,6 @@ using NetworkGuard = executor::NetworkInterfaceMock::InNetworkGuard;
class OplogFetcherTest : public AbstractOplogFetcherTest {
protected:
void setUp() override;
- void tearDown() override;
/**
* Starts an oplog fetcher. Processes a single batch of results from
@@ -117,10 +116,6 @@ void OplogFetcherTest::setUp() {
};
}
-void OplogFetcherTest::tearDown() {
- AbstractOplogFetcherTest::tearDown();
-}
-
BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
int rbid,
int primaryIndex,
diff --git a/src/mongo/db/repl/reporter_test.cpp b/src/mongo/db/repl/reporter_test.cpp
index 383b2cf8aca..f0261d64f5f 100644
--- a/src/mongo/db/repl/reporter_test.cpp
+++ b/src/mongo/db/repl/reporter_test.cpp
@@ -182,7 +182,8 @@ void ReporterTest::setUp() {
}
void ReporterTest::tearDown() {
- executor::ThreadPoolExecutorTest::tearDown();
+ getExecutor().shutdown();
+ getExecutor().join();
// Executor may still invoke reporter's callback before shutting down.
reporter.reset();
posUpdater.reset();
diff --git a/src/mongo/db/repl/rollback_common_point_resolver_test.cpp b/src/mongo/db/repl/rollback_common_point_resolver_test.cpp
index 4ed949ae1ce..d537e200362 100644
--- a/src/mongo/db/repl/rollback_common_point_resolver_test.cpp
+++ b/src/mongo/db/repl/rollback_common_point_resolver_test.cpp
@@ -99,9 +99,6 @@ private:
class RollbackCommonPointResolverTest : public AbstractOplogFetcherTest {
protected:
- void setUp() override;
- void tearDown() override;
-
/**
* Starts a rollback common point resolver. Processes a single batch of results from
* the oplog query and shuts down.
@@ -152,16 +149,6 @@ protected:
std::unique_ptr<ShutdownState> shutdownState;
};
-void RollbackCommonPointResolverTest::setUp() {
- AbstractOplogFetcherTest::setUp();
-}
-
-void RollbackCommonPointResolverTest::tearDown() {
- AbstractOplogFetcherTest::tearDown();
- resolver.reset();
- shutdownState.reset();
-}
-
HostAndPort source("localhost:12345");
NamespaceString nss("local.oplog.rs");
diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp
index 791cbf0e2e7..cd77a41ba95 100644
--- a/src/mongo/db/repl/rollback_test_fixture.cpp
+++ b/src/mongo/db/repl/rollback_test_fixture.cpp
@@ -90,7 +90,6 @@ void RollbackTest::setUp() {
void RollbackTest::tearDown() {
_coordinator = nullptr;
_opCtx.reset();
- _threadPoolExecutorTest.tearDown();
// We cannot unset the global replication coordinator because ServiceContextMongoD::tearDown()
// calls dropAllDatabasesExceptLocal() which requires the replication coordinator to clear all
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index da410b6082a..48bdf70f13c 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -108,14 +108,12 @@ void SyncSourceResolverTest::setUp() {
}
void SyncSourceResolverTest::tearDown() {
- executor::ThreadPoolExecutorTest::shutdownExecutorThread();
- executor::ThreadPoolExecutorTest::joinExecutorThread();
+ getExecutor().shutdown();
+ getExecutor().join();
_resolver.reset();
_selector.reset();
_executorProxy.reset();
-
- executor::ThreadPoolExecutorTest::tearDown();
}
std::unique_ptr<SyncSourceResolver> SyncSourceResolverTest::_makeResolver(
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 81198e74ee2..bf45981490b 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -196,6 +196,10 @@ bool NetworkInterfaceMock::onNetworkThread() {
void NetworkInterfaceMock::startup() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _startup_inlock();
+}
+
+void NetworkInterfaceMock::_startup_inlock() {
invariant(!_hasStarted);
_hasStarted = true;
_inShutdown.store(false);
@@ -207,7 +211,9 @@ void NetworkInterfaceMock::shutdown() {
invariant(!inShutdown());
stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_hasStarted);
+ if (!_hasStarted) {
+ _startup_inlock();
+ }
_inShutdown.store(true);
NetworkOperationList todo;
todo.splice(todo.end(), _scheduled);
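
With the change above, NetworkInterfaceMock::shutdown() no longer requires a prior startup(): if the mock has not been started, it runs the startup path under the same lock before shutting down. A short sketch of the call sequence this now permits, assuming a default-constructed mock:

// Sketch only: shutting down a mock that was never explicitly started.
executor::NetworkInterfaceMock net;
// No net.startup() call here.
net.shutdown();  // lazily performs _startup_inlock(), then proceeds with shutdown
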
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 91b04cb01bd..edc40326d26 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -288,6 +288,11 @@ private:
enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 };
/**
+ * Implementation of startup behavior.
+ */
+ void _startup_inlock();
+
+ /**
* Returns information about the state of this mock for diagnostic purposes.
*/
std::string _getDiagnosticString_inlock() const;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 0c7a676a358..22146268788 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -52,7 +52,7 @@ namespace executor {
struct ConnectionPoolStats;
/**
- * Generic event loop with notions of events and callbacks.
+ * Executor with notions of events and callbacks.
*
* Callbacks represent work to be performed by the executor.
* They may be scheduled by client threads or by other callbacks. Methods that
@@ -68,9 +68,6 @@ struct ConnectionPoolStats;
*
* If an event is unsignaled when shutdown is called, the executor will ensure that any threads
* blocked in waitForEvent() eventually return.
- *
- * Logically, Callbacks and Events exist for the life of the executor. That means that while
- * the executor is in scope, no CallbackHandle or EventHandle is stale.
*/
class TaskExecutor {
MONGO_DISALLOW_COPYING(TaskExecutor);
@@ -105,6 +102,10 @@ public:
*/
using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>;
+ /**
+ * Destroys the task executor. Implicitly performs the equivalent of shutdown() and join()
+ * before returning, if necessary.
+ */
virtual ~TaskExecutor();
/**
@@ -115,21 +116,21 @@ public:
virtual void startup() = 0;
/**
- * Signals to the executor that it should shut down. This method should not block. After
- * calling shutdown, it is illegal to schedule more tasks on the executor and join should be
- * called to wait for shutdown to complete.
+ * Signals to the executor that it should shut down. This method may be called from within a
+ * callback. As such, this method must not block. After shutdown returns, attempts to schedule
+ * more tasks on the executor will return errors.
*
- * It is legal to call this method multiple times, but it should only be called after startup
- * has been called.
+ * It is legal to call this method multiple times. If the task executor goes out of scope
+ * before this method is called, the destructor performs this activity.
*/
virtual void shutdown() = 0;
/**
- * Waits for the shutdown sequence initiated by an earlier call to shutdown to complete. It is
- * only legal to call this method if startup has been called earlier.
+ * Waits for the shutdown sequence initiated by a call to shutdown() to complete. Must not be
+ * called from within a callback.
*
- * If startup is ever called, the code must ensure that join is eventually called once and only
- * once.
+ * Unlike stdx::thread::join, this method may be called from any thread that wishes to wait for
+ * shutdown to complete.
*/
virtual void join() = 0;
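
The revised contract relaxes the lifecycle: shutdown() may be called from within a callback and more than once, join() may be called by any thread, and the destructor performs both if the caller never did. A hedged sketch of code relying on those guarantees; makeExecutor() is a hypothetical factory returning a concrete TaskExecutor, and error handling is omitted.

// Sketch only: exercises the relaxed lifecycle documented above.
std::unique_ptr<executor::TaskExecutor> exec = makeExecutor();  // hypothetical factory
exec->startup();
auto swHandle = exec->scheduleWork([](const executor::TaskExecutor::CallbackArgs& args) {
    // shutdown() must not block, so requesting it from inside a callback is legal.
    args.executor->shutdown();
});
// swHandle is a StatusWith<CallbackHandle>; ignored in this sketch.
exec->shutdown();  // idempotent: a second call is a no-op
exec->join();      // any thread that wants to wait for shutdown may call this
// Letting exec go out of scope without shutdown()/join() would also be fine:
// the destructor performs the equivalent of both.
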
diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp
index a97c3559e7e..f58657bacbb 100644
--- a/src/mongo/executor/task_executor_test_fixture.cpp
+++ b/src/mongo/executor/task_executor_test_fixture.cpp
@@ -63,42 +63,20 @@ void TaskExecutorTest::setUp() {
auto net = stdx::make_unique<NetworkInterfaceMock>();
_net = net.get();
_executor = makeTaskExecutor(std::move(net));
- _executorState = LifecycleState::kPreStart;
-}
-
-void TaskExecutorTest::tearDown() {
- if (_executorState == LifecycleState::kRunning) {
- shutdownExecutorThread();
- }
- if (_executorState == LifecycleState::kJoinRequired) {
- joinExecutorThread();
- }
- invariant(_executorState == LifecycleState::kPreStart ||
- _executorState == LifecycleState::kShutdownComplete);
- _executor.reset();
}
void TaskExecutorTest::launchExecutorThread() {
- invariant(_executorState == LifecycleState::kPreStart);
_executor->startup();
- _executorState = LifecycleState::kRunning;
postExecutorThreadLaunch();
}
void TaskExecutorTest::shutdownExecutorThread() {
- invariant(_executorState == LifecycleState::kRunning);
_executor->shutdown();
- _executorState = LifecycleState::kJoinRequired;
}
void TaskExecutorTest::joinExecutorThread() {
- // Tests may call shutdown() directly, bypassing the state change in shutdownExecutorThread().
- invariant(_executorState == LifecycleState::kRunning ||
- _executorState == LifecycleState::kJoinRequired);
_net->exitNetwork();
- _executorState = LifecycleState::kJoining;
_executor->join();
- _executorState = LifecycleState::kShutdownComplete;
}
void TaskExecutorTest::_doTest() {
diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h
index e90cbaec413..ff9904125d4 100644
--- a/src/mongo/executor/task_executor_test_fixture.h
+++ b/src/mongo/executor/task_executor_test_fixture.h
@@ -73,13 +73,6 @@ public:
*/
void setUp() override;
- /**
- * Destroys the replication executor.
- *
- * Shuts down and joins the running executor.
- */
- void tearDown() override;
-
void launchExecutorThread();
void shutdownExecutorThread();
void joinExecutorThread();
@@ -102,12 +95,6 @@ private:
NetworkInterfaceMock* _net;
std::unique_ptr<TaskExecutor> _executor;
-
- /**
- * kPreStart -> kRunning -> kJoinRequired -> kJoining -> kShutdownComplete
- */
- enum LifecycleState { kPreStart, kRunning, kJoinRequired, kJoining, kShutdownComplete };
- LifecycleState _executorState = LifecycleState::kPreStart;
};
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index b3e1b84a836..e09c28e2d5a 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -45,6 +45,7 @@ ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_inShutdown = true;
_net->signalWorkAvailable();
+ _net->exitNetwork();
if (_started) {
if (_worker.joinable()) {
lk.unlock();
@@ -82,6 +83,7 @@ void ThreadPoolMock::join() {
if (_started) {
stdx::thread toJoin = std::move(_worker);
_net->signalWorkAvailable();
+ _net->exitNetwork();
lk.unlock();
toJoin.join();
lk.lock();
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 7d111ae6f95..dbcf465f8d8 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -127,20 +127,31 @@ ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterfa
std::unique_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
-ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {}
+ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
+ shutdown();
+ auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex));
+ invariant(_state == shutdownComplete);
+}
void ThreadPoolTaskExecutor::startup() {
_net->startup();
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return;
}
+ invariant(_state == preStart);
+ _setState_inlock(running);
_pool->startup();
}
void ThreadPoolTaskExecutor::shutdown() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
+ if (_inShutdown_inlock()) {
+ invariant(_networkInProgressQueue.empty());
+ invariant(_sleepersQueue.empty());
+ return;
+ }
+ _setState_inlock(joinRequired);
WorkQueue pending;
pending.splice(pending.end(), _networkInProgressQueue);
pending.splice(pending.end(), _sleepersQueue);
@@ -158,21 +169,45 @@ void ThreadPoolTaskExecutor::shutdown() {
}
void ThreadPoolTaskExecutor::join() {
- _pool->join();
- {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_unsignaledEvents.empty()) {
- auto eventState = _unsignaledEvents.front();
- invariant(eventState->waiters.empty());
- EventHandle event;
- setEventForHandle(&event, std::move(eventState));
- signalEvent_inlock(event, std::move(lk));
- lk = stdx::unique_lock<stdx::mutex>(_mutex);
+ _join(stdx::unique_lock<stdx::mutex>(_mutex));
+}
+
+stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) {
+ _stateChange.wait(lk, [this] {
+ switch (_state) {
+ case preStart:
+ return false;
+ case running:
+ return false;
+ case joinRequired:
+ return true;
+ case joining:
+ return false;
+ case shutdownComplete:
+ return true;
}
+ MONGO_UNREACHABLE;
+ });
+ if (_state == shutdownComplete) {
+ return lk;
}
+ invariant(_state == joinRequired);
+ _setState_inlock(joining);
+ lk.unlock();
+ _pool->join();
+ lk.lock();
+ while (!_unsignaledEvents.empty()) {
+ auto eventState = _unsignaledEvents.front();
+ invariant(eventState->waiters.empty());
+ EventHandle event;
+ setEventForHandle(&event, std::move(eventState));
+ signalEvent_inlock(event, std::move(lk));
+ lk = stdx::unique_lock<stdx::mutex>(_mutex);
+ }
+ lk.unlock();
_net->shutdown();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ lk.lock();
// The _poolInProgressQueue may not be empty if the network interface attempted to schedule work
// into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any
// items left in _poolInProgressQueue will never be processed by another thread, so we process
@@ -186,6 +221,8 @@ void ThreadPoolTaskExecutor::join() {
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
invariant(_unsignaledEvents.empty());
+ _setState_inlock(shutdownComplete);
+ return lk;
}
void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const {
@@ -204,7 +241,7 @@ void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const {
queues.done();
b->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size());
- b->append("shuttingDown", _inShutdown);
+ b->append("shuttingDown", _inShutdown_inlock());
b->append("networkInterface", _net->getDiagnosticString());
}
@@ -217,7 +254,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
EventHandle event;
setEventForHandle(&event, el.front());
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
_unsignaledEvents.splice(_unsignaledEvents.end(), el);
@@ -355,7 +392,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
remoteCommandFinished(cbData, cb, scheduledRequest, response);
};
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return;
}
LOG(3) << "Received remote response: "
@@ -370,7 +407,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return;
}
cbState->canceled.store(1);
@@ -415,7 +452,7 @@ void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) c
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
WorkQueue* queue, WorkQueue* wq) {
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
invariant(!wq->empty());
@@ -515,6 +552,18 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
}
}
+bool ThreadPoolTaskExecutor::_inShutdown_inlock() const {
+ return _state >= joinRequired;
+}
+
+void ThreadPoolTaskExecutor::_setState_inlock(State newState) {
+ if (newState == _state) {
+ return;
+ }
+ _state = newState;
+ _stateChange.notify_all();
+}
+
void ThreadPoolTaskExecutor::dropConnections(const HostAndPort& hostAndPort) {
_net->dropConnections(hostAndPort);
}
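
One subtlety in _join() above is the wait predicate: a caller proceeds only when the state is joinRequired (it becomes the thread that performs the join) or shutdownComplete (nothing left to do), while a caller that arrives during joining keeps waiting until the first joiner finishes. A stripped-down sketch of that idiom, with illustrative names rather than the executor's actual members:

// Sketch only: the wait-for-state idiom used by _join(), reduced to its core.
#include <condition_variable>
#include <mutex>

enum State { preStart, running, joinRequired, joining, shutdownComplete };

std::mutex mtx;
std::condition_variable stateChange;
State state = preStart;

void waitUntilJoinableOrDone() {
    std::unique_lock<std::mutex> lk(mtx);
    // Wake only when this thread may act: either it should perform the join
    // (joinRequired) or shutdown has already finished (shutdownComplete).
    // A thread arriving while another is in 'joining' keeps waiting.
    stateChange.wait(lk, [&] { return state == joinRequired || state == shutdownComplete; });
}
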
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 9d6a6d940b9..4a02e3bb73a 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -95,6 +95,25 @@ private:
using EventList = stdx::list<std::shared_ptr<EventState>>;
/**
+ * Representation of the stage of life of a thread pool.
+ *
+ * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work
+ * may only be scheduled in the preStart and running states. Threads may only be started in the
+ * running state. In shutdownComplete, there are no remaining threads or pending tasks to
+ * execute.
+ *
+ * Diagram of legal transitions:
+ *
+ * preStart -> running -> joinRequired -> joining -> shutdownComplete
+ * \ ^
+ * \_____________/
+ *
+ * NOTE: The enumeration values below are compared using operator<, etc, with the expectation
+ * that a -> b in the diagram above implies that a < b in the enum below.
+ */
+ enum State { preStart, running, joinRequired, joining, shutdownComplete };
+
+ /**
* Returns an EventList containing one unsignaled EventState. This is a helper function for
* performing allocations outside of _mutex, and should only be called by makeSingletonWork and
* makeEvent().
@@ -147,6 +166,10 @@ private:
*/
void runCallback(std::shared_ptr<CallbackState> cbState);
+ bool _inShutdown_inlock() const;
+ void _setState_inlock(State newState);
+ stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
+
// The network interface used for remote command execution and waiting.
std::unique_ptr<NetworkInterface> _net;
@@ -168,8 +191,9 @@ private:
// List of all events that have yet to be signaled.
EventList _unsignaledEvents;
- // Indicates whether or not the executor is shutting down.
- bool _inShutdown = false;
+ // Lifecycle state of this executor.
+ stdx::condition_variable _stateChange;
+ State _state = preStart;
};
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
index db0cc594f42..2c4d29d8436 100644
--- a/src/mongo/executor/thread_pool_task_executor_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -75,8 +75,6 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) {
executor.wait(cb1);
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1);
ASSERT_EQUALS(startTime + Milliseconds(200), net->now());
- executor.shutdown();
- joinExecutorThread();
}
bool sharedCallbackStateDestroyed = false;
@@ -120,9 +118,6 @@ TEST_F(ThreadPoolExecutorTest,
// to ThreadPoolTaskExecutor::CallbackState).
ASSERT_TRUE(callbackInvoked);
ASSERT_TRUE(sharedCallbackStateDestroyed);
-
- executor.shutdown();
- joinExecutorThread();
}
TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
@@ -162,7 +157,7 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
barrier.countDownAndWait();
MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1));
executor.shutdown();
- joinExecutorThread();
+ executor.join();
ASSERT_OK(status1);
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2);
}