author      Andy Schwerin <schwerin@mongodb.com>    2017-06-09 14:06:26 -0400
committer   Andy Schwerin <schwerin@mongodb.com>    2017-06-09 14:06:26 -0400
commit      f9c36696a25d3837f512421755952736236bbed0 (patch)
tree        9becbcdb1ade14d930d3b2da3c731ebfe20f8a75 /src/mongo/executor
parent      db55e668d87d66b171c310241bdbcabfa790e2cf (diff)
download    mongo-f9c36696a25d3837f512421755952736236bbed0.tar.gz
SERVER-29493 Make ThreadPoolTaskExecutor's destructor execute shutdown and join.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--  src/mongo/executor/network_interface_mock.cpp         |  8
-rw-r--r--  src/mongo/executor/network_interface_mock.h           |  5
-rw-r--r--  src/mongo/executor/task_executor.h                    | 27
-rw-r--r--  src/mongo/executor/task_executor_test_fixture.cpp     | 22
-rw-r--r--  src/mongo/executor/task_executor_test_fixture.h       | 13
-rw-r--r--  src/mongo/executor/thread_pool_mock.cpp               |  2
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.cpp      | 87
-rw-r--r--  src/mongo/executor/thread_pool_task_executor.h        | 28
-rw-r--r--  src/mongo/executor/thread_pool_task_executor_test.cpp |  7
9 files changed, 123 insertions, 76 deletions
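
The caller-visible effect of SERVER-29493 is that a ThreadPoolTaskExecutor no longer requires an explicit shutdown()/join() pair before destruction. A minimal sketch of that contract, using an illustrative stand-in type rather than the real class:

    // Hypothetical stand-in for the real executor, only to illustrate the
    // lifecycle this commit adopts: teardown is now implicit in destruction.
    struct TaskExecutorLike {
        void startup() {}
        void shutdown() {}  // may now be called multiple times, even before startup()
        void join() {}      // waits for shutdown to complete
        ~TaskExecutorLike() {
            shutdown();     // the commit's key change: the destructor runs both steps
            join();
        }
    };

    int main() {
        {
            TaskExecutorLike executor;
            executor.startup();
            // ... schedule work ...
        }  // no explicit shutdown()/join(): the destructor performs them
    }
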
diff --git a/src/mongo/executor/network_interface_mock.cpp b/src/mongo/executor/network_interface_mock.cpp
index 81198e74ee2..bf45981490b 100644
--- a/src/mongo/executor/network_interface_mock.cpp
+++ b/src/mongo/executor/network_interface_mock.cpp
@@ -196,6 +196,10 @@ bool NetworkInterfaceMock::onNetworkThread() {
void NetworkInterfaceMock::startup() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _startup_inlock();
+}
+
+void NetworkInterfaceMock::_startup_inlock() {
invariant(!_hasStarted);
_hasStarted = true;
_inShutdown.store(false);
@@ -207,7 +211,9 @@ void NetworkInterfaceMock::shutdown() {
invariant(!inShutdown());
stdx::unique_lock<stdx::mutex> lk(_mutex);
- invariant(_hasStarted);
+ if (!_hasStarted) {
+ _startup_inlock();
+ }
_inShutdown.store(true);
NetworkOperationList todo;
todo.splice(todo.end(), _scheduled);
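
With the destructor now driving shutdown, a NetworkInterfaceMock may be shut down without ever having been started, so shutdown() lazily runs the startup steps itself. A small sketch of that lazy-start pattern, assuming simplified members (std::mutex in place of stdx::mutex, and only the flags needed to show the idea):

    #include <cassert>
    #include <mutex>

    class MockNet {
    public:
        void startup() {
            std::lock_guard<std::mutex> lk(_mutex);
            _startupInlock();
        }

        void shutdown() {
            std::lock_guard<std::mutex> lk(_mutex);
            // Previously this asserted _hasStarted; now a never-started mock
            // is tolerated by running the startup steps first.
            if (!_hasStarted)
                _startupInlock();
            _inShutdown = true;
            // ... drain scheduled operations ...
        }

    private:
        // Shared implementation; the caller must hold _mutex
        // (the "_inlock" naming convention in the real code).
        void _startupInlock() {
            assert(!_hasStarted);
            _hasStarted = true;
            _inShutdown = false;
        }

        std::mutex _mutex;
        bool _hasStarted = false;
        bool _inShutdown = false;
    };
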
diff --git a/src/mongo/executor/network_interface_mock.h b/src/mongo/executor/network_interface_mock.h
index 91b04cb01bd..edc40326d26 100644
--- a/src/mongo/executor/network_interface_mock.h
+++ b/src/mongo/executor/network_interface_mock.h
@@ -288,6 +288,11 @@ private:
enum ThreadType { kNoThread = 0, kExecutorThread = 1, kNetworkThread = 2 };
/**
+ * Implementation of startup behavior.
+ */
+ void _startup_inlock();
+
+ /**
* Returns information about the state of this mock for diagnostic purposes.
*/
std::string _getDiagnosticString_inlock() const;
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index 0c7a676a358..22146268788 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -52,7 +52,7 @@ namespace executor {
struct ConnectionPoolStats;
/**
- * Generic event loop with notions of events and callbacks.
+ * Executor with notions of events and callbacks.
*
* Callbacks represent work to be performed by the executor.
* They may be scheduled by client threads or by other callbacks. Methods that
@@ -68,9 +68,6 @@ struct ConnectionPoolStats;
*
* If an event is unsignaled when shutdown is called, the executor will ensure that any threads
* blocked in waitForEvent() eventually return.
- *
- * Logically, Callbacks and Events exist for the life of the executor. That means that while
- * the executor is in scope, no CallbackHandle or EventHandle is stale.
*/
class TaskExecutor {
MONGO_DISALLOW_COPYING(TaskExecutor);
@@ -105,6 +102,10 @@ public:
*/
using RemoteCommandCallbackFn = stdx::function<void(const RemoteCommandCallbackArgs&)>;
+ /**
+ * Destroys the task executor. Implicitly performs the equivalent of shutdown() and join()
+ * before returning, if necessary.
+ */
virtual ~TaskExecutor();
/**
@@ -115,21 +116,21 @@ public:
virtual void startup() = 0;
/**
- * Signals to the executor that it should shut down. This method should not block. After
- * calling shutdown, it is illegal to schedule more tasks on the executor and join should be
- * called to wait for shutdown to complete.
+ * Signals to the executor that it should shut down. This method may be called from within a
+ * callback. As such, this method must not block. After shutdown returns, attempts to schedule
+ * more tasks on the executor will return errors.
*
- * It is legal to call this method multiple times, but it should only be called after startup
- * has been called.
+ * It is legal to call this method multiple times. If the task executor goes out of scope
+ * before this method is called, the destructor performs this activity.
*/
virtual void shutdown() = 0;
/**
- * Waits for the shutdown sequence initiated by an earlier call to shutdown to complete. It is
- * only legal to call this method if startup has been called earlier.
+ * Waits for the shutdown sequence initiated by a call to shutdown() to complete. Must not be
+ * called from within a callback.
*
- * If startup is ever called, the code must ensure that join is eventually called once and only
- * once.
+ * Unlike stdx::thread::join, this method may be called from any thread that wishes to wait for
+ * shutdown to complete.
*/
virtual void join() = 0;
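
The revised contract also states that join(), unlike stdx::thread::join, may be called from any thread and more than once. A sketch of how such a join can be built on a condition variable, simplified to a single completion flag (the real executor tracks a fuller state machine):

    #include <condition_variable>
    #include <mutex>
    #include <thread>

    class Joinable {
    public:
        void shutdown() {
            std::lock_guard<std::mutex> lk(_mutex);
            _done = true;
            _cv.notify_all();    // wake every thread blocked in join()
        }

        void join() {
            std::unique_lock<std::mutex> lk(_mutex);
            _cv.wait(lk, [this] { return _done; });
        }

    private:
        std::mutex _mutex;
        std::condition_variable _cv;
        bool _done = false;
    };

    int main() {
        Joinable j;
        std::thread a([&] { j.join(); });   // several threads may all wait...
        std::thread b([&] { j.join(); });
        j.shutdown();                        // ...until shutdown completes
        a.join();
        b.join();
        j.join();                            // calling join() again is also fine
    }
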
diff --git a/src/mongo/executor/task_executor_test_fixture.cpp b/src/mongo/executor/task_executor_test_fixture.cpp
index a97c3559e7e..f58657bacbb 100644
--- a/src/mongo/executor/task_executor_test_fixture.cpp
+++ b/src/mongo/executor/task_executor_test_fixture.cpp
@@ -63,42 +63,20 @@ void TaskExecutorTest::setUp() {
auto net = stdx::make_unique<NetworkInterfaceMock>();
_net = net.get();
_executor = makeTaskExecutor(std::move(net));
- _executorState = LifecycleState::kPreStart;
-}
-
-void TaskExecutorTest::tearDown() {
- if (_executorState == LifecycleState::kRunning) {
- shutdownExecutorThread();
- }
- if (_executorState == LifecycleState::kJoinRequired) {
- joinExecutorThread();
- }
- invariant(_executorState == LifecycleState::kPreStart ||
- _executorState == LifecycleState::kShutdownComplete);
- _executor.reset();
}
void TaskExecutorTest::launchExecutorThread() {
- invariant(_executorState == LifecycleState::kPreStart);
_executor->startup();
- _executorState = LifecycleState::kRunning;
postExecutorThreadLaunch();
}
void TaskExecutorTest::shutdownExecutorThread() {
- invariant(_executorState == LifecycleState::kRunning);
_executor->shutdown();
- _executorState = LifecycleState::kJoinRequired;
}
void TaskExecutorTest::joinExecutorThread() {
- // Tests may call shutdown() directly, bypassing the state change in shutdownExecutorThread().
- invariant(_executorState == LifecycleState::kRunning ||
- _executorState == LifecycleState::kJoinRequired);
_net->exitNetwork();
- _executorState = LifecycleState::kJoining;
_executor->join();
- _executorState = LifecycleState::kShutdownComplete;
}
void TaskExecutorTest::_doTest() {
diff --git a/src/mongo/executor/task_executor_test_fixture.h b/src/mongo/executor/task_executor_test_fixture.h
index e90cbaec413..ff9904125d4 100644
--- a/src/mongo/executor/task_executor_test_fixture.h
+++ b/src/mongo/executor/task_executor_test_fixture.h
@@ -73,13 +73,6 @@ public:
*/
void setUp() override;
- /**
- * Destroys the replication executor.
- *
- * Shuts down and joins the running executor.
- */
- void tearDown() override;
-
void launchExecutorThread();
void shutdownExecutorThread();
void joinExecutorThread();
@@ -102,12 +95,6 @@ private:
NetworkInterfaceMock* _net;
std::unique_ptr<TaskExecutor> _executor;
-
- /**
- * kPreStart -> kRunning -> kJoinRequired -> kJoining -> kShutdownComplete
- */
- enum LifecycleState { kPreStart, kRunning, kJoinRequired, kJoining, kShutdownComplete };
- LifecycleState _executorState = LifecycleState::kPreStart;
};
} // namespace executor
diff --git a/src/mongo/executor/thread_pool_mock.cpp b/src/mongo/executor/thread_pool_mock.cpp
index b3e1b84a836..e09c28e2d5a 100644
--- a/src/mongo/executor/thread_pool_mock.cpp
+++ b/src/mongo/executor/thread_pool_mock.cpp
@@ -45,6 +45,7 @@ ThreadPoolMock::~ThreadPoolMock() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_inShutdown = true;
_net->signalWorkAvailable();
+ _net->exitNetwork();
if (_started) {
if (_worker.joinable()) {
lk.unlock();
@@ -82,6 +83,7 @@ void ThreadPoolMock::join() {
if (_started) {
stdx::thread toJoin = std::move(_worker);
_net->signalWorkAvailable();
+ _net->exitNetwork();
lk.unlock();
toJoin.join();
lk.lock();
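
The added _net->exitNetwork() calls release a worker blocked inside the network mock before its thread is joined; without the wake-up, join could deadlock. A generic sketch of this wake-before-join pattern (the names here are illustrative, not the mock's API):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::mutex mu;
    std::condition_variable cv;
    std::atomic<bool> stop{false};

    void worker() {
        std::unique_lock<std::mutex> lk(mu);
        // Stands in for the worker blocked in the network mock.
        cv.wait(lk, [] { return stop.load(); });
    }

    int main() {
        std::thread t(worker);
        stop = true;        // analogous to setting _inShutdown
        cv.notify_all();    // analogous to signalWorkAvailable() + exitNetwork()
        t.join();           // safe: the worker has been released first
    }
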
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 7d111ae6f95..dbcf465f8d8 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -127,20 +127,31 @@ ThreadPoolTaskExecutor::ThreadPoolTaskExecutor(std::unique_ptr<ThreadPoolInterfa
std::unique_ptr<NetworkInterface> net)
: _net(std::move(net)), _pool(std::move(pool)) {}
-ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {}
+ThreadPoolTaskExecutor::~ThreadPoolTaskExecutor() {
+ shutdown();
+ auto lk = _join(stdx::unique_lock<stdx::mutex>(_mutex));
+ invariant(_state == shutdownComplete);
+}
void ThreadPoolTaskExecutor::startup() {
_net->startup();
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return;
}
+ invariant(_state == preStart);
+ _setState_inlock(running);
_pool->startup();
}
void ThreadPoolTaskExecutor::shutdown() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _inShutdown = true;
+ if (_inShutdown_inlock()) {
+ invariant(_networkInProgressQueue.empty());
+ invariant(_sleepersQueue.empty());
+ return;
+ }
+ _setState_inlock(joinRequired);
WorkQueue pending;
pending.splice(pending.end(), _networkInProgressQueue);
pending.splice(pending.end(), _sleepersQueue);
@@ -158,21 +169,45 @@ void ThreadPoolTaskExecutor::shutdown() {
}
void ThreadPoolTaskExecutor::join() {
- _pool->join();
- {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (!_unsignaledEvents.empty()) {
- auto eventState = _unsignaledEvents.front();
- invariant(eventState->waiters.empty());
- EventHandle event;
- setEventForHandle(&event, std::move(eventState));
- signalEvent_inlock(event, std::move(lk));
- lk = stdx::unique_lock<stdx::mutex>(_mutex);
+ _join(stdx::unique_lock<stdx::mutex>(_mutex));
+}
+
+stdx::unique_lock<stdx::mutex> ThreadPoolTaskExecutor::_join(stdx::unique_lock<stdx::mutex> lk) {
+ _stateChange.wait(lk, [this] {
+ switch (_state) {
+ case preStart:
+ return false;
+ case running:
+ return false;
+ case joinRequired:
+ return true;
+ case joining:
+ return false;
+ case shutdownComplete:
+ return true;
}
+ MONGO_UNREACHABLE;
+ });
+ if (_state == shutdownComplete) {
+ return lk;
}
+ invariant(_state == joinRequired);
+ _setState_inlock(joining);
+ lk.unlock();
+ _pool->join();
+ lk.lock();
+ while (!_unsignaledEvents.empty()) {
+ auto eventState = _unsignaledEvents.front();
+ invariant(eventState->waiters.empty());
+ EventHandle event;
+ setEventForHandle(&event, std::move(eventState));
+ signalEvent_inlock(event, std::move(lk));
+ lk = stdx::unique_lock<stdx::mutex>(_mutex);
+ }
+ lk.unlock();
_net->shutdown();
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ lk.lock();
// The _poolInProgressQueue may not be empty if the network interface attempted to schedule work
// into _pool after _pool->shutdown(). Because _pool->join() has returned, we know that any
// items left in _poolInProgressQueue will never be processed by another thread, so we process
@@ -186,6 +221,8 @@ void ThreadPoolTaskExecutor::join() {
invariant(_networkInProgressQueue.empty());
invariant(_sleepersQueue.empty());
invariant(_unsignaledEvents.empty());
+ _setState_inlock(shutdownComplete);
+ return lk;
}
void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const {
@@ -204,7 +241,7 @@ void ThreadPoolTaskExecutor::appendDiagnosticBSON(BSONObjBuilder* b) const {
queues.done();
b->appendIntOrLL("unsignaledEvents", _unsignaledEvents.size());
- b->append("shuttingDown", _inShutdown);
+ b->append("shuttingDown", _inShutdown_inlock());
b->append("networkInterface", _net->getDiagnosticString());
}
@@ -217,7 +254,7 @@ StatusWith<TaskExecutor::EventHandle> ThreadPoolTaskExecutor::makeEvent() {
EventHandle event;
setEventForHandle(&event, el.front());
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
_unsignaledEvents.splice(_unsignaledEvents.end(), el);
@@ -355,7 +392,7 @@ StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::scheduleRemoteC
remoteCommandFinished(cbData, cb, scheduledRequest, response);
};
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return;
}
LOG(3) << "Received remote response: "
@@ -370,7 +407,7 @@ void ThreadPoolTaskExecutor::cancel(const CallbackHandle& cbHandle) {
invariant(cbHandle.isValid());
auto cbState = checked_cast<CallbackState*>(getCallbackFromHandle(cbHandle));
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return;
}
cbState->canceled.store(1);
@@ -415,7 +452,7 @@ void ThreadPoolTaskExecutor::appendConnectionStats(ConnectionPoolStats* stats) c
StatusWith<TaskExecutor::CallbackHandle> ThreadPoolTaskExecutor::enqueueCallbackState_inlock(
WorkQueue* queue, WorkQueue* wq) {
- if (_inShutdown) {
+ if (_inShutdown_inlock()) {
return {ErrorCodes::ShutdownInProgress, "Shutdown in progress"};
}
invariant(!wq->empty());
@@ -515,6 +552,18 @@ void ThreadPoolTaskExecutor::runCallback(std::shared_ptr<CallbackState> cbStateA
}
}
+bool ThreadPoolTaskExecutor::_inShutdown_inlock() const {
+ return _state >= joinRequired;
+}
+
+void ThreadPoolTaskExecutor::_setState_inlock(State newState) {
+ if (newState == _state) {
+ return;
+ }
+ _state = newState;
+ _stateChange.notify_all();
+}
+
void ThreadPoolTaskExecutor::dropConnections(const HostAndPort& hostAndPort) {
_net->dropConnections(hostAndPort);
}
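
The new _join() is the heart of the patch: it waits on _stateChange until shutdown has been requested, lets exactly one caller claim the joining state, and drops the mutex around the blocking pool join. A condensed standalone sketch of that control flow (std:: primitives in place of stdx::; the real method also drains unsignaled events and leftover queue items):

    #include <condition_variable>
    #include <mutex>

    enum State { preStart, running, joinRequired, joining, shutdownComplete };

    std::mutex mutex_;
    std::condition_variable stateChange;
    State state = preStart;

    void doBlockingPoolJoin() { /* stand-in for _pool->join() */ }

    // Condensed version of _join(): takes and returns the lock.
    std::unique_lock<std::mutex> joinImpl(std::unique_lock<std::mutex> lk) {
        // Block until shutdown has been requested (joinRequired) or a prior
        // join already finished (shutdownComplete). Threads that observe
        // `joining` keep waiting: exactly one caller performs the teardown.
        stateChange.wait(lk, [] {
            return state == joinRequired || state == shutdownComplete;
        });
        if (state == shutdownComplete)
            return lk;                    // someone else already joined
        state = joining;                  // claim the teardown work
        lk.unlock();
        doBlockingPoolJoin();             // must not hold the mutex here
        lk.lock();
        // ... the real code drains events and leftover queue items here ...
        state = shutdownComplete;
        stateChange.notify_all();         // release any concurrent join() calls
        return lk;
    }

Returning the lock is what lets ~ThreadPoolTaskExecutor keep holding it for its final invariant(_state == shutdownComplete) check, as seen in the destructor above.
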
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 9d6a6d940b9..4a02e3bb73a 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -95,6 +95,25 @@ private:
using EventList = stdx::list<std::shared_ptr<EventState>>;
/**
+ * Representation of the stage of life of a thread pool.
+ *
+ * A pool starts out in the preStart state, and ends life in the shutdownComplete state. Work
+ * may only be scheduled in the preStart and running states. Threads may only be started in the
+ * running state. In shutdownComplete, there are no remaining threads or pending tasks to
+ * execute.
+ *
+ * Diagram of legal transitions:
+ *
+ * preStart -> running -> joinRequired -> joining -> shutdownComplete
+ *      \                    ^
+ *       \_______________/
+ *
+ * NOTE: The enumeration values below are compared using operator<, etc, with the expectation
+ * that a -> b in the diagram above implies that a < b in the enum below.
+ */
+ enum State { preStart, running, joinRequired, joining, shutdownComplete };
+
+ /**
* Returns an EventList containing one unsignaled EventState. This is a helper function for
* performing allocations outside of _mutex, and should only be called by makeSingletonWork and
* makeEvent().
@@ -147,6 +166,10 @@ private:
*/
void runCallback(std::shared_ptr<CallbackState> cbState);
+ bool _inShutdown_inlock() const;
+ void _setState_inlock(State newState);
+ stdx::unique_lock<stdx::mutex> _join(stdx::unique_lock<stdx::mutex> lk);
+
// The network interface used for remote command execution and waiting.
std::unique_ptr<NetworkInterface> _net;
@@ -168,8 +191,9 @@ private:
// List of all events that have yet to be signaled.
EventList _unsignaledEvents;
- // Indicates whether or not the executor is shutting down.
- bool _inShutdown = false;
+ // Lifecycle state of this executor.
+ stdx::condition_variable _stateChange;
+ State _state = preStart;
};
} // namespace executor
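
Replacing the _inShutdown boolean with an ordered State enum lets the executor answer "are we shutting down?" with a single comparison, because every legal transition moves forward through the enumeration. A compile-checked sketch of that ordering trick:

    // Because legal transitions only move rightward through the enum,
    // "in shutdown" reduces to "at or past joinRequired".
    enum State { preStart, running, joinRequired, joining, shutdownComplete };

    constexpr bool inShutdown(State s) {
        return s >= joinRequired;
    }

    // The property the header comment promises: a -> b implies a < b.
    static_assert(preStart < running && running < joinRequired &&
                      joinRequired < joining && joining < shutdownComplete,
                  "transition order must match enumeration order");
    static_assert(!inShutdown(running) && inShutdown(joining), "sanity check");

    int main() {}
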
diff --git a/src/mongo/executor/thread_pool_task_executor_test.cpp b/src/mongo/executor/thread_pool_task_executor_test.cpp
index db0cc594f42..2c4d29d8436 100644
--- a/src/mongo/executor/thread_pool_task_executor_test.cpp
+++ b/src/mongo/executor/thread_pool_task_executor_test.cpp
@@ -75,8 +75,6 @@ TEST_F(ThreadPoolExecutorTest, TimelyCancelationOfScheduleWorkAt) {
executor.wait(cb1);
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status1);
ASSERT_EQUALS(startTime + Milliseconds(200), net->now());
- executor.shutdown();
- joinExecutorThread();
}
bool sharedCallbackStateDestroyed = false;
@@ -120,9 +118,6 @@ TEST_F(ThreadPoolExecutorTest,
// to ThreadPoolTaskExecutor::CallbackState).
ASSERT_TRUE(callbackInvoked);
ASSERT_TRUE(sharedCallbackStateDestroyed);
-
- executor.shutdown();
- joinExecutorThread();
}
TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
@@ -162,7 +157,7 @@ TEST_F(ThreadPoolExecutorTest, ShutdownAndScheduleRaceDoesNotCrash) {
barrier.countDownAndWait();
MONGO_FAIL_POINT_PAUSE_WHILE_SET((*fpTPTE1));
executor.shutdown();
- joinExecutorThread();
+ executor.join();
ASSERT_OK(status1);
ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status2);
}