diff options
author | Spencer T Brody <spencer@mongodb.com> | 2017-09-29 13:25:32 -0400 |
---|---|---|
committer | Spencer T Brody <spencer@mongodb.com> | 2017-10-04 12:03:47 -0400 |
commit | d1153c137c6422b7f9333aefaacb536cc664aae5 (patch) | |
tree | 0c55dd3b6fae7a1576a8073c543344e7d3b260cf /src/mongo/db | |
parent | 763e9a4a675668ba3c0d3836321f53c77950cc53 (diff) | |
download | mongo-d1153c137c6422b7f9333aefaacb536cc664aae5.tar.gz |
SERVER-31330 Prevent stepdown during drain mode from triggering an invariant
Diffstat (limited to 'src/mongo/db')
13 files changed, 193 insertions, 74 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index a40d7612893..4bdfee93c5b 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -198,6 +198,10 @@ void ReplicationCoordinatorExternalStateMock::setStoreLocalLastVoteDocumentToHan } } +void ReplicationCoordinatorExternalStateMock::setFirstOpTimeOfMyTerm(const OpTime& opTime) { + _firstOpTimeOfMyTerm = opTime; +} + void ReplicationCoordinatorExternalStateMock::closeConnections() { _connectionsClosed = true; } @@ -278,9 +282,8 @@ void ReplicationCoordinatorExternalStateMock::onDrainComplete(OperationContext* OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* opCtx, bool isV1ElectionProtocol) { - if (isV1ElectionProtocol) { - _lastOpTime = OpTime(Timestamp(1, 0), 1); - } + _lastOpTime = _firstOpTimeOfMyTerm; + _firstOpTimeOfMyTerm = OpTime(); return fassertStatusOK(40297, _lastOpTime); } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 613389e1761..81eb5d634c3 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -152,6 +152,8 @@ public: */ void setStoreLocalLastVoteDocumentToHang(bool hang); + void setFirstOpTimeOfMyTerm(const OpTime& opTime); + /** * Returns true if startThreads() has been called. */ @@ -199,6 +201,7 @@ private: bool _threadsStarted; bool _isReadCommittedSupported = true; bool _areSnapshotsEnabled = true; + OpTime _firstOpTimeOfMyTerm; }; } // namespace repl diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 27be3f605e3..cb7d44623b3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -939,8 +939,10 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, Lock::GlobalWrite globalWriteLock(opCtx); lk.lock(); - // Exit drain mode when the buffer is empty in the current term and we're in Draining mode. - if (_applierState != ApplierState::Draining || termWhenBufferIsEmpty != _topCoord->getTerm()) { + // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the + // current term, and we're allowed to become the write master. + if (_applierState != ApplierState::Draining || + !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) { return; } _applierState = ApplierState::Stopped; @@ -961,9 +963,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx, _updateLastCommittedOpTime_inlock(); // Update _canAcceptNonLocalWrites - invariant(!_canAcceptNonLocalWrites); _updateMemberStateFromTopologyCoordinator_inlock(opCtx); - invariant(_canAcceptNonLocalWrites); log() << "transition to primary complete; database writes are now permitted" << rsLog; _drainFinishedCond.notify_all(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp index 424f8c88689..e449665a483 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -135,12 +135,12 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); // Fake OpTime from initiate, or a write op. - getExternalState()->setLastOpTime(OpTime{{0, 0}, 0}); + getExternalState()->setLastOpTime(OpTime{{0, 0}, -1}); ASSERT(getReplCoord()->getMemberState().secondary()) << getReplCoord()->getMemberState().toString(); - getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), -1)); NetworkInterfaceMock* net = getNet(); net->enterNetwork(); @@ -188,7 +188,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); // Fake OpTime from initiate, or a write op. - getExternalState()->setLastOpTime(OpTime{{0, 0}, 0}); + getExternalState()->setLastOpTime(OpTime{{0, 0}, -1}); ASSERT(getReplCoord()->getMemberState().primary()) << getReplCoord()->getMemberState().toString(); @@ -222,8 +222,8 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenAllNodesVoteYea) { << "node3:12345"))); assertStartSuccess(configObj, HostAndPort("node1", 12345)); OperationContextNoop opCtx; - getReplCoord()->setMyLastAppliedOpTime(OpTime{{100, 1}, 0}); - getExternalState()->setLastOpTime(OpTime{{100, 1}, 0}); + getReplCoord()->setMyLastAppliedOpTime(OpTime{{100, 1}, -1}); + getExternalState()->setLastOpTime(OpTime{{100, 1}, -1}); ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); startCapturingLogMessages(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp index 3b6f9500378..fd4db006a74 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp @@ -171,7 +171,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm()); + signalDrainComplete(&opCtx); getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -233,7 +233,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); - getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm()); + signalDrainComplete(&opCtx); getReplCoord()->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -2160,7 +2160,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) { stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2183,7 +2183,7 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) { stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2203,7 +2203,7 @@ TEST_F(PrimaryCatchUpTest, CatchupTimeout) { stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2228,7 +2228,7 @@ TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) { stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2253,7 +2253,7 @@ TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) { stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2352,7 +2352,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) { Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_FALSE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); } - replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT(replCoord->getApplierState() == ApplierState::Stopped); ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test")); @@ -2416,7 +2416,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) { stopCapturingLogMessages(); ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2461,7 +2461,7 @@ TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) { ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest")); ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } @@ -2475,7 +2475,7 @@ TEST_F(PrimaryCatchUpTest, ZeroTimeout) { stopCapturingLogMessages(); ASSERT_EQUALS(1, countLogLinesContaining("Skipping primary catchup")); auto opCtx = makeOperationContext(); - getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + signalDrainComplete(opCtx.get()); Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX); ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test")); } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 1b02badec85..84ea786227a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -371,7 +371,20 @@ void ReplicationCoordinatorImpl::_stepDownFinish( return; } - MONGO_FAIL_POINT_PAUSE_WHILE_SET(blockHeartbeatStepdown); + if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) { + // This log output is used in js tests so please leave it. + log() << "stepDown - blockHeartbeatStepdown fail point enabled. " + "Blocking until fail point is disabled."; + + auto inShutdown = [&] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _inShutdown; + }; + + while (MONGO_FAIL_POINT(blockHeartbeatStepdown) && !inShutdown()) { + mongo::sleepsecs(1); + } + } auto opCtx = cc().makeOperationContext(); Lock::GlobalWrite globalExclusiveLock(opCtx.get()); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index f493b053c09..2684f002eaa 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -102,20 +102,6 @@ struct OpTimeWithTermOne { Timestamp timestamp; }; -void runSingleNodeElection(ServiceContext::UniqueOperationContext opCtx, - ReplicationCoordinatorImpl* replCoord, - executor::NetworkInterfaceMock* net) { - replCoord->setMyLastAppliedOpTime(OpTime(Timestamp(1, 0), 0)); - replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); - ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); - replCoord->waitForElectionFinish_forTest(); - - ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); - ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); - - replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm()); -} - /** * Helper that kills an operation, taking the necessary locks. */ @@ -1597,6 +1583,54 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha } } +TEST_F(ReplCoordTest, DrainCompletionMidStepDown) { + init("mySet/test1:1234,test2:1234,test3:1234"); + + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "test1:1234") + << BSON("_id" << 1 << "host" + << "test2:1234") + << BSON("_id" << 2 << "host" + << "test3:1234")) + << "protocolVersion" + << 1), + HostAndPort("test1", 1234)); + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); + getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); + + simulateSuccessfulV1ElectionWithoutExitingDrainMode( + getReplCoord()->getElectionTimeout_forTest()); + + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + ASSERT_TRUE(getReplCoord()->getMemberState().primary()); + + // Now update term to trigger a stepdown. + TopologyCoordinator::UpdateTermResult termUpdated; + auto updateTermEvh = getReplCoord()->updateTerm_forTest(2, &termUpdated); + ASSERT(updateTermEvh.isValid()); + ASSERT(termUpdated == TopologyCoordinator::UpdateTermResult::kTriggerStepDown); + + // Now signal that replication applier is finished draining its buffer. + const auto opCtx = makeOperationContext(); + getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm()); + + // Now wait for stepdown to complete + getReplExec()->waitForEvent(updateTermEvh); + + // By now drain mode should be cancelled. + ASSERT_OK(getReplCoord()->waitForDrainFinish(Milliseconds(0))); + + ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); + // ASSERT_EQUALS(2, getReplCoord()->getTerm()); // SERVER-28290 +} + TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { const auto opCtx = makeOperationContext(); @@ -1824,8 +1858,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); - const auto opCtx = makeOperationContext(); + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); ASSERT_OK(getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000))); getNet()->enterNetwork(); // Must do this before inspecting the topocoord @@ -1856,8 +1890,8 @@ TEST_F( << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); const auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); ASSERT_OK(getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000))); getNet()->enterNetwork(); // Must do this before inspecting the topocoord @@ -2857,15 +2891,14 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) { << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); time_t lastWriteDate = 101; OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1); time_t majorityWriteDate = 100; OpTime majorityOpTime = OpTime(Timestamp(majorityWriteDate, 1), 1); - auto opCtx = makeOperationContext(); - getReplCoord()->setMyLastAppliedOpTime(opTime); getReplCoord()->setMyLastDurableOpTime(opTime); getReplCoord()->createSnapshot(opCtx.get(), majorityOpTime, SnapshotName(1)); @@ -4095,12 +4128,12 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) { << 0))), HostAndPort("node1", 12345)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); - auto opCtx = makeOperationContext(); shutdown(opCtx.get()); auto status = getReplCoord()->waitUntilOpTimeForRead( @@ -4120,8 +4153,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) { << "_id" << 0))), HostAndPort("node1", 12345)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); const auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0)); @@ -4143,9 +4176,9 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) { << "_id" << 0))), HostAndPort("node1", 12345)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); - auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 1)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 1)); getReplCoord()->createSnapshot(opCtx.get(), OpTime(Timestamp(100, 0), 1), SnapshotName(1)); @@ -4166,8 +4199,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) { << "_id" << 0))), HostAndPort("node1", 12345)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); OpTime time(Timestamp(100, 0), 1); getReplCoord()->setMyLastAppliedOpTime(time); @@ -4189,7 +4222,9 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) { << "_id" << 0))), HostAndPort("node1", 12345)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1)); OpTime committedOpTime(Timestamp(200, 0), 1); @@ -4200,8 +4235,6 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) { getReplCoord()->createSnapshot(nullptr, committedOpTime, SnapshotName(1)); }); - auto opCtx = makeOperationContext(); - ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( opCtx.get(), ReadConcernArgs(OpTime(Timestamp(100, 0), 1), ReadConcernLevel::kMajorityReadConcern))); @@ -4218,7 +4251,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) { << "_id" << 0))), HostAndPort("node1", 12345)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1)); getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1)); @@ -4231,8 +4265,6 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) { getReplCoord()->createSnapshot(nullptr, opTimeToWait, SnapshotName(1)); }); - auto opCtx = makeOperationContext(); - ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead( opCtx.get(), ReadConcernArgs(opTimeToWait, ReadConcernLevel::kMajorityReadConcern))); pseudoLogOp.get(); @@ -4985,7 +5017,9 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); @@ -4993,7 +5027,6 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - auto opCtx = makeOperationContext(); getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); @@ -5019,7 +5052,8 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); @@ -5027,7 +5061,6 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - auto opCtx = makeOperationContext(); getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); @@ -5051,7 +5084,9 @@ TEST_F(ReplCoordTest, << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); @@ -5059,7 +5094,6 @@ TEST_F(ReplCoordTest, OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - auto opCtx = makeOperationContext(); getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); @@ -5085,7 +5119,9 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); @@ -5093,7 +5129,6 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) { OpTime time4(Timestamp(100, 4), 1); OpTime time5(Timestamp(100, 5), 1); OpTime time6(Timestamp(100, 6), 1); - auto opCtx = makeOperationContext(); getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2)); @@ -5115,11 +5150,12 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) { << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))), HostAndPort("test1", 1234)); - runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet()); + + auto opCtx = makeOperationContext(); + runSingleNodeElection(opCtx.get()); OpTime time1(Timestamp(100, 1), 1); OpTime time2(Timestamp(100, 2), 1); - auto opCtx = makeOperationContext(); getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1)); @@ -5510,7 +5546,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) { ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1))); const auto opCtx = makeOperationContext(); - replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm()); + signalDrainComplete(opCtx.get()); ASSERT_OK(replCoord->waitForDrainFinish(timeout)); // Zero timeout is fine. diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index ffc5af1de2d..abdee75ed34 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -319,7 +319,7 @@ void ReplCoordTest::simulateSuccessfulV1Election() { simulateSuccessfulV1ElectionAt(electionTimeoutWhen); } -void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { +void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t electionTime) { ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); @@ -374,11 +374,17 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); +} +void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { + simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTime); + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + { auto opCtx = makeOperationContext(); - replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm()); + signalDrainComplete(opCtx.get()); } ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped); + IsMasterResponse imResponse; replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); @@ -448,6 +454,24 @@ void ReplCoordTest::simulateSuccessfulElection() { ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); } +void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) { + getExternalState()->setFirstOpTimeOfMyTerm(OpTime(Timestamp(1, 1), getReplCoord()->getTerm())); + getReplCoord()->signalDrainComplete(opCtx, getReplCoord()->getTerm()); +} + +void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) { + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(1, 0), 0)); + getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + getReplCoord()->waitForElectionFinish_forTest(); + + ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(getReplCoord()->getMemberState().primary()) + << getReplCoord()->getMemberState().toString(); + + signalDrainComplete(opCtx); +} + void ReplCoordTest::shutdown(OperationContext* opCtx) { invariant(_callShutdown); _net->exitNetwork(); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index e1aaa9ca6d1..94b6a7ed6c6 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -237,6 +237,23 @@ protected: void simulateSuccessfulV1ElectionAt(Date_t electionTime); /** + * When the test has been configured with a replica set config with a single member, use this + * to put that single member into state PRIMARY. + */ + void runSingleNodeElection(OperationContext* opCtx); + + /** + * Same as simulateSuccessfulV1ElectionAt, but stops short of signaling drain completion, + * so the node stays in drain mode. + */ + void simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t electionTime); + + /** + * Transition the ReplicationCoordinator from drain mode to being fully primary/master. + */ + void signalDrainComplete(OperationContext* opCtx); + + /** * Shuts down the objects under test. */ void shutdown(OperationContext* opCtx); diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 0716e04a546..a60318df57f 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -267,6 +267,11 @@ public: virtual OpTime getLastCommittedOpTime() const = 0; /** + * Returns true if it's safe to transition to LeaderMode::kMaster. + */ + virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const = 0; + + /** * Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes * and are ready to fully become primary and start accepting writes. * "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index f6e1d24c5af..c47da370350 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -2886,9 +2886,26 @@ OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const { return _lastCommittedOpTime; } +bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary( + long long termWhenDrainCompleted) const { + + if (termWhenDrainCompleted != _term) { + return false; + } + // Allow completing the transition to primary even when in the middle of a stepdown attempt, + // in case the stepdown attempt fails. + if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) { + return false; + } + + return true; +} + void TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) { - invariant(_leaderMode == LeaderMode::kLeaderElect); - _setLeaderMode(LeaderMode::kMaster); + invariant(canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm())); + if (_leaderMode == LeaderMode::kLeaderElect) { + _setLeaderMode(LeaderMode::kMaster); + } _firstOpTimeOfMyTerm = firstOpTimeOfTerm; } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index aa30c194b1a..26dda7281db 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -168,6 +168,7 @@ public: virtual bool updateLastCommittedOpTime(); virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime); virtual OpTime getLastCommittedOpTime() const; + virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override; virtual void completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm); virtual void adjustMaintenanceCountBy(int inc); virtual void prepareSyncFromResponse(const HostAndPort& target, diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp index 421b5d8b4ca..cba99624ffe 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp @@ -110,11 +110,11 @@ protected: stdx::bind(stringContains, stdx::placeholders::_1, needle)); } - void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0), - const OpTime& firstOpTimeOfTerm = OpTime()) { + void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0)) { getTopoCoord().changeMemberState_forTest(MemberState::RS_PRIMARY, electionTimestamp); getTopoCoord()._setCurrentPrimaryForTest(_selfIndex); - getTopoCoord().completeTransitionToPrimary(firstOpTimeOfTerm); + OpTime dummyOpTime(Timestamp(1, 1), getTopoCoord().getTerm()); + getTopoCoord().completeTransitionToPrimary(dummyOpTime); } void setMyOpTime(const OpTime& opTime) { |