summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-05-20 10:55:08 -0400
committerBenety Goh <benety@mongodb.com>2015-05-21 19:44:24 -0400
commita2841541e1ccb32351f0301343d71b70481d5572 (patch)
tree6b7ffab5f8246b7bb929e44813cc715646e26544 /src/mongo/db
parentfb26d2b0397e5b3c4bf2e90ba6838a0f5784f35c (diff)
downloadmongo-a2841541e1ccb32351f0301343d71b70481d5572.tar.gz
SERVER-18016 fixed race in TaskRunner unit test
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/SConscript3
-rw-r--r--src/mongo/db/repl/applier_test.cpp38
-rw-r--r--src/mongo/db/repl/replication_executor_test.cpp8
-rw-r--r--src/mongo/db/repl/task_runner_test.cpp70
4 files changed, 65 insertions, 54 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 73cb9de9012..b6e2fae6a97 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -62,6 +62,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'replication_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/concurrency',
],
)
@@ -466,6 +467,7 @@ env.Library(
],
LIBDEPS=[
'task_runner',
+ '$BUILD_DIR/mongo/unittest/concurrency',
'$BUILD_DIR/mongo/util/decorable',
],
)
@@ -539,5 +541,6 @@ env.CppUnitTest(
LIBDEPS=[
'applier',
'replication_executor_test_fixture',
+ '$BUILD_DIR/mongo/unittest/concurrency',
],
)
diff --git a/src/mongo/db/repl/applier_test.cpp b/src/mongo/db/repl/applier_test.cpp
index c38bcce4c74..ce1f52d627d 100644
--- a/src/mongo/db/repl/applier_test.cpp
+++ b/src/mongo/db/repl/applier_test.cpp
@@ -28,11 +28,6 @@
#include "mongo/platform/basic.h"
-#include <boost/thread/barrier.hpp>
-#include <boost/thread/condition.hpp>
-#include <boost/thread/lock_guard.hpp>
-#include <boost/thread/lock_types.hpp>
-#include <boost/thread/mutex.hpp>
#include <memory>
#include "mongo/db/jsobj.h"
@@ -41,6 +36,9 @@
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/platform/compiler.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/unittest/barrier.h"
namespace {
@@ -68,7 +66,7 @@ namespace {
void _testApplyOperationFailed(size_t opIndex, stdx::function<Status ()> fail);
std::unique_ptr<Applier> _applier;
- std::unique_ptr<boost::barrier> _barrier;
+ std::unique_ptr<unittest::Barrier> _barrier;
};
Status ApplierTest::getDetectableErrorStatus() {
@@ -84,7 +82,7 @@ namespace {
apply,
[this](const StatusWith<Timestamp>&, const Operations&) {
if (_barrier.get()) {
- _barrier->count_down_and_wait();
+ _barrier->countDownAndWait();
}
}));
}
@@ -132,22 +130,22 @@ namespace {
TEST_F(ApplierTest, IsActiveAfterStart) {
// Use a barrier to ensure that the callback blocks while
// we check isActive().
- _barrier.reset(new boost::barrier(2U));
+ _barrier.reset(new unittest::Barrier(2U));
ASSERT_FALSE(getApplier()->isActive());
ASSERT_OK(getApplier()->start());
ASSERT_TRUE(getApplier()->isActive());
- _barrier->count_down_and_wait();
+ _barrier->countDownAndWait();
}
TEST_F(ApplierTest, StartWhenActive) {
// Use a barrier to ensure that the callback blocks while
// we check isActive().
- _barrier.reset(new boost::barrier(2U));
+ _barrier.reset(new unittest::Barrier(2U));
ASSERT_OK(getApplier()->start());
ASSERT_TRUE(getApplier()->isActive());
ASSERT_NOT_OK(getApplier()->start());
ASSERT_TRUE(getApplier()->isActive());
- _barrier->count_down_and_wait();
+ _barrier->countDownAndWait();
}
TEST_F(ApplierTest, CancelWithoutStart) {
@@ -171,10 +169,10 @@ namespace {
TEST_F(ApplierTest, CancelBeforeStartingDBWork) {
// Schedule a blocking DB work item before the applier to allow us to cancel the applier
// work item before the executor runs it.
- boost::barrier barrier(2U);
+ unittest::Barrier barrier(2U);
using CallbackData = ReplicationExecutor::CallbackData;
getExecutor().scheduleDBWork([&](const CallbackData& cbd) {
- barrier.count_down_and_wait(); // generation 0
+ barrier.countDownAndWait(); // generation 0
});
const BSONObj operation = BSON("ts" << Timestamp(Seconds(123), 0));
boost::mutex mutex;
@@ -194,7 +192,7 @@ namespace {
getApplier()->cancel();
ASSERT_TRUE(getApplier()->isActive());
- barrier.count_down_and_wait(); // generation 0
+ barrier.countDownAndWait(); // generation 0
getApplier()->wait();
ASSERT_FALSE(getApplier()->isActive());
@@ -208,10 +206,10 @@ namespace {
TEST_F(ApplierTest, DestroyBeforeStartingDBWork) {
// Schedule a blocking DB work item before the applier to allow us to destroy the applier
// before the executor runs the work item.
- boost::barrier barrier(2U);
+ unittest::Barrier barrier(2U);
using CallbackData = ReplicationExecutor::CallbackData;
getExecutor().scheduleDBWork([&](const CallbackData& cbd) {
- barrier.count_down_and_wait(); // generation 0
+ barrier.countDownAndWait(); // generation 0
// Give the main thread a head start in invoking the applier destructor.
sleepmillis(1);
});
@@ -232,7 +230,7 @@ namespace {
getApplier()->start();
ASSERT_TRUE(getApplier()->isActive());
- barrier.count_down_and_wait(); // generation 0
+ barrier.countDownAndWait(); // generation 0
// It is possible the executor may have invoked the callback before we
// destroy the applier. Therefore both OK and CallbackCanceled are acceptable
@@ -277,7 +275,7 @@ namespace {
TEST_F(ApplierTest, DestroyShouldBlockUntilInactive) {
const Timestamp timestamp(Seconds(123), 0);
- boost::barrier barrier(2U);
+ unittest::Barrier barrier(2U);
boost::mutex mutex;
StatusWith<Timestamp> result = getDetectableErrorStatus();
Applier::Operations operations;
@@ -289,11 +287,11 @@ namespace {
boost::lock_guard<boost::mutex> lock(mutex);
result = theResult;
operations = theOperations;
- barrier.count_down_and_wait();
+ barrier.countDownAndWait();
}));
getApplier()->start();
- barrier.count_down_and_wait();
+ barrier.countDownAndWait();
_applier.reset();
boost::lock_guard<boost::mutex> lock(mutex);
diff --git a/src/mongo/db/repl/replication_executor_test.cpp b/src/mongo/db/repl/replication_executor_test.cpp
index b2f95b1c612..dbc4f00b6fa 100644
--- a/src/mongo/db/repl/replication_executor_test.cpp
+++ b/src/mongo/db/repl/replication_executor_test.cpp
@@ -30,7 +30,6 @@
#include <map>
#include <boost/scoped_ptr.hpp>
-#include <boost/thread/barrier.hpp>
#include <boost/thread/thread.hpp>
#include "mongo/db/namespace_string.h"
@@ -39,6 +38,7 @@
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/replication_executor_test_fixture.h"
#include "mongo/stdx/functional.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/map_util.h"
@@ -383,7 +383,7 @@ namespace {
}
TEST_F(ReplicationExecutorTest, ScheduleDBWorkAndExclusiveWorkConcurrently) {
- boost::barrier barrier(2U);
+ unittest::Barrier barrier(2U);
NamespaceString nss("mydb", "mycoll");
ReplicationExecutor& executor = getExecutor();
Status status1(ErrorCodes::InternalError, "Not mutated");
@@ -392,13 +392,13 @@ namespace {
ASSERT_OK(executor.scheduleDBWork([&](const CallbackData& cbData) {
status1 = cbData.status;
txn = cbData.txn;
- barrier.count_down_and_wait();
+ barrier.countDownAndWait();
if (cbData.status != ErrorCodes::CallbackCanceled) {
cbData.executor->shutdown();
}
}).getStatus());
ASSERT_OK(executor.scheduleWorkWithGlobalExclusiveLock([&](const CallbackData& cbData) {
- barrier.count_down_and_wait();
+ barrier.countDownAndWait();
}).getStatus());
executor.run();
ASSERT_OK(status1);
diff --git a/src/mongo/db/repl/task_runner_test.cpp b/src/mongo/db/repl/task_runner_test.cpp
index 504e6862f78..ba94acce307 100644
--- a/src/mongo/db/repl/task_runner_test.cpp
+++ b/src/mongo/db/repl/task_runner_test.cpp
@@ -28,12 +28,14 @@
#include "mongo/platform/basic.h"
-#include <boost/thread/lock_types.hpp>
#include <vector>
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/task_runner.h"
#include "mongo/db/repl/task_runner_test_fixture.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/unittest/barrier.h"
#include "mongo/util/concurrency/thread_pool.h"
namespace {
@@ -60,12 +62,12 @@ namespace {
}
TEST_F(TaskRunnerTest, CallbackValues) {
- boost::mutex mutex;
+ stdx::mutex mutex;
bool called = false;
OperationContext* txn = nullptr;
Status status = getDetectableErrorStatus();
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
called = true;
txn = theTxn;
status = theStatus;
@@ -75,7 +77,7 @@ namespace {
getThreadPool().join();
ASSERT_FALSE(getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_TRUE(called);
ASSERT(txn);
ASSERT_OK(status);
@@ -85,13 +87,13 @@ namespace {
resetTaskRunner(new TaskRunner(&getThreadPool(), []() -> OperationContext* {
return nullptr;
}));
- boost::mutex mutex;
+ stdx::mutex mutex;
bool called = false;
OperationContextNoop opCtxNoop;
OperationContext* txn = &opCtxNoop;
Status status = getDetectableErrorStatus();
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
called = true;
txn = theTxn;
status = theStatus;
@@ -101,7 +103,7 @@ namespace {
getThreadPool().join();
ASSERT_FALSE(getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_TRUE(called);
ASSERT_FALSE(txn);
ASSERT_OK(status);
@@ -110,12 +112,13 @@ namespace {
std::vector<int> _testRunTaskTwice(TaskRunnerTest& test,
TaskRunner::NextAction nextAction,
stdx::function<void(const Task& task)> schedule) {
- boost::mutex mutex;
+ unittest::Barrier barrier(2U);
+ stdx::mutex mutex;
int i = 0;
OperationContext* txn[2] = {nullptr, nullptr};
int txnId[2] = {-100, -100};
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
int j = i++;
if (j >= 2) {
return TaskRunner::NextAction::kInvalid;
@@ -123,15 +126,22 @@ namespace {
txn[j] = theTxn;
txnId[j] = TaskRunnerTest::getOperationContextId(txn[j]);
TaskRunner::NextAction result = j == 0 ? nextAction : TaskRunner::NextAction::kCancel;
+ barrier.countDownAndWait();
return result;
};
+
schedule(task);
ASSERT_TRUE(test.getTaskRunner().isActive());
+ barrier.countDownAndWait();
+
schedule(task);
+ ASSERT_TRUE(test.getTaskRunner().isActive());
+ barrier.countDownAndWait();
+
test.getThreadPool().join();
ASSERT_FALSE(test.getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_EQUALS(2, i);
ASSERT(txn[0]);
ASSERT(txn[1]);
@@ -171,14 +181,14 @@ namespace {
}
TEST_F(TaskRunnerTest, SkipSecondTask) {
- boost::mutex mutex;
+ stdx::mutex mutex;
int i = 0;
OperationContext* txn[2] = {nullptr, nullptr};
Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()};
- boost::condition condition;
+ stdx::condition_variable condition;
bool schedulingDone = false;
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::unique_lock<boost::mutex> lk(mutex);
+ stdx::unique_lock<stdx::mutex> lk(mutex);
int j = i++;
if (j >= 2) {
return TaskRunner::NextAction::kCancel;
@@ -197,14 +207,14 @@ namespace {
ASSERT_TRUE(getTaskRunner().isActive());
getTaskRunner().schedule(task);
{
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
schedulingDone = true;
condition.notify_all();
}
getThreadPool().join();
ASSERT_FALSE(getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_EQUALS(2, i);
ASSERT(txn[0]);
ASSERT_OK(status[0]);
@@ -213,14 +223,14 @@ namespace {
}
TEST_F(TaskRunnerTest, FirstTaskThrowsException) {
- boost::mutex mutex;
+ stdx::mutex mutex;
int i = 0;
OperationContext* txn[2] = {nullptr, nullptr};
Status status[2] = {getDetectableErrorStatus(), getDetectableErrorStatus()};
- boost::condition condition;
+ stdx::condition_variable condition;
bool schedulingDone = false;
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::unique_lock<boost::mutex> lk(mutex);
+ stdx::unique_lock<stdx::mutex> lk(mutex);
int j = i++;
if (j >= 2) {
return TaskRunner::NextAction::kCancel;
@@ -246,14 +256,14 @@ namespace {
ASSERT_TRUE(getTaskRunner().isActive());
getTaskRunner().schedule(task);
{
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
schedulingDone = true;
condition.notify_all();
}
getThreadPool().join();
ASSERT_FALSE(getTaskRunner().isActive());
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_EQUALS(2, i);
ASSERT(txn[0]);
ASSERT_OK(status[0]);
@@ -262,15 +272,15 @@ namespace {
}
TEST_F(TaskRunnerTest, Cancel) {
- boost::mutex mutex;
- boost::condition condition;
+ stdx::mutex mutex;
+ stdx::condition_variable condition;
Status status = getDetectableErrorStatus();
bool taskRunning = false;
// Running this task causes the task runner to wait for another task that
// is never scheduled.
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
status = theStatus;
taskRunning = true;
condition.notify_all();
@@ -284,7 +294,7 @@ namespace {
getTaskRunner().schedule(task);
ASSERT_TRUE(getTaskRunner().isActive());
{
- boost::unique_lock<boost::mutex> lk(mutex);
+ stdx::unique_lock<stdx::mutex> lk(mutex);
while (!taskRunning) {
condition.wait(lk);
}
@@ -299,20 +309,20 @@ namespace {
// This status will not be OK if canceling the task runner
// before scheduling the task results in the task being canceled.
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_OK(status);
}
TEST_F(TaskRunnerTest, DestroyShouldWaitForTasksToComplete) {
- boost::mutex mutex;
- boost::condition condition;
+ stdx::mutex mutex;
+ stdx::condition_variable condition;
Status status = getDetectableErrorStatus();
bool taskRunning = false;
// Running this task causes the task runner to wait for another task that
// is never scheduled.
auto task = [&](OperationContext* theTxn, const Status& theStatus) {
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
status = theStatus;
taskRunning = true;
condition.notify_all();
@@ -322,7 +332,7 @@ namespace {
getTaskRunner().schedule(task);
ASSERT_TRUE(getTaskRunner().isActive());
{
- boost::unique_lock<boost::mutex> lk(mutex);
+ stdx::unique_lock<stdx::mutex> lk(mutex);
while (!taskRunning) {
condition.wait(lk);
}
@@ -334,7 +344,7 @@ namespace {
// This status will not be OK if canceling the task runner
// before scheduling the task results in the task being canceled.
- boost::lock_guard<boost::mutex> lk(mutex);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
ASSERT_OK(status);
}