diff options
author | Benety Goh <benety@mongodb.com> | 2015-05-20 10:55:08 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-05-21 19:44:24 -0400 |
commit | a2841541e1ccb32351f0301343d71b70481d5572 (patch) | |
tree | 6b7ffab5f8246b7bb929e44813cc715646e26544 /src/mongo/db | |
parent | fb26d2b0397e5b3c4bf2e90ba6838a0f5784f35c (diff) | |
download | mongo-a2841541e1ccb32351f0301343d71b70481d5572.tar.gz |
SERVER-18016 fixed race in TaskRunner unit test
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/applier_test.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_executor_test.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner_test.cpp | 70 |
4 files changed, 65 insertions, 54 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 73cb9de9012..b6e2fae6a97 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -62,6 +62,7 @@ env.CppUnitTest( ], LIBDEPS=[ 'replication_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/concurrency', ], ) @@ -466,6 +467,7 @@ env.Library( ], LIBDEPS=[ 'task_runner', + '$BUILD_DIR/mongo/unittest/concurrency', '$BUILD_DIR/mongo/util/decorable', ], ) @@ -539,5 +541,6 @@ env.CppUnitTest( LIBDEPS=[ 'applier', 'replication_executor_test_fixture', + '$BUILD_DIR/mongo/unittest/concurrency', ], ) diff --git a/src/mongo/db/repl/applier_test.cpp b/src/mongo/db/repl/applier_test.cpp index c38bcce4c74..ce1f52d627d 100644 --- a/src/mongo/db/repl/applier_test.cpp +++ b/src/mongo/db/repl/applier_test.cpp @@ -28,11 +28,6 @@ #include "mongo/platform/basic.h" -#include <boost/thread/barrier.hpp> -#include <boost/thread/condition.hpp> -#include <boost/thread/lock_guard.hpp> -#include <boost/thread/lock_types.hpp> -#include <boost/thread/mutex.hpp> #include <memory> #include "mongo/db/jsobj.h" @@ -41,6 +36,9 @@ #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/platform/compiler.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/unittest/barrier.h" namespace { @@ -68,7 +66,7 @@ namespace { void _testApplyOperationFailed(size_t opIndex, stdx::function<Status ()> fail); std::unique_ptr<Applier> _applier; - std::unique_ptr<boost::barrier> _barrier; + std::unique_ptr<unittest::Barrier> _barrier; }; Status ApplierTest::getDetectableErrorStatus() { @@ -84,7 +82,7 @@ namespace { apply, [this](const StatusWith<Timestamp>&, const Operations&) { if (_barrier.get()) { - _barrier->count_down_and_wait(); + _barrier->countDownAndWait(); } })); } @@ -132,22 +130,22 @@ namespace { TEST_F(ApplierTest, IsActiveAfterStart) { // Use a barrier to ensure that the callback blocks while // we check isActive(). - _barrier.reset(new boost::barrier(2U)); + _barrier.reset(new unittest::Barrier(2U)); ASSERT_FALSE(getApplier()->isActive()); ASSERT_OK(getApplier()->start()); ASSERT_TRUE(getApplier()->isActive()); - _barrier->count_down_and_wait(); + _barrier->countDownAndWait(); } TEST_F(ApplierTest, StartWhenActive) { // Use a barrier to ensure that the callback blocks while // we check isActive(). - _barrier.reset(new boost::barrier(2U)); + _barrier.reset(new unittest::Barrier(2U)); ASSERT_OK(getApplier()->start()); ASSERT_TRUE(getApplier()->isActive()); ASSERT_NOT_OK(getApplier()->start()); ASSERT_TRUE(getApplier()->isActive()); - _barrier->count_down_and_wait(); + _barrier->countDownAndWait(); } TEST_F(ApplierTest, CancelWithoutStart) { @@ -171,10 +169,10 @@ namespace { TEST_F(ApplierTest, CancelBeforeStartingDBWork) { // Schedule a blocking DB work item before the applier to allow us to cancel the applier // work item before the executor runs it. - boost::barrier barrier(2U); + unittest::Barrier barrier(2U); using CallbackData = ReplicationExecutor::CallbackData; getExecutor().scheduleDBWork([&](const CallbackData& cbd) { - barrier.count_down_and_wait(); // generation 0 + barrier.countDownAndWait(); // generation 0 }); const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0)); boost::mutex mutex; @@ -194,7 +192,7 @@ namespace { getApplier()->cancel(); ASSERT_TRUE(getApplier()->isActive()); - barrier.count_down_and_wait(); // generation 0 + barrier.countDownAndWait(); // generation 0 getApplier()->wait(); ASSERT_FALSE(getApplier()->isActive()); @@ -208,10 +206,10 @@ namespace { TEST_F(ApplierTest, DestroyBeforeStartingDBWork) { // Schedule a blocking DB work item before the applier to allow us to destroy the applier // before the executor runs the work item. - boost::barrier barrier(2U); + unittest::Barrier barrier(2U); using CallbackData = ReplicationExecutor::CallbackData; getExecutor().scheduleDBWork([&](const CallbackData& cbd) { - barrier.count_down_and_wait(); // generation 0 + barrier.countDownAndWait(); // generation 0 // Give the main thread a head start in invoking the applier destructor. sleepmillis(1); }); @@ -232,7 +230,7 @@ namespace { getApplier()->start(); ASSERT_TRUE(getApplier()->isActive()); - barrier.count_down_and_wait(); // generation 0 + barrier.countDownAndWait(); // generation 0 // It is possible the executor may have invoked the callback before we // destroy the applier. Therefore both OK and CallbackCanceled are acceptable @@ -277,7 +275,7 @@ namespace { TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) { const Timestamp timestamp(Seconds(123), 0); - boost::barrier barrier(2U); + unittest::Barrier barrier(2U); boost::mutex mutex; StatusWith<Timestamp> result = getDetectableErrorStatus(); Applier::Operations operations; @@ -289,11 +287,11 @@ namespace { boost::lock_guard<boost::mutex> lock(mutex); result = theResult; operations = theOperations; - barrier.count_down_and_wait(); + barrier.countDownAndWait(); })); getApplier()->start(); - barrier.count_down_and_wait(); + barrier.countDownAndWait(); _applier.reset(); boost::lock_guard<boost::mutex> lock(mutex); diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp index b2f95b1c612..dbc4f00b6fa 100644 --- a/src/mongo/db/repl/replication_executor_test.cpp +++ b/src/mongo/db/repl/replication_executor_test.cpp @@ -30,7 +30,6 @@ #include <map> #include <boost/scoped_ptr.hpp> -#include <boost/thread/barrier.hpp> #include <boost/thread/thread.hpp> #include "mongo/db/namespace_string.h" @@ -39,6 +38,7 @@ #include "mongo/db/repl/replication_executor.h" #include "mongo/db/repl/replication_executor_test_fixture.h" #include "mongo/stdx/functional.h" +#include "mongo/unittest/barrier.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/map_util.h" @@ -383,7 +383,7 @@ namespace { } TEST_F(ReplicationExecutorTest, ScheduleDBWorkAndExclusiveWorkConcurrently) { - boost::barrier barrier(2U); + unittest::Barrier barrier(2U); NamespaceString nss("mydb", "mycoll"); ReplicationExecutor& executor = getExecutor(); Status status1(ErrorCodes::InternalError, "Not mutated"); @@ -392,13 +392,13 @@ namespace { ASSERT_OK(executor.scheduleDBWork([&](const CallbackData& cbData) { status1 = cbData.status; txn = cbData.txn; - barrier.count_down_and_wait(); + barrier.countDownAndWait(); if (cbData.status != ErrorCodes::CallbackCanceled) { cbData.executor->shutdown(); } }).getStatus()); ASSERT_OK(executor.scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) { - barrier.count_down_and_wait(); + barrier.countDownAndWait(); }).getStatus()); executor.run(); ASSERT_OK(status1); diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp index 504e6862f78..ba94acce307 100644 --- a/src/mongo/db/repl/task_runner_test.cpp +++ b/src/mongo/db/repl/task_runner_test.cpp @@ -28,12 +28,14 @@ #include "mongo/platform/basic.h" -#include <boost/thread/lock_types.hpp> #include <vector> #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/task_runner.h" #include "mongo/db/repl/task_runner_test_fixture.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/mutex.h" +#include "mongo/unittest/barrier.h" #include "mongo/util/concurrency/thread_pool.h" namespace { @@ -60,12 +62,12 @@ namespace { } TEST_F(TaskRunnerTest, CallbackValues) { - boost::mutex mutex; + stdx::mutex mutex; bool called = false; OperationContext* txn = nullptr; Status status = getDetectableErrorStatus(); auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); called = true; txn = theTxn; status = theStatus; @@ -75,7 +77,7 @@ namespace { getThreadPool().join(); ASSERT_FALSE(getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_TRUE(called); ASSERT(txn); ASSERT_OK(status); @@ -85,13 +87,13 @@ namespace { resetTaskRunner(new TaskRunner(&getThreadPool(), []() -> OperationContext* { return nullptr; })); - boost::mutex mutex; + stdx::mutex mutex; bool called = false; OperationContextNoop opCtxNoop; OperationContext* txn = &opCtxNoop; Status status = getDetectableErrorStatus(); auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); called = true; txn = theTxn; status = theStatus; @@ -101,7 +103,7 @@ namespace { getThreadPool().join(); ASSERT_FALSE(getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_TRUE(called); ASSERT_FALSE(txn); ASSERT_OK(status); @@ -110,12 +112,13 @@ namespace { std::vector<int> _testRunTaskTwice(TaskRunnerTest& test, TaskRunner::NextAction nextAction, stdx::function<void(const Task& task)> schedule) { - boost::mutex mutex; + unittest::Barrier barrier(2U); + stdx::mutex mutex; int i = 0; OperationContext* txn[2] = {nullptr, nullptr}; int txnId[2] = {-100, -100}; auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); int j = i++; if (j >= 2) { return TaskRunner::NextAction::kInvalid; @@ -123,15 +126,22 @@ namespace { txn[j] = theTxn; txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]); TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel; + barrier.countDownAndWait(); return result; }; + schedule(task); ASSERT_TRUE(test.getTaskRunner().isActive()); + barrier.countDownAndWait(); + schedule(task); + ASSERT_TRUE(test.getTaskRunner().isActive()); + barrier.countDownAndWait(); + test.getThreadPool().join(); ASSERT_FALSE(test.getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_EQUALS(2, i); ASSERT(txn[0]); ASSERT(txn[1]); @@ -171,14 +181,14 @@ namespace { } TEST_F(TaskRunnerTest, SkipSecondTask) { - boost::mutex mutex; + stdx::mutex mutex; int i = 0; OperationContext* txn[2] = {nullptr, nullptr}; Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; - boost::condition condition; + stdx::condition_variable condition; bool schedulingDone = false; auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::unique_lock<boost::mutex> lk(mutex); + stdx::unique_lock<stdx::mutex> lk(mutex); int j = i++; if (j >= 2) { return TaskRunner::NextAction::kCancel; @@ -197,14 +207,14 @@ namespace { ASSERT_TRUE(getTaskRunner().isActive()); getTaskRunner().schedule(task); { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); schedulingDone = true; condition.notify_all(); } getThreadPool().join(); ASSERT_FALSE(getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_EQUALS(2, i); ASSERT(txn[0]); ASSERT_OK(status[0]); @@ -213,14 +223,14 @@ namespace { } TEST_F(TaskRunnerTest, FirstTaskThrowsException) { - boost::mutex mutex; + stdx::mutex mutex; int i = 0; OperationContext* txn[2] = {nullptr, nullptr}; Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()}; - boost::condition condition; + stdx::condition_variable condition; bool schedulingDone = false; auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::unique_lock<boost::mutex> lk(mutex); + stdx::unique_lock<stdx::mutex> lk(mutex); int j = i++; if (j >= 2) { return TaskRunner::NextAction::kCancel; @@ -246,14 +256,14 @@ namespace { ASSERT_TRUE(getTaskRunner().isActive()); getTaskRunner().schedule(task); { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); schedulingDone = true; condition.notify_all(); } getThreadPool().join(); ASSERT_FALSE(getTaskRunner().isActive()); - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_EQUALS(2, i); ASSERT(txn[0]); ASSERT_OK(status[0]); @@ -262,15 +272,15 @@ namespace { } TEST_F(TaskRunnerTest, Cancel) { - boost::mutex mutex; - boost::condition condition; + stdx::mutex mutex; + stdx::condition_variable condition; Status status = getDetectableErrorStatus(); bool taskRunning = false; // Running this task causes the task runner to wait for another task that // is never scheduled. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); status = theStatus; taskRunning = true; condition.notify_all(); @@ -284,7 +294,7 @@ namespace { getTaskRunner().schedule(task); ASSERT_TRUE(getTaskRunner().isActive()); { - boost::unique_lock<boost::mutex> lk(mutex); + stdx::unique_lock<stdx::mutex> lk(mutex); while (!taskRunning) { condition.wait(lk); } @@ -299,20 +309,20 @@ namespace { // This status will not be OK if canceling the task runner // before scheduling the task results in the task being canceled. - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_OK(status); } TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) { - boost::mutex mutex; - boost::condition condition; + stdx::mutex mutex; + stdx::condition_variable condition; Status status = getDetectableErrorStatus(); bool taskRunning = false; // Running this task causes the task runner to wait for another task that // is never scheduled. auto task = [&](OperationContext* theTxn, const Status& theStatus) { - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); status = theStatus; taskRunning = true; condition.notify_all(); @@ -322,7 +332,7 @@ namespace { getTaskRunner().schedule(task); ASSERT_TRUE(getTaskRunner().isActive()); { - boost::unique_lock<boost::mutex> lk(mutex); + stdx::unique_lock<stdx::mutex> lk(mutex); while (!taskRunning) { condition.wait(lk); } @@ -334,7 +344,7 @@ namespace { // This status will not be OK if canceling the task runner // before scheduling the task results in the task being canceled. - boost::lock_guard<boost::mutex> lk(mutex); + stdx::lock_guard<stdx::mutex> lk(mutex); ASSERT_OK(status); } |