summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl_test.cpp
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2016-10-05 13:30:11 -0400
committerAndy Schwerin <schwerin@mongodb.com>2016-10-05 13:30:11 -0400
commit360d6ae39305dbc2f41ca7e0bfd424081fd4f030 (patch)
tree2bce193d6cc7dd5034b34988593f7cfb21350feb /src/mongo/db/repl/replication_coordinator_impl_test.cpp
parent0cb49ab4d834376615d4a3c8b1e1b7b9f5bf9559 (diff)
downloadmongo-360d6ae39305dbc2f41ca7e0bfd424081fd4f030.tar.gz
SERVER-26305 Use interruptible condition variables in ReplicationCoordinatorImpl instead of KillOpListener
While this change also improves the readability of _awaitReplication_inlock and stepDown, it resovles SERVER-26305 by breaking a deadlock cycle caused by the fact that KillOpListener methods get run under a mutex in ServiceContext.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_test.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp112
1 files changed, 48 insertions, 64 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 659096ed571..64f8e1d1dcf 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -65,6 +65,7 @@
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
+#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
@@ -115,6 +116,14 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn,
replCoord->signalDrainComplete(txn.get());
}
+/**
+ * Helper that kills an operation, taking the necessary locks.
+ */
+void killOperation(OperationContext* txn) {
+ stdx::lock_guard<Client> lkClient(*txn->getClient());
+ txn->getServiceContext()->killOperation(txn);
+}
+
TEST_F(ReplCoordTest, NodeEntersStartup2StateWhenStartingUpWithValidLocalConfig) {
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -1319,7 +1328,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
awaiter.start();
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1));
- getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000));
+ ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)));
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, statusAndDur.status);
awaiter.reset();
@@ -1362,13 +1371,7 @@ TEST_F(ReplCoordTest,
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1));
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1));
- OperationContext* txn = awaiter.getOperationContext();
- auto opID = txn->getOpID();
- {
- stdx::lock_guard<Client> lk(*txn->getClient());
- txn->markKilled(ErrorCodes::Interrupted);
- }
- getReplCoord()->interrupt(opID);
+ killOperation(awaiter.getOperationContext());
ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
awaiter.reset();
@@ -1376,6 +1379,33 @@ TEST_F(ReplCoordTest,
class StepDownTest : public ReplCoordTest {
protected:
+ struct SharedClientAndOperation {
+ static SharedClientAndOperation make(ServiceContext* serviceContext) {
+ SharedClientAndOperation result;
+ result.client = serviceContext->makeClient("StepDownThread");
+ result.txn = result.client->makeOperationContext();
+ return result;
+ }
+ std::shared_ptr<Client> client;
+ std::shared_ptr<OperationContext> txn;
+ };
+
+ std::pair<SharedClientAndOperation, stdx::future<boost::optional<Status>>> stepDown_nonBlocking(
+ bool force, Milliseconds waitTime, Milliseconds stepDownTime) {
+ using PromisedClientAndOperation = stdx::promise<SharedClientAndOperation>;
+ auto task = stdx::packaged_task<boost::optional<Status>(PromisedClientAndOperation)>(
+ [=](PromisedClientAndOperation operationPromise) -> boost::optional<Status> {
+ auto result = SharedClientAndOperation::make(getServiceContext());
+ operationPromise.set_value(result);
+ return getReplCoord()->stepDown(result.txn.get(), force, waitTime, stepDownTime);
+ });
+ auto result = task.get_future();
+ PromisedClientAndOperation operationPromise;
+ auto operationFuture = operationPromise.get_future();
+ stdx::thread(std::move(task), std::move(operationPromise)).detach();
+ return std::make_pair(operationFuture.get(), std::move(result));
+ }
+
OID myRid;
OID rid2;
OID rid3;
@@ -1700,25 +1730,16 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txn = makeOperationContext();
-
-
// Step down where the secondary actually has to catch up before the stepDown can succeed.
// On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
// T + 2 seconds and send out a new round of heartbeats immediately.
// This makes it unnecessary to advance the clock after entering the network to process
// the heartbeat requests.
- Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
- txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn->lockState()->isReadLocked());
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
// Make a secondary actually catch up
enterNetwork();
getNet()->runUntil(getNet()->now() + Milliseconds(1000));
- ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
log() << request.target.toString() << " processing " << request.cmdObj;
@@ -1743,8 +1764,7 @@ TEST_F(StepDownTest,
getNet()->runReadyNetworkOperations();
exitNetwork();
- getReplExec()->waitForEvent(eventHandle);
- ASSERT_OK(result);
+ ASSERT_OK(*result.second.get());
ASSERT_TRUE(repl->getMemberState().secondary());
}
@@ -1761,25 +1781,16 @@ TEST_F(StepDownTest,
simulateSuccessfulV1Election();
- const auto txn = makeOperationContext();
-
-
// Step down where the secondary actually has to catch up before the stepDown can succeed.
// On entering the network, _stepDownContinue should cancel the heartbeats scheduled for
// T + 2 seconds and send out a new round of heartbeats immediately.
// This makes it unnecessary to advance the clock after entering the network to process
// the heartbeat requests.
- Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
- txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn->lockState()->isReadLocked());
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
// Secondary has not caught up on first round of heartbeats.
enterNetwork();
getNet()->runUntil(getNet()->now() + Milliseconds(1000));
- ASSERT(getNet()->hasReadyRequests());
NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
RemoteCommandRequest request = noi->getRequest();
log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj;
@@ -1808,7 +1819,6 @@ TEST_F(StepDownTest,
auto until = getNet()->now() + heartbeatInterval;
getNet()->runUntil(until);
ASSERT_EQUALS(until, getNet()->now());
- ASSERT(getNet()->hasReadyRequests());
noi = getNet()->getNextReadyRequest();
request = noi->getRequest();
log() << "HB2: " << request.target.toString() << " processing " << request.cmdObj;
@@ -1830,8 +1840,7 @@ TEST_F(StepDownTest,
getNet()->runReadyNetworkOperations();
exitNetwork();
- getReplExec()->waitForEvent(eventHandle);
- ASSERT_OK(result);
+ ASSERT_OK(*result.second.get());
ASSERT_TRUE(repl->getMemberState().secondary());
}
@@ -1847,28 +1856,12 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
simulateSuccessfulV1Election();
- const auto txn = makeOperationContext();
-
- const unsigned int opID = txn->getOpID();
-
ASSERT_TRUE(repl->getMemberState().primary());
// stepDown where the secondary actually has to catch up before the stepDown can succeed.
- Status result(ErrorCodes::InternalError, "not mutated");
- auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking(
- txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result);
- const auto& eventHandle = globalReadLockAndEventHandle.second;
- ASSERT_TRUE(eventHandle);
- ASSERT_TRUE(txn->lockState()->isReadLocked());
-
- {
- stdx::lock_guard<Client> lk(*(txn->getClient()));
- txn->markKilled(ErrorCodes::Interrupted);
- }
- getReplCoord()->interrupt(opID);
-
- getReplExec()->waitForEvent(eventHandle);
- ASSERT_EQUALS(ErrorCodes::Interrupted, result);
+ auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60));
+ killOperation(result.first.txn.get());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, *result.second.get());
ASSERT_TRUE(repl->getMemberState().primary());
}
@@ -3416,12 +3409,8 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
- auto txn = makeOperationContext();
-
- {
- stdx::lock_guard<Client> lk(*(txn->getClient()));
- txn->markKilled(ErrorCodes::Interrupted);
- }
+ const auto txn = makeOperationContext();
+ killOperation(txn.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
@@ -3557,12 +3546,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
-
- {
- stdx::lock_guard<Client> lk(*(txn->getClient()));
- txn->markKilled(ErrorCodes::Interrupted);
- }
-
+ killOperation(txn.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
txn.get(),
ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern));