summaryrefslogtreecommitdiff
path: root/src/mongo/executor
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2020-03-02 15:39:02 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-02 21:14:40 +0000
commit82833b418676787209c7ce9fcc658470c478070e (patch)
treeb9c831dbe1436e1d72a3432485626038fab685d8 /src/mongo/executor
parent83854e39d2904b5d4a6e4ee1f9b4fedaa4df85a5 (diff)
downloadmongo-82833b418676787209c7ce9fcc658470c478070e.tar.gz
SERVER-46395 Add joinAsync() method to ScopedTaskExecutor::Impl.
Introduces a joinAsync() method to the TaskExecutor interface to provide a non-blocking way for callers to learn when the task executor has been shut down and all outstanding callbacks have finished running.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r--src/mongo/executor/scoped_task_executor.cpp24
-rw-r--r--src/mongo/executor/scoped_task_executor.h2
-rw-r--r--src/mongo/executor/scoped_task_executor_test.cpp8
-rw-r--r--src/mongo/executor/task_executor.h6
-rw-r--r--src/mongo/executor/thread_pool_task_executor.cpp4
-rw-r--r--src/mongo/executor/thread_pool_task_executor.h1
6 files changed, 38 insertions, 7 deletions
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp
index 0a8fd531479..e4823682658 100644
--- a/src/mongo/executor/scoped_task_executor.cpp
+++ b/src/mongo/executor/scoped_task_executor.cpp
@@ -63,6 +63,12 @@ public:
void shutdown() override {
auto handles = [&] {
stdx::lock_guard lk(_mutex);
+ if (!_inShutdown && _cbHandles.empty()) {
+ // We are guaranteed that no more callbacks can be added to _cbHandles after
+ // _inShutdown is set to true. If there aren't any callbacks outstanding, then it is
+ // shutdown()'s responsibility to make the futures returned by joinAll() ready.
+ _promise.setWith([] {});
+ }
_inShutdown = true;
return _cbHandles;
@@ -81,8 +87,11 @@ public:
}
void join() override {
- stdx::unique_lock lk(_mutex);
- _cv.wait(lk, [&] { return _inShutdown && _cbHandles.empty(); });
+ joinAsync().wait();
+ }
+
+ SharedSemiFuture<void> joinAsync() override {
+ return _promise.getFuture();
}
void appendDiagnosticBSON(BSONObjBuilder* b) const override {
@@ -312,7 +321,10 @@ private:
invariant(_cbHandles.erase(id) == 1);
if (_inShutdown && _cbHandles.empty()) {
- _cv.notify_all();
+ // We are guaranteed that no more callbacks can be added to _cbHandles after _inShutdown
+ // is set to true. If there are no more callbacks outstanding, then it is the last
+ // callback's responsibility to make the futures returned by joinAll() ready.
+ _promise.setWith([] {});
}
}
@@ -322,9 +334,9 @@ private:
size_t _id = 0;
stdx::unordered_map<size_t, CallbackHandle> _cbHandles;
- // condition variable that callers of join wait on and outstanding callbacks potentially
- // notify
- stdx::condition_variable _cv;
+ // Promise that is set when the executor has been shut down and there aren't any outstanding
+ // callbacks. Callers of joinAsync() extract futures from this promise.
+ SharedPromise<void> _promise;
};
ScopedTaskExecutor::ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor)
diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h
index 5b624e10f64..2e72ff3873b 100644
--- a/src/mongo/executor/scoped_task_executor.h
+++ b/src/mongo/executor/scoped_task_executor.h
@@ -85,7 +85,7 @@ public:
explicit ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor);
// Delete all move/copy-ability
- ScopedTaskExecutor(TaskExecutor&&) = delete;
+ ScopedTaskExecutor(ScopedTaskExecutor&&) = delete;
~ScopedTaskExecutor();
diff --git a/src/mongo/executor/scoped_task_executor_test.cpp b/src/mongo/executor/scoped_task_executor_test.cpp
index 49c3842f392..ad45884cef4 100644
--- a/src/mongo/executor/scoped_task_executor_test.cpp
+++ b/src/mongo/executor/scoped_task_executor_test.cpp
@@ -251,10 +251,12 @@ TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithShutdown) {
ASSERT_FALSE(resultPf.future.isReady());
+ ASSERT_FALSE(getExecutor()->joinAsync().isReady());
getExecutor()->shutdown();
getNet()->exitNetwork();
ASSERT_EQUALS(resultPf.future.getNoThrow(), ErrorCodes::ShutdownInProgress);
+ ASSERT_TRUE(getExecutor()->joinAsync().isReady());
}
// ScheduleRemoteCommand on the underlying, but are shut down when we execute our wrapping callback
@@ -315,6 +317,12 @@ TEST_F(ScopedTaskExecutorTest, DestructionShutsDown) {
ASSERT_EQUALS(pf.future.getNoThrow(), ErrorCodes::ShutdownInProgress);
}
+TEST_F(ScopedTaskExecutorTest, joinAllBecomesReadyOnShutdown) {
+ ASSERT_FALSE(getExecutor()->joinAsync().isReady());
+ getExecutor()->shutdown();
+ ASSERT_TRUE(getExecutor()->joinAsync().isReady());
+}
+
} // namespace
} // namespace executor
} // namespace mongo
diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h
index d4c65e500b6..4f2dad79abe 100644
--- a/src/mongo/executor/task_executor.h
+++ b/src/mongo/executor/task_executor.h
@@ -142,6 +142,12 @@ public:
virtual void join() = 0;
/**
+ * Returns a future that becomes ready when shutdown() has been called and all outstanding
+ * callbacks have finished running.
+ */
+ virtual SharedSemiFuture<void> joinAsync() = 0;
+
+ /**
* Writes diagnostic information into "b".
*/
virtual void appendDiagnosticBSON(BSONObjBuilder* b) const = 0;
diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp
index 520a6b12e04..10a57699045 100644
--- a/src/mongo/executor/thread_pool_task_executor.cpp
+++ b/src/mongo/executor/thread_pool_task_executor.cpp
@@ -177,6 +177,10 @@ void ThreadPoolTaskExecutor::join() {
_join(stdx::unique_lock<Latch>(_mutex));
}
+SharedSemiFuture<void> ThreadPoolTaskExecutor::joinAsync() {
+ MONGO_UNREACHABLE;
+}
+
stdx::unique_lock<Latch> ThreadPoolTaskExecutor::_join(stdx::unique_lock<Latch> lk) {
_stateChange.wait(lk, [this] {
// All non-exhaust tasks are spliced into the _poolInProgressQueue immediately after we
diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h
index 16bf459f6f0..4fbd38b2ba6 100644
--- a/src/mongo/executor/thread_pool_task_executor.h
+++ b/src/mongo/executor/thread_pool_task_executor.h
@@ -72,6 +72,7 @@ public:
void startup() override;
void shutdown() override;
void join() override;
+ SharedSemiFuture<void> joinAsync() override;
void appendDiagnosticBSON(BSONObjBuilder* b) const override;
Date_t now() override;
StatusWith<EventHandle> makeEvent() override;