diff options
Diffstat (limited to 'src/mongo/db/repl/task_runner_test.cpp')
-rw-r--r-- | src/mongo/db/repl/task_runner_test.cpp | 567 |
1 files changed, 283 insertions, 284 deletions
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index c1e4d10b731..3c6f9d29f45 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -40,312 +40,311 @@ namespace { - using namespace mongo; - using namespace mongo::repl; - - using Task = TaskRunner::Task; - - TEST_F(TaskRunnerTest, InvalidConstruction) { - // Null thread pool. - ASSERT_THROWS_CODE(TaskRunner(nullptr, []() -> OperationContext* { return nullptr; }), - UserException, - ErrorCodes::BadValue); - - // Null function for creating operation contexts. - ASSERT_THROWS_CODE(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()), - UserException, - ErrorCodes::BadValue); - } - - TEST_F(TaskRunnerTest, GetDiagnosticString) { - ASSERT_FALSE(getTaskRunner().getDiagnosticString().empty()); - } - - TEST_F(TaskRunnerTest, CallbackValues) { - stdx::mutex mutex; - bool called = false; - OperationContext* txn = nullptr; - Status status = getDetectableErrorStatus(); - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - called = true; - txn = theTxn; - status = theStatus; - return TaskRunner::NextAction::kCancel; - }; - getTaskRunner().schedule(task); - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); - +using namespace mongo; +using namespace mongo::repl; + +using Task = TaskRunner::Task; + +TEST_F(TaskRunnerTest, InvalidConstruction) { + // Null thread pool. + ASSERT_THROWS_CODE(TaskRunner(nullptr, []() -> OperationContext* { return nullptr; }), + UserException, + ErrorCodes::BadValue); + + // Null function for creating operation contexts. + ASSERT_THROWS_CODE(TaskRunner(&getThreadPool(), TaskRunner::CreateOperationContextFn()), + UserException, + ErrorCodes::BadValue); +} + +TEST_F(TaskRunnerTest, GetDiagnosticString) { + ASSERT_FALSE(getTaskRunner().getDiagnosticString().empty()); +} + +TEST_F(TaskRunnerTest, CallbackValues) { + stdx::mutex mutex; + bool called = false; + OperationContext* txn = nullptr; + Status status = getDetectableErrorStatus(); + auto task = [&](OperationContext* theTxn, const Status& theStatus) { stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_TRUE(called); - ASSERT(txn); - ASSERT_OK(status); - } - - TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) { - resetTaskRunner(new TaskRunner(&getThreadPool(), []() -> OperationContext* { - return nullptr; - })); - stdx::mutex mutex; - bool called = false; - OperationContextNoop opCtxNoop; - OperationContext* txn = &opCtxNoop; - Status status = getDetectableErrorStatus(); - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - called = true; - txn = theTxn; - status = theStatus; - return TaskRunner::NextAction::kCancel; - }; - getTaskRunner().schedule(task); - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); - + called = true; + txn = theTxn; + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(task); + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT(txn); + ASSERT_OK(status); +} + +TEST_F(TaskRunnerTest, OperationContextFactoryReturnsNull) { + resetTaskRunner( + new TaskRunner(&getThreadPool(), []() -> OperationContext* { return nullptr; })); + stdx::mutex mutex; + bool called = false; + OperationContextNoop opCtxNoop; + OperationContext* txn = &opCtxNoop; + Status status = getDetectableErrorStatus(); + auto task = [&](OperationContext* theTxn, const Status& theStatus) { stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_TRUE(called); - ASSERT_FALSE(txn); - ASSERT_OK(status); - } - - std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, - TaskRunner::NextAction nextAction, - stdx::function<void(const Task& task)> schedule) { - unittest::Barrier barrier(2U); - stdx::mutex mutex; - int i = 0; - OperationContext* txn[2] = {nullptr, nullptr}; - int txnId[2] = {-100, -100}; - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - int j = i++; - if (j >= 2) { - return TaskRunner::NextAction::kInvalid; - } - txn[j] = theTxn; - txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]); - TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel; - barrier.countDownAndWait(); - return result; - }; - - schedule(task); - ASSERT_TRUE(test.getTaskRunner().isActive()); - barrier.countDownAndWait(); - - schedule(task); - ASSERT_TRUE(test.getTaskRunner().isActive()); + called = true; + txn = theTxn; + status = theStatus; + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(task); + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_TRUE(called); + ASSERT_FALSE(txn); + ASSERT_OK(status); +} + +std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, + TaskRunner::NextAction nextAction, + stdx::function<void(const Task& task)> schedule) { + unittest::Barrier barrier(2U); + stdx::mutex mutex; + int i = 0; + OperationContext* txn[2] = {nullptr, nullptr}; + int txnId[2] = {-100, -100}; + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + stdx::lock_guard<stdx::mutex> lk(mutex); + int j = i++; + if (j >= 2) { + return TaskRunner::NextAction::kInvalid; + } + txn[j] = theTxn; + txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]); + TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel; barrier.countDownAndWait(); + return result; + }; + + schedule(task); + ASSERT_TRUE(test.getTaskRunner().isActive()); + barrier.countDownAndWait(); + + schedule(task); + ASSERT_TRUE(test.getTaskRunner().isActive()); + barrier.countDownAndWait(); + + test.getThreadPool().join(); + ASSERT_FALSE(test.getTaskRunner().isActive()); + + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_EQUALS(2, i); + ASSERT(txn[0]); + ASSERT(txn[1]); + ASSERT_NOT_LESS_THAN(txnId[0], 0); + ASSERT_NOT_LESS_THAN(txnId[1], 0); + return {txnId[0], txnId[1]}; +} + +std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) { + auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); }; + return _testRunTaskTwice(test, nextAction, schedule); +} + +TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { + std::vector<int> txnId = + _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext); + ASSERT_NOT_EQUALS(txnId[0], txnId[1]); +} + +// Joining thread pool before scheduling first task has no effect. +// Joining thread pool before scheduling second task ensures that task runner releases +// thread back to pool after disposing of operation context. +TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) { + auto schedule = [this](const Task& task) { + getThreadPool().join(); + getTaskRunner().schedule(task); + }; + std::vector<int> txnId = + _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule); + ASSERT_NOT_EQUALS(txnId[0], txnId[1]); +} + +TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) { + std::vector<int> txnId = + _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext); + ASSERT_EQUALS(txnId[0], txnId[1]); +} + +TEST_F(TaskRunnerTest, SkipSecondTask) { + stdx::mutex mutex; + int i = 0; + OperationContext* txn[2] = {nullptr, nullptr}; + Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; + stdx::condition_variable condition; + bool schedulingDone = false; + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + stdx::unique_lock<stdx::mutex> lk(mutex); + int j = i++; + if (j >= 2) { + return TaskRunner::NextAction::kCancel; + } + txn[j] = theTxn; + status[j] = theStatus; - test.getThreadPool().join(); - ASSERT_FALSE(test.getTaskRunner().isActive()); + // Wait for the test code to schedule the second task. + while (!schedulingDone) { + condition.wait(lk); + } + return TaskRunner::NextAction::kCancel; + }; + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + getTaskRunner().schedule(task); + { stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_EQUALS(2, i); - ASSERT(txn[0]); - ASSERT(txn[1]); - ASSERT_NOT_LESS_THAN(txnId[0], 0); - ASSERT_NOT_LESS_THAN(txnId[1], 0); - return {txnId[0], txnId[1]}; + schedulingDone = true; + condition.notify_all(); } - - std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction) { - auto schedule = [&](const Task& task) { test.getTaskRunner().schedule(task); }; - return _testRunTaskTwice(test, nextAction, schedule); - } - - TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContext) { - std::vector<int> txnId = - _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext); - ASSERT_NOT_EQUALS(txnId[0], txnId[1]); - } - - // Joining thread pool before scheduling first task has no effect. - // Joining thread pool before scheduling second task ensures that task runner releases - // thread back to pool after disposing of operation context. - TEST_F(TaskRunnerTest, RunTaskTwiceDisposeOperationContextJoinThreadPoolBeforeScheduling) { - auto schedule = [this](const Task& task) { - getThreadPool().join(); - getTaskRunner().schedule(task); - }; - std::vector<int> txnId = - _testRunTaskTwice(*this, TaskRunner::NextAction::kDisposeOperationContext, schedule); - ASSERT_NOT_EQUALS(txnId[0], txnId[1]); - } - - TEST_F(TaskRunnerTest, RunTaskTwiceKeepOperationContext) { - std::vector<int> txnId = - _testRunTaskTwice(*this, TaskRunner::NextAction::kKeepOperationContext); - ASSERT_EQUALS(txnId[0], txnId[1]); - } - - TEST_F(TaskRunnerTest, SkipSecondTask) { - stdx::mutex mutex; - int i = 0; - OperationContext* txn[2] = {nullptr, nullptr}; - Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; - stdx::condition_variable condition; - bool schedulingDone = false; - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::unique_lock<stdx::mutex> lk(mutex); - int j = i++; - if (j >= 2) { - return TaskRunner::NextAction::kCancel; - } - txn[j] = theTxn; - status[j] = theStatus; - - // Wait for the test code to schedule the second task. - while (!schedulingDone) { - condition.wait(lk); - } - + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_EQUALS(2, i); + ASSERT(txn[0]); + ASSERT_OK(status[0]); + ASSERT_FALSE(txn[1]); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code()); +} + +TEST_F(TaskRunnerTest, FirstTaskThrowsException) { + stdx::mutex mutex; + int i = 0; + OperationContext* txn[2] = {nullptr, nullptr}; + Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; + stdx::condition_variable condition; + bool schedulingDone = false; + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + stdx::unique_lock<stdx::mutex> lk(mutex); + int j = i++; + if (j >= 2) { return TaskRunner::NextAction::kCancel; - }; - getTaskRunner().schedule(task); - ASSERT_TRUE(getTaskRunner().isActive()); - getTaskRunner().schedule(task); - { - stdx::lock_guard<stdx::mutex> lk(mutex); - schedulingDone = true; - condition.notify_all(); } - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); + txn[j] = theTxn; + status[j] = theStatus; - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_EQUALS(2, i); - ASSERT(txn[0]); - ASSERT_OK(status[0]); - ASSERT_FALSE(txn[1]); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code()); - } - - TEST_F(TaskRunnerTest, FirstTaskThrowsException) { - stdx::mutex mutex; - int i = 0; - OperationContext* txn[2] = {nullptr, nullptr}; - Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; - stdx::condition_variable condition; - bool schedulingDone = false; - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::unique_lock<stdx::mutex> lk(mutex); - int j = i++; - if (j >= 2) { - return TaskRunner::NextAction::kCancel; - } - txn[j] = theTxn; - status[j] = theStatus; - - // Wait for the test code to schedule the second task. - while (!schedulingDone) { - condition.wait(lk); - } - - // Throwing an exception from the first task should cancel - // unscheduled tasks and make the task runner inactive. - // When the second (canceled) task throws an exception, it should be ignored. - uassert(ErrorCodes::OperationFailed, "task failure", false); - - // not reached. - invariant(false); - return TaskRunner::NextAction::kKeepOperationContext; - }; - getTaskRunner().schedule(task); - ASSERT_TRUE(getTaskRunner().isActive()); - getTaskRunner().schedule(task); - { - stdx::lock_guard<stdx::mutex> lk(mutex); - schedulingDone = true; - condition.notify_all(); + // Wait for the test code to schedule the second task. + while (!schedulingDone) { + condition.wait(lk); } - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); + // Throwing an exception from the first task should cancel + // unscheduled tasks and make the task runner inactive. + // When the second (canceled) task throws an exception, it should be ignored. + uassert(ErrorCodes::OperationFailed, "task failure", false); + + // not reached. + invariant(false); + return TaskRunner::NextAction::kKeepOperationContext; + }; + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + getTaskRunner().schedule(task); + { stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_EQUALS(2, i); - ASSERT(txn[0]); - ASSERT_OK(status[0]); - ASSERT_FALSE(txn[1]); - ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code()); + schedulingDone = true; + condition.notify_all(); } - - TEST_F(TaskRunnerTest, Cancel) { - stdx::mutex mutex; - stdx::condition_variable condition; - Status status = getDetectableErrorStatus(); - bool taskRunning = false; - - // Running this task causes the task runner to wait for another task that - // is never scheduled. - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - status = theStatus; - taskRunning = true; - condition.notify_all(); - return TaskRunner::NextAction::kKeepOperationContext; - }; - - // Calling cancel() before schedule() has no effect. - // The task should still be invoked with a successful status. - getTaskRunner().cancel(); - - getTaskRunner().schedule(task); - ASSERT_TRUE(getTaskRunner().isActive()); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - while (!taskRunning) { - condition.wait(lk); - } + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); + + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_EQUALS(2, i); + ASSERT(txn[0]); + ASSERT_OK(status[0]); + ASSERT_FALSE(txn[1]); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, status[1].code()); +} + +TEST_F(TaskRunnerTest, Cancel) { + stdx::mutex mutex; + stdx::condition_variable condition; + Status status = getDetectableErrorStatus(); + bool taskRunning = false; + + // Running this task causes the task runner to wait for another task that + // is never scheduled. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + stdx::lock_guard<stdx::mutex> lk(mutex); + status = theStatus; + taskRunning = true; + condition.notify_all(); + return TaskRunner::NextAction::kKeepOperationContext; + }; + + // Calling cancel() before schedule() has no effect. + // The task should still be invoked with a successful status. + getTaskRunner().cancel(); + + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + while (!taskRunning) { + condition.wait(lk); } + } - // It is fine to call cancel() multiple times. - getTaskRunner().cancel(); - getTaskRunner().cancel(); + // It is fine to call cancel() multiple times. + getTaskRunner().cancel(); + getTaskRunner().cancel(); - getThreadPool().join(); - ASSERT_FALSE(getTaskRunner().isActive()); + getThreadPool().join(); + ASSERT_FALSE(getTaskRunner().isActive()); - // This status will not be OK if canceling the task runner - // before scheduling the task results in the task being canceled. - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_OK(status); - } + // This status will not be OK if canceling the task runner + // before scheduling the task results in the task being canceled. + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_OK(status); +} - TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { - stdx::mutex mutex; - stdx::condition_variable condition; - Status status = getDetectableErrorStatus(); - bool taskRunning = false; - - // Running this task causes the task runner to wait for another task that - // is never scheduled. - auto task = [&](OperationContext* theTxn, const Status& theStatus) { - stdx::lock_guard<stdx::mutex> lk(mutex); - status = theStatus; - taskRunning = true; - condition.notify_all(); - return TaskRunner::NextAction::kKeepOperationContext; - }; +TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { + stdx::mutex mutex; + stdx::condition_variable condition; + Status status = getDetectableErrorStatus(); + bool taskRunning = false; - getTaskRunner().schedule(task); - ASSERT_TRUE(getTaskRunner().isActive()); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - while (!taskRunning) { - condition.wait(lk); - } + // Running this task causes the task runner to wait for another task that + // is never scheduled. + auto task = [&](OperationContext* theTxn, const Status& theStatus) { + stdx::lock_guard<stdx::mutex> lk(mutex); + status = theStatus; + taskRunning = true; + condition.notify_all(); + return TaskRunner::NextAction::kKeepOperationContext; + }; + + getTaskRunner().schedule(task); + ASSERT_TRUE(getTaskRunner().isActive()); + { + stdx::unique_lock<stdx::mutex> lk(mutex); + while (!taskRunning) { + condition.wait(lk); } + } - destroyTaskRunner(); + destroyTaskRunner(); - getThreadPool().join(); + getThreadPool().join(); - // This status will not be OK if canceling the task runner - // before scheduling the task results in the task being canceled. - stdx::lock_guard<stdx::mutex> lk(mutex); - ASSERT_OK(status); - } + // This status will not be OK if canceling the task runner + // before scheduling the task results in the task being canceled. + stdx::lock_guard<stdx::mutex> lk(mutex); + ASSERT_OK(status); +} -} // namespace +} // namespace |