author     Benety Goh <benety@mongodb.com>    2016-07-08 12:30:31 -0400
committer  Benety Goh <benety@mongodb.com>    2016-07-11 14:03:21 -0400
commit     6dc25f703227ae486fdc8201ccfeb64e1db5c752 (patch)
tree       a85fa257840a80ffc1ad3a0b8fbeea6c828cf931 /src
parent     1d0955a9289f3bbbfce1c5953ef83016a0e55855 (diff)
download   mongo-6dc25f703227ae486fdc8201ccfeb64e1db5c752.tar.gz
SERVER-24955 added join() to TaskRunner
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/repl/task_runner.cpp       |  9
-rw-r--r--  src/mongo/db/repl/task_runner.h         |  5
-rw-r--r--  src/mongo/db/repl/task_runner_test.cpp  | 39
3 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index a5bd1a5e852..8619db6c880 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -83,9 +83,7 @@ TaskRunner::TaskRunner(OldThreadPool* threadPool)
}
TaskRunner::~TaskRunner() {
- DESTRUCTOR_GUARD(UniqueLock lk(_mutex); if (!_active) { return; } _cancelRequested = true;
- _condition.notify_all();
- while (_active) { _condition.wait(lk); });
+ DESTRUCTOR_GUARD(cancel(); join(););
}
std::string TaskRunner::getDiagnosticString() const {
@@ -127,6 +125,11 @@ void TaskRunner::cancel() {
_condition.notify_all();
}
+void TaskRunner::join() {
+ UniqueLock lk(_mutex);
+ _condition.wait(lk, [this]() { return !_active; });
+}
+
void TaskRunner::_runTasks() {
Client* client = nullptr;
ServiceContext::UniqueOperationContext txn;
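
The new join() relies on the predicate overload of the condition variable's wait(), which is exactly the wait-in-a-loop that the removed destructor code spelled out by hand. A generic illustration with plain std::condition_variable follows (the function and variable names below are placeholders for illustration, not from the patch):

#include <condition_variable>
#include <mutex>

// Waits until `active` becomes false. The predicate overload used here is
// shorthand for the manual loop shown in the comment, which is what the old
// TaskRunner destructor did inline.
void waitUntilInactive(std::mutex& mutex,
                       std::condition_variable& condition,
                       const bool& active) {
    std::unique_lock<std::mutex> lk(mutex);
    condition.wait(lk, [&] { return !active; });
    // Equivalent hand-rolled form:
    //     while (active) {
    //         condition.wait(lk);
    //     }
}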
diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h
index 2b201757ff1..a8908660c44 100644
--- a/src/mongo/db/repl/task_runner.h
+++ b/src/mongo/db/repl/task_runner.h
@@ -137,6 +137,11 @@ public:
*/
void cancel();
+ /**
+ * Waits for the task runner to become inactive.
+ */
+ void join();
+
private:
/**
* Runs tasks in a loop.
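
For readers outside the MongoDB tree, the overall contract (join() blocks until the runner goes inactive, and the destructor now reduces to cancel() followed by join()) can be sketched with a standalone analogue built on std::thread and std::condition_variable. Everything below (MiniRunner, its members, main) is illustrative only and not part of the patch:

// Standalone sketch (not MongoDB code): a tiny runner with the same
// cancel()/join() shutdown shape as the patched TaskRunner.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class MiniRunner {
public:
    MiniRunner() : _worker([this] { _runTasks(); }) {}

    ~MiniRunner() {
        cancel();
        join();          // same shape as DESTRUCTOR_GUARD(cancel(); join(););
        _worker.join();
    }

    void schedule(std::function<void()> task) {
        std::lock_guard<std::mutex> lk(_mutex);
        _tasks.push(std::move(task));
        _active = true;
        _condition.notify_all();
    }

    void cancel() {
        std::lock_guard<std::mutex> lk(_mutex);
        _cancelRequested = true;
        _condition.notify_all();
    }

    // Blocks until the runner becomes inactive, like the new TaskRunner::join().
    void join() {
        std::unique_lock<std::mutex> lk(_mutex);
        _condition.wait(lk, [this] { return !_active; });
    }

private:
    void _runTasks() {
        std::unique_lock<std::mutex> lk(_mutex);
        while (!_cancelRequested) {
            if (_tasks.empty()) {
                _active = false;   // nothing left to do; let join() return
                _condition.notify_all();
                _condition.wait(
                    lk, [this] { return _cancelRequested || !_tasks.empty(); });
                continue;
            }
            auto task = std::move(_tasks.front());
            _tasks.pop();
            lk.unlock();           // run the task without holding the lock
            task();
            lk.lock();
        }
        // Tasks still queued after cancel() are simply dropped in this sketch.
        _active = false;
        _condition.notify_all();
    }

    std::mutex _mutex;
    std::condition_variable _condition;
    std::queue<std::function<void()>> _tasks;
    bool _active = false;
    bool _cancelRequested = false;
    std::thread _worker;   // declared last so it starts after the flags are initialized
};

int main() {
    MiniRunner runner;
    runner.schedule([] { std::cout << "task 1 done\n"; });
    runner.schedule([] { std::cout << "task 2 done\n"; });
    runner.join();   // returns only after both scheduled tasks have finished
    return 0;
}

Keeping join() separate from cancel() lets callers wait for already-scheduled work to finish without also requesting cancellation, which is what the new JoinShouldWaitForTasksToComplete test below exercises.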
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
index 0279955ab06..dedb6269083 100644
--- a/src/mongo/db/repl/task_runner_test.cpp
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -279,6 +279,45 @@ TEST_F(TaskRunnerTest, Cancel) {
ASSERT_OK(status);
}
+TEST_F(TaskRunnerTest, JoinShouldWaitForTasksToComplete) {
+ unittest::Barrier barrier(2U);
+ stdx::mutex mutex;
+ Status status1 = getDetectableErrorStatus();
+ Status status2 = getDetectableErrorStatus();
+
+ // "task1" should start running before we invoke join() the task runner.
+ // Upon completion, "task1" requests the task runner to retain the operation context. This has
+ // effect of keeping the task runner active.
+ auto task1 = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ barrier.countDownAndWait();
+ status1 = theStatus;
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+
+ // "task2" should start running after we invoke join() the task runner.
+ // Upon completion, "task2" requests the task runner to dispose the operation context. After the
+ // operation context is destroyed, the task runner will go into an inactive state.
+ auto task2 = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ status2 = theStatus;
+ return TaskRunner::NextAction::kDisposeOperationContext;
+ };
+
+ getTaskRunner().schedule(task1);
+ getTaskRunner().schedule(task2);
+ barrier.countDownAndWait();
+
+ // join() waits for "task1" and "task2" to complete execution.
+ getTaskRunner().join();
+
+ // Both statuses should be OK because join() returned only after
+ // "task1" and "task2" finished running.
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_OK(status1);
+ ASSERT_OK(status2);
+}
+
TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) {
stdx::mutex mutex;
stdx::condition_variable condition;