summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/task_runner_test.cpp
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 00:22:50 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2015-06-20 10:56:02 -0400
commit9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch)
tree3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/repl/task_runner_test.cpp
parent01965cf52bce6976637ecb8f4a622aeb05ab256a (diff)
downloadmongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/repl/task_runner_test.cpp')
-rw-r--r--src/mongo/db/repl/task_runner_test.cpp567
1 files changed, 283 insertions, 284 deletions
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
index c1e4d10b731..3c6f9d29f45 100644
--- a/src/mongo/db/repl/task_runner_test.cpp
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -40,312 +40,311 @@
namespace {
- using namespace mongo;
- using namespace mongo::repl;
-
- using Task = TaskRunner::Task;
-
- TEST_F(TaskRunnerTest, InvalidConstruction) {
- // Null thread pool.
- ASSERT_THROWS_CODE(TaskRunner(nullptr, []() -> OperationContext* { return nullptr; }),
- UserException,
- ErrorCodes::BadValue);
-
- // Null function for creating operation contexts.
- ASSERT_THROWS_CODE(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()),
- UserException,
- ErrorCodes::BadValue);
- }
-
- TEST_F(TaskRunnerTest, GetDiagnosticString) {
- ASSERT_FALSE(getTaskRunner().getDiagnosticString().empty());
- }
-
- TEST_F(TaskRunnerTest, CallbackValues) {
- stdx::mutex mutex;
- bool called = false;
- OperationContext* txn = nullptr;
- Status status = getDetectableErrorStatus();
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- called = true;
- txn = theTxn;
- status = theStatus;
- return TaskRunner::NextAction::kCancel;
- };
- getTaskRunner().schedule(task);
- getThreadPool().join();
- ASSERT_FALSE(getTaskRunner().isActive());
-
+using namespace mongo;
+using namespace mongo::repl;
+
+using Task = TaskRunner::Task;
+
+TEST_F(TaskRunnerTest, InvalidConstruction) {
+ // Null thread pool.
+ ASSERT_THROWS_CODE(TaskRunner(nullptr, []() -> OperationContext* { return nullptr; }),
+ UserException,
+ ErrorCodes::BadValue);
+
+ // Null function for creating operation contexts.
+ ASSERT_THROWS_CODE(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()),
+ UserException,
+ ErrorCodes::BadValue);
+}
+
+TEST_F(TaskRunnerTest, GetDiagnosticString) {
+ ASSERT_FALSE(getTaskRunner().getDiagnosticString().empty());
+}
+
+TEST_F(TaskRunnerTest, CallbackValues) {
+ stdx::mutex mutex;
+ bool called = false;
+ OperationContext* txn = nullptr;
+ Status status = getDetectableErrorStatus();
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_TRUE(called);
- ASSERT(txn);
- ASSERT_OK(status);
- }
-
- TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) {
- resetTaskRunner(new TaskRunner(&getThreadPool(), []() -> OperationContext* {
- return nullptr;
- }));
- stdx::mutex mutex;
- bool called = false;
- OperationContextNoop opCtxNoop;
- OperationContext* txn = &opCtxNoop;
- Status status = getDetectableErrorStatus();
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- called = true;
- txn = theTxn;
- status = theStatus;
- return TaskRunner::NextAction::kCancel;
- };
- getTaskRunner().schedule(task);
- getThreadPool().join();
- ASSERT_FALSE(getTaskRunner().isActive());
-
+ called = true;
+ txn = theTxn;
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(task);
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT(txn);
+ ASSERT_OK(status);
+}
+
+TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) {
+ resetTaskRunner(
+ new TaskRunner(&getThreadPool(), []() -> OperationContext* { return nullptr; }));
+ stdx::mutex mutex;
+ bool called = false;
+ OperationContextNoop opCtxNoop;
+ OperationContext* txn = &opCtxNoop;
+ Status status = getDetectableErrorStatus();
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_TRUE(called);
- ASSERT_FALSE(txn);
- ASSERT_OK(status);
- }
-
- std::vector<int> _testRunTaskTwice(TaskRunnerTest& test,
- TaskRunner::NextAction nextAction,
- stdx::function<void(const Task& task)> schedule) {
- unittest::Barrier barrier(2U);
- stdx::mutex mutex;
- int i = 0;
- OperationContext* txn[2] = {nullptr, nullptr};
- int txnId[2] = {-100, -100};
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- int j = i++;
- if (j >= 2) {
- return TaskRunner::NextAction::kInvalid;
- }
- txn[j] = theTxn;
- txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]);
- TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel;
- barrier.countDownAndWait();
- return result;
- };
-
- schedule(task);
- ASSERT_TRUE(test.getTaskRunner().isActive());
- barrier.countDownAndWait();
-
- schedule(task);
- ASSERT_TRUE(test.getTaskRunner().isActive());
+ called = true;
+ txn = theTxn;
+ status = theStatus;
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(task);
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_TRUE(called);
+ ASSERT_FALSE(txn);
+ ASSERT_OK(status);
+}
+
+std::vector<int> _testRunTaskTwice(TaskRunnerTest& test,
+ TaskRunner::NextAction nextAction,
+ stdx::function<void(const Task& task)> schedule) {
+ unittest::Barrier barrier(2U);
+ stdx::mutex mutex;
+ int i = 0;
+ OperationContext* txn[2] = {nullptr, nullptr};
+ int txnId[2] = {-100, -100};
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ int j = i++;
+ if (j >= 2) {
+ return TaskRunner::NextAction::kInvalid;
+ }
+ txn[j] = theTxn;
+ txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]);
+ TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel;
barrier.countDownAndWait();
+ return result;
+ };
+
+ schedule(task);
+ ASSERT_TRUE(test.getTaskRunner().isActive());
+ barrier.countDownAndWait();
+
+ schedule(task);
+ ASSERT_TRUE(test.getTaskRunner().isActive());
+ barrier.countDownAndWait();
+
+ test.getThreadPool().join();
+ ASSERT_FALSE(test.getTaskRunner().isActive());
+
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_EQUALS(2, i);
+ ASSERT(txn[0]);
+ ASSERT(txn[1]);
+ ASSERT_NOT_LESS_THAN(txnId[0], 0);
+ ASSERT_NOT_LESS_THAN(txnId[1], 0);
+ return {txnId[0], txnId[1]};
+}
+
+std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) {
+ auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); };
+ return _testRunTaskTwice(test, nextAction, schedule);
+}
+
+TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) {
+ std::vector<int> txnId =
+ _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext);
+ ASSERT_NOT_EQUALS(txnId[0], txnId[1]);
+}
+
+// Joining thread pool before scheduling first task has no effect.
+// Joining thread pool before scheduling second task ensures that task runner releases
+// thread back to pool after disposing of operation context.
+TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) {
+ auto schedule = [this](const Task& task) {
+ getThreadPool().join();
+ getTaskRunner().schedule(task);
+ };
+ std::vector<int> txnId =
+ _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule);
+ ASSERT_NOT_EQUALS(txnId[0], txnId[1]);
+}
+
+TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) {
+ std::vector<int> txnId =
+ _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext);
+ ASSERT_EQUALS(txnId[0], txnId[1]);
+}
+
+TEST_F(TaskRunnerTest, SkipSecondTask) {
+ stdx::mutex mutex;
+ int i = 0;
+ OperationContext* txn[2] = {nullptr, nullptr};
+ Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()};
+ stdx::condition_variable condition;
+ bool schedulingDone = false;
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ int j = i++;
+ if (j >= 2) {
+ return TaskRunner::NextAction::kCancel;
+ }
+ txn[j] = theTxn;
+ status[j] = theStatus;
- test.getThreadPool().join();
- ASSERT_FALSE(test.getTaskRunner().isActive());
+ // Wait for the test code to schedule the second task.
+ while (!schedulingDone) {
+ condition.wait(lk);
+ }
+ return TaskRunner::NextAction::kCancel;
+ };
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ getTaskRunner().schedule(task);
+ {
stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_EQUALS(2, i);
- ASSERT(txn[0]);
- ASSERT(txn[1]);
- ASSERT_NOT_LESS_THAN(txnId[0], 0);
- ASSERT_NOT_LESS_THAN(txnId[1], 0);
- return {txnId[0], txnId[1]};
+ schedulingDone = true;
+ condition.notify_all();
}
-
- std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) {
- auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); };
- return _testRunTaskTwice(test, nextAction, schedule);
- }
-
- TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) {
- std::vector<int> txnId =
- _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext);
- ASSERT_NOT_EQUALS(txnId[0], txnId[1]);
- }
-
- // Joining thread pool before scheduling first task has no effect.
- // Joining thread pool before scheduling second task ensures that task runner releases
- // thread back to pool after disposing of operation context.
- TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) {
- auto schedule = [this](const Task& task) {
- getThreadPool().join();
- getTaskRunner().schedule(task);
- };
- std::vector<int> txnId =
- _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule);
- ASSERT_NOT_EQUALS(txnId[0], txnId[1]);
- }
-
- TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) {
- std::vector<int> txnId =
- _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext);
- ASSERT_EQUALS(txnId[0], txnId[1]);
- }
-
- TEST_F(TaskRunnerTest, SkipSecondTask) {
- stdx::mutex mutex;
- int i = 0;
- OperationContext* txn[2] = {nullptr, nullptr};
- Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()};
- stdx::condition_variable condition;
- bool schedulingDone = false;
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- int j = i++;
- if (j >= 2) {
- return TaskRunner::NextAction::kCancel;
- }
- txn[j] = theTxn;
- status[j] = theStatus;
-
- // Wait for the test code to schedule the second task.
- while (!schedulingDone) {
- condition.wait(lk);
- }
-
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_EQUALS(2, i);
+ ASSERT(txn[0]);
+ ASSERT_OK(status[0]);
+ ASSERT_FALSE(txn[1]);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code());
+}
+
+TEST_F(TaskRunnerTest, FirstTaskThrowsException) {
+ stdx::mutex mutex;
+ int i = 0;
+ OperationContext* txn[2] = {nullptr, nullptr};
+ Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()};
+ stdx::condition_variable condition;
+ bool schedulingDone = false;
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ int j = i++;
+ if (j >= 2) {
return TaskRunner::NextAction::kCancel;
- };
- getTaskRunner().schedule(task);
- ASSERT_TRUE(getTaskRunner().isActive());
- getTaskRunner().schedule(task);
- {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- schedulingDone = true;
- condition.notify_all();
}
- getThreadPool().join();
- ASSERT_FALSE(getTaskRunner().isActive());
+ txn[j] = theTxn;
+ status[j] = theStatus;
- stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_EQUALS(2, i);
- ASSERT(txn[0]);
- ASSERT_OK(status[0]);
- ASSERT_FALSE(txn[1]);
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code());
- }
-
- TEST_F(TaskRunnerTest, FirstTaskThrowsException) {
- stdx::mutex mutex;
- int i = 0;
- OperationContext* txn[2] = {nullptr, nullptr};
- Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()};
- stdx::condition_variable condition;
- bool schedulingDone = false;
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- int j = i++;
- if (j >= 2) {
- return TaskRunner::NextAction::kCancel;
- }
- txn[j] = theTxn;
- status[j] = theStatus;
-
- // Wait for the test code to schedule the second task.
- while (!schedulingDone) {
- condition.wait(lk);
- }
-
- // Throwing an exception from the first task should cancel
- // unscheduled tasks and make the task runner inactive.
- // When the second (canceled) task throws an exception, it should be ignored.
- uassert(ErrorCodes::OperationFailed, "task failure", false);
-
- // not reached.
- invariant(false);
- return TaskRunner::NextAction::kKeepOperationContext;
- };
- getTaskRunner().schedule(task);
- ASSERT_TRUE(getTaskRunner().isActive());
- getTaskRunner().schedule(task);
- {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- schedulingDone = true;
- condition.notify_all();
+ // Wait for the test code to schedule the second task.
+ while (!schedulingDone) {
+ condition.wait(lk);
}
- getThreadPool().join();
- ASSERT_FALSE(getTaskRunner().isActive());
+ // Throwing an exception from the first task should cancel
+ // unscheduled tasks and make the task runner inactive.
+ // When the second (canceled) task throws an exception, it should be ignored.
+ uassert(ErrorCodes::OperationFailed, "task failure", false);
+
+ // not reached.
+ invariant(false);
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ getTaskRunner().schedule(task);
+ {
stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_EQUALS(2, i);
- ASSERT(txn[0]);
- ASSERT_OK(status[0]);
- ASSERT_FALSE(txn[1]);
- ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code());
+ schedulingDone = true;
+ condition.notify_all();
}
-
- TEST_F(TaskRunnerTest, Cancel) {
- stdx::mutex mutex;
- stdx::condition_variable condition;
- Status status = getDetectableErrorStatus();
- bool taskRunning = false;
-
- // Running this task causes the task runner to wait for another task that
- // is never scheduled.
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- status = theStatus;
- taskRunning = true;
- condition.notify_all();
- return TaskRunner::NextAction::kKeepOperationContext;
- };
-
- // Calling cancel() before schedule() has no effect.
- // The task should still be invoked with a successful status.
- getTaskRunner().cancel();
-
- getTaskRunner().schedule(task);
- ASSERT_TRUE(getTaskRunner().isActive());
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- while (!taskRunning) {
- condition.wait(lk);
- }
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
+
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_EQUALS(2, i);
+ ASSERT(txn[0]);
+ ASSERT_OK(status[0]);
+ ASSERT_FALSE(txn[1]);
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code());
+}
+
+TEST_F(TaskRunnerTest, Cancel) {
+ stdx::mutex mutex;
+ stdx::condition_variable condition;
+ Status status = getDetectableErrorStatus();
+ bool taskRunning = false;
+
+ // Running this task causes the task runner to wait for another task that
+ // is never scheduled.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ status = theStatus;
+ taskRunning = true;
+ condition.notify_all();
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+
+ // Calling cancel() before schedule() has no effect.
+ // The task should still be invoked with a successful status.
+ getTaskRunner().cancel();
+
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ while (!taskRunning) {
+ condition.wait(lk);
}
+ }
- // It is fine to call cancel() multiple times.
- getTaskRunner().cancel();
- getTaskRunner().cancel();
+ // It is fine to call cancel() multiple times.
+ getTaskRunner().cancel();
+ getTaskRunner().cancel();
- getThreadPool().join();
- ASSERT_FALSE(getTaskRunner().isActive());
+ getThreadPool().join();
+ ASSERT_FALSE(getTaskRunner().isActive());
- // This status will not be OK if canceling the task runner
- // before scheduling the task results in the task being canceled.
- stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_OK(status);
- }
+ // This status will not be OK if canceling the task runner
+ // before scheduling the task results in the task being canceled.
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_OK(status);
+}
- TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) {
- stdx::mutex mutex;
- stdx::condition_variable condition;
- Status status = getDetectableErrorStatus();
- bool taskRunning = false;
-
- // Running this task causes the task runner to wait for another task that
- // is never scheduled.
- auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- stdx::lock_guard<stdx::mutex> lk(mutex);
- status = theStatus;
- taskRunning = true;
- condition.notify_all();
- return TaskRunner::NextAction::kKeepOperationContext;
- };
+TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) {
+ stdx::mutex mutex;
+ stdx::condition_variable condition;
+ Status status = getDetectableErrorStatus();
+ bool taskRunning = false;
- getTaskRunner().schedule(task);
- ASSERT_TRUE(getTaskRunner().isActive());
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- while (!taskRunning) {
- condition.wait(lk);
- }
+ // Running this task causes the task runner to wait for another task that
+ // is never scheduled.
+ auto task = [&](OperationContext* theTxn, const Status& theStatus) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ status = theStatus;
+ taskRunning = true;
+ condition.notify_all();
+ return TaskRunner::NextAction::kKeepOperationContext;
+ };
+
+ getTaskRunner().schedule(task);
+ ASSERT_TRUE(getTaskRunner().isActive());
+ {
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+ while (!taskRunning) {
+ condition.wait(lk);
}
+ }
- destroyTaskRunner();
+ destroyTaskRunner();
- getThreadPool().join();
+ getThreadPool().join();
- // This status will not be OK if canceling the task runner
- // before scheduling the task results in the task being canceled.
- stdx::lock_guard<stdx::mutex> lk(mutex);
- ASSERT_OK(status);
- }
+ // This status will not be OK if canceling the task runner
+ // before scheduling the task results in the task being canceled.
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ ASSERT_OK(status);
+}
-} // namespace
+} // namespace