diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-10-05 13:30:11 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2016-10-05 13:30:11 -0400 |
commit | 360d6ae39305dbc2f41ca7e0bfd424081fd4f030 (patch) | |
tree | 2bce193d6cc7dd5034b34988593f7cfb21350feb /src/mongo/db/repl/replication_coordinator_impl_test.cpp | |
parent | 0cb49ab4d834376615d4a3c8b1e1b7b9f5bf9559 (diff) | |
download | mongo-360d6ae39305dbc2f41ca7e0bfd424081fd4f030.tar.gz |
SERVER-26305 Use interruptible condition variables in ReplicationCoordinatorImpl instead of KillOpListener
While this change also improves the readability of _awaitReplication_inlock and
stepDown, it resolves SERVER-26305 by breaking a deadlock cycle caused by the
fact that KillOpListener methods get run under a mutex in ServiceContext.
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_test.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 112 |
1 files changed, 48 insertions, 64 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 659096ed571..64f8e1d1dcf 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -65,6 +65,7 @@ #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" #include "mongo/util/timer.h" @@ -115,6 +116,14 @@ void runSingleNodeElection(ServiceContext::UniqueOperationContext txn, replCoord->signalDrainComplete(txn.get()); } +/** + * Helper that kills an operation, taking the necessary locks. + */ +void killOperation(OperationContext* txn) { + stdx::lock_guard<Client> lkClient(*txn->getClient()); + txn->getServiceContext()->killOperation(txn); +} + TEST_F(ReplCoordTest, NodeEntersStartup2StateWhenStartingUpWithValidLocalConfig) { assertStartSuccess(BSON("_id" << "mySet" @@ -1319,7 +1328,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite awaiter.start(); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1)); - getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000)); + ASSERT_OK(getReplCoord()->stepDown(txn.get(), true, Milliseconds(0), Milliseconds(1000))); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::PrimarySteppedDown, statusAndDur.status); awaiter.reset(); @@ -1362,13 +1371,7 @@ TEST_F(ReplCoordTest, ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 1, time1)); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(2, 2, time1)); - OperationContext* txn = awaiter.getOperationContext(); - auto opID = txn->getOpID(); - { - stdx::lock_guard<Client> lk(*txn->getClient()); - txn->markKilled(ErrorCodes::Interrupted); - } - 
getReplCoord()->interrupt(opID); + killOperation(awaiter.getOperationContext()); ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult(); ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status); awaiter.reset(); @@ -1376,6 +1379,33 @@ TEST_F(ReplCoordTest, class StepDownTest : public ReplCoordTest { protected: + struct SharedClientAndOperation { + static SharedClientAndOperation make(ServiceContext* serviceContext) { + SharedClientAndOperation result; + result.client = serviceContext->makeClient("StepDownThread"); + result.txn = result.client->makeOperationContext(); + return result; + } + std::shared_ptr<Client> client; + std::shared_ptr<OperationContext> txn; + }; + + std::pair<SharedClientAndOperation, stdx::future<boost::optional<Status>>> stepDown_nonBlocking( + bool force, Milliseconds waitTime, Milliseconds stepDownTime) { + using PromisedClientAndOperation = stdx::promise<SharedClientAndOperation>; + auto task = stdx::packaged_task<boost::optional<Status>(PromisedClientAndOperation)>( + [=](PromisedClientAndOperation operationPromise) -> boost::optional<Status> { + auto result = SharedClientAndOperation::make(getServiceContext()); + operationPromise.set_value(result); + return getReplCoord()->stepDown(result.txn.get(), force, waitTime, stepDownTime); + }); + auto result = task.get_future(); + PromisedClientAndOperation operationPromise; + auto operationFuture = operationPromise.get_future(); + stdx::thread(std::move(task), std::move(operationPromise)).detach(); + return std::make_pair(operationFuture.get(), std::move(result)); + } + OID myRid; OID rid2; OID rid3; @@ -1700,25 +1730,16 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txn = makeOperationContext(); - - // Step down where the secondary actually has to catch up before the stepDown can succeed. // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for // T + 2 seconds and send out a new round of heartbeats immediately. 
// This makes it unnecessary to advance the clock after entering the network to process // the heartbeat requests. - Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( - txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn->lockState()->isReadLocked()); + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); // Make a secondary actually catch up enterNetwork(); getNet()->runUntil(getNet()->now() + Milliseconds(1000)); - ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; @@ -1743,8 +1764,7 @@ TEST_F(StepDownTest, getNet()->runReadyNetworkOperations(); exitNetwork(); - getReplExec()->waitForEvent(eventHandle); - ASSERT_OK(result); + ASSERT_OK(*result.second.get()); ASSERT_TRUE(repl->getMemberState().secondary()); } @@ -1761,25 +1781,16 @@ TEST_F(StepDownTest, simulateSuccessfulV1Election(); - const auto txn = makeOperationContext(); - - // Step down where the secondary actually has to catch up before the stepDown can succeed. // On entering the network, _stepDownContinue should cancel the heartbeats scheduled for // T + 2 seconds and send out a new round of heartbeats immediately. // This makes it unnecessary to advance the clock after entering the network to process // the heartbeat requests. 
- Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( - txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn->lockState()->isReadLocked()); + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); // Secondary has not caught up on first round of heartbeats. enterNetwork(); getNet()->runUntil(getNet()->now() + Milliseconds(1000)); - ASSERT(getNet()->hasReadyRequests()); NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); RemoteCommandRequest request = noi->getRequest(); log() << "HB1: " << request.target.toString() << " processing " << request.cmdObj; @@ -1808,7 +1819,6 @@ TEST_F(StepDownTest, auto until = getNet()->now() + heartbeatInterval; getNet()->runUntil(until); ASSERT_EQUALS(until, getNet()->now()); - ASSERT(getNet()->hasReadyRequests()); noi = getNet()->getNextReadyRequest(); request = noi->getRequest(); log() << "HB2: " << request.target.toString() << " processing " << request.cmdObj; @@ -1830,8 +1840,7 @@ TEST_F(StepDownTest, getNet()->runReadyNetworkOperations(); exitNetwork(); - getReplExec()->waitForEvent(eventHandle); - ASSERT_OK(result); + ASSERT_OK(*result.second.get()); ASSERT_TRUE(repl->getMemberState().secondary()); } @@ -1847,28 +1856,12 @@ TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { simulateSuccessfulV1Election(); - const auto txn = makeOperationContext(); - - const unsigned int opID = txn->getOpID(); - ASSERT_TRUE(repl->getMemberState().primary()); // stepDown where the secondary actually has to catch up before the stepDown can succeed. 
- Status result(ErrorCodes::InternalError, "not mutated"); - auto globalReadLockAndEventHandle = repl->stepDown_nonBlocking( - txn.get(), false, Milliseconds(10000), Milliseconds(60000), &result); - const auto& eventHandle = globalReadLockAndEventHandle.second; - ASSERT_TRUE(eventHandle); - ASSERT_TRUE(txn->lockState()->isReadLocked()); - - { - stdx::lock_guard<Client> lk(*(txn->getClient())); - txn->markKilled(ErrorCodes::Interrupted); - } - getReplCoord()->interrupt(opID); - - getReplExec()->waitForEvent(eventHandle); - ASSERT_EQUALS(ErrorCodes::Interrupted, result); + auto result = stepDown_nonBlocking(false, Seconds(10), Seconds(60)); + killOperation(result.first.txn.get()); + ASSERT_EQUALS(ErrorCodes::Interrupted, *result.second.get()); ASSERT_TRUE(repl->getMemberState().primary()); } @@ -3416,12 +3409,8 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); - auto txn = makeOperationContext(); - - { - stdx::lock_guard<Client> lk(*(txn->getClient())); - txn->markKilled(ErrorCodes::Interrupted); - } + const auto txn = makeOperationContext(); + killOperation(txn.get()); auto status = getReplCoord()->waitUntilOpTimeForRead( txn.get(), ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); @@ -3557,12 +3546,7 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); - - { - stdx::lock_guard<Client> lk(*(txn->getClient())); - txn->markKilled(ErrorCodes::Interrupted); - } - + killOperation(txn.get()); auto status = getReplCoord()->waitUntilOpTimeForRead( txn.get(), ReadConcernArgs(OpTime(Timestamp(50, 0), 0), ReadConcernLevel::kMajorityReadConcern)); |