diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2020-03-02 15:39:02 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-02 21:14:40 +0000 |
commit | 82833b418676787209c7ce9fcc658470c478070e (patch) | |
tree | b9c831dbe1436e1d72a3432485626038fab685d8 /src/mongo/executor | |
parent | 83854e39d2904b5d4a6e4ee1f9b4fedaa4df85a5 (diff) | |
download | mongo-82833b418676787209c7ce9fcc658470c478070e.tar.gz |
SERVER-46395 Add joinAsync() method to ScopedTaskExecutor::Impl.
Introduces a joinAsync() method to the TaskExecutor interface to provide
a non-blocking way for callers to learn when the task executor has been
shut down and all outstanding callbacks have finished running.
Diffstat (limited to 'src/mongo/executor')
-rw-r--r-- | src/mongo/executor/scoped_task_executor.cpp | 24 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor.h | 2 | ||||
-rw-r--r-- | src/mongo/executor/scoped_task_executor_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/executor/task_executor.h | 6 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.cpp | 4 | ||||
-rw-r--r-- | src/mongo/executor/thread_pool_task_executor.h | 1 |
6 files changed, 38 insertions, 7 deletions
diff --git a/src/mongo/executor/scoped_task_executor.cpp b/src/mongo/executor/scoped_task_executor.cpp index 0a8fd531479..e4823682658 100644 --- a/src/mongo/executor/scoped_task_executor.cpp +++ b/src/mongo/executor/scoped_task_executor.cpp @@ -63,6 +63,12 @@ public: void shutdown() override { auto handles = [&] { stdx::lock_guard lk(_mutex); + if (!_inShutdown && _cbHandles.empty()) { + // We are guaranteed that no more callbacks can be added to _cbHandles after + // _inShutdown is set to true. If there aren't any callbacks outstanding, then it is + // shutdown()'s responsibility to make the futures returned by joinAll() ready. + _promise.setWith([] {}); + } _inShutdown = true; return _cbHandles; @@ -81,8 +87,11 @@ public: } void join() override { - stdx::unique_lock lk(_mutex); - _cv.wait(lk, [&] { return _inShutdown && _cbHandles.empty(); }); + joinAsync().wait(); + } + + SharedSemiFuture<void> joinAsync() override { + return _promise.getFuture(); } void appendDiagnosticBSON(BSONObjBuilder* b) const override { @@ -312,7 +321,10 @@ private: invariant(_cbHandles.erase(id) == 1); if (_inShutdown && _cbHandles.empty()) { - _cv.notify_all(); + // We are guaranteed that no more callbacks can be added to _cbHandles after _inShutdown + // is set to true. If there are no more callbacks outstanding, then it is the last + // callback's responsibility to make the futures returned by joinAll() ready. + _promise.setWith([] {}); } } @@ -322,9 +334,9 @@ private: size_t _id = 0; stdx::unordered_map<size_t, CallbackHandle> _cbHandles; - // condition variable that callers of join wait on and outstanding callbacks potentially - // notify - stdx::condition_variable _cv; + // Promise that is set when the executor has been shut down and there aren't any outstanding + // callbacks. Callers of joinAsync() extract futures from this promise. + SharedPromise<void> _promise; }; ScopedTaskExecutor::ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor) diff --git a/src/mongo/executor/scoped_task_executor.h b/src/mongo/executor/scoped_task_executor.h index 5b624e10f64..2e72ff3873b 100644 --- a/src/mongo/executor/scoped_task_executor.h +++ b/src/mongo/executor/scoped_task_executor.h @@ -85,7 +85,7 @@ public: explicit ScopedTaskExecutor(std::shared_ptr<TaskExecutor> executor); // Delete all move/copy-ability - ScopedTaskExecutor(TaskExecutor&&) = delete; + ScopedTaskExecutor(ScopedTaskExecutor&&) = delete; ~ScopedTaskExecutor(); diff --git a/src/mongo/executor/scoped_task_executor_test.cpp b/src/mongo/executor/scoped_task_executor_test.cpp index 49c3842f392..ad45884cef4 100644 --- a/src/mongo/executor/scoped_task_executor_test.cpp +++ b/src/mongo/executor/scoped_task_executor_test.cpp @@ -251,10 +251,12 @@ TEST_F(ScopedTaskExecutorTest, scheduleLoseRaceWithShutdown) { ASSERT_FALSE(resultPf.future.isReady()); + ASSERT_FALSE(getExecutor()->joinAsync().isReady()); getExecutor()->shutdown(); getNet()->exitNetwork(); ASSERT_EQUALS(resultPf.future.getNoThrow(), ErrorCodes::ShutdownInProgress); + ASSERT_TRUE(getExecutor()->joinAsync().isReady()); } // ScheduleRemoteCommand on the underlying, but are shut down when we execute our wrapping callback @@ -315,6 +317,12 @@ TEST_F(ScopedTaskExecutorTest, DestructionShutsDown) { ASSERT_EQUALS(pf.future.getNoThrow(), ErrorCodes::ShutdownInProgress); } +TEST_F(ScopedTaskExecutorTest, joinAllBecomesReadyOnShutdown) { + ASSERT_FALSE(getExecutor()->joinAsync().isReady()); + getExecutor()->shutdown(); + ASSERT_TRUE(getExecutor()->joinAsync().isReady()); +} + } // namespace } // namespace executor } // namespace mongo diff --git a/src/mongo/executor/task_executor.h b/src/mongo/executor/task_executor.h index d4c65e500b6..4f2dad79abe 100644 --- a/src/mongo/executor/task_executor.h +++ b/src/mongo/executor/task_executor.h @@ -142,6 +142,12 @@ public: virtual void join() = 0; /** + * Returns a future that becomes ready when shutdown() has been called and all outstanding + * callbacks have finished running. + */ + virtual SharedSemiFuture<void> joinAsync() = 0; + + /** * Writes diagnostic information into "b". */ virtual void appendDiagnosticBSON(BSONObjBuilder* b) const = 0; diff --git a/src/mongo/executor/thread_pool_task_executor.cpp b/src/mongo/executor/thread_pool_task_executor.cpp index 520a6b12e04..10a57699045 100644 --- a/src/mongo/executor/thread_pool_task_executor.cpp +++ b/src/mongo/executor/thread_pool_task_executor.cpp @@ -177,6 +177,10 @@ void ThreadPoolTaskExecutor::join() { _join(stdx::unique_lock<Latch>(_mutex)); } +SharedSemiFuture<void> ThreadPoolTaskExecutor::joinAsync() { + MONGO_UNREACHABLE; +} + stdx::unique_lock<Latch> ThreadPoolTaskExecutor::_join(stdx::unique_lock<Latch> lk) { _stateChange.wait(lk, [this] { // All non-exhaust tasks are spliced into the _poolInProgressQueue immediately after we diff --git a/src/mongo/executor/thread_pool_task_executor.h b/src/mongo/executor/thread_pool_task_executor.h index 16bf459f6f0..4fbd38b2ba6 100644 --- a/src/mongo/executor/thread_pool_task_executor.h +++ b/src/mongo/executor/thread_pool_task_executor.h @@ -72,6 +72,7 @@ public: void startup() override; void shutdown() override; void join() override; + SharedSemiFuture<void> joinAsync() override; void appendDiagnosticBSON(BSONObjBuilder* b) const override; Date_t now() override; StatusWith<EventHandle> makeEvent() override; |