summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorSpencer T Brody <spencer@mongodb.com>2017-09-29 13:25:32 -0400
committerSpencer T Brody <spencer@mongodb.com>2017-10-04 12:03:47 -0400
commitd1153c137c6422b7f9333aefaacb536cc664aae5 (patch)
tree0c55dd3b6fae7a1576a8073c543344e7d3b260cf /src/mongo/db
parent763e9a4a675668ba3c0d3836321f53c77950cc53 (diff)
downloadmongo-d1153c137c6422b7f9333aefaacb536cc664aae5.tar.gz
SERVER-31330 Prevent stepdown during drain mode from triggering an invariant
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp9
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp8
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp10
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp22
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp15
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp122
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.cpp28
-rw-r--r--src/mongo/db/repl/replication_coordinator_test_fixture.h17
-rw-r--r--src/mongo/db/repl/topology_coordinator.h5
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp21
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h1
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp6
13 files changed, 193 insertions, 74 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index a40d7612893..4bdfee93c5b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -198,6 +198,10 @@ void ReplicationCoordinatorExternalStateMock::setStoreLocalLastVoteDocumentToHan
}
}
+void ReplicationCoordinatorExternalStateMock::setFirstOpTimeOfMyTerm(const OpTime& opTime) {
+ _firstOpTimeOfMyTerm = opTime;
+}
+
void ReplicationCoordinatorExternalStateMock::closeConnections() {
_connectionsClosed = true;
}
@@ -278,9 +282,8 @@ void ReplicationCoordinatorExternalStateMock::onDrainComplete(OperationContext*
OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* opCtx,
bool isV1ElectionProtocol) {
- if (isV1ElectionProtocol) {
- _lastOpTime = OpTime(Timestamp(1, 0), 1);
- }
+ _lastOpTime = _firstOpTimeOfMyTerm;
+ _firstOpTimeOfMyTerm = OpTime();
return fassertStatusOK(40297, _lastOpTime);
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 613389e1761..81eb5d634c3 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -152,6 +152,8 @@ public:
*/
void setStoreLocalLastVoteDocumentToHang(bool hang);
+ void setFirstOpTimeOfMyTerm(const OpTime& opTime);
+
/**
* Returns true if startThreads() has been called.
*/
@@ -199,6 +201,7 @@ private:
bool _threadsStarted;
bool _isReadCommittedSupported = true;
bool _areSnapshotsEnabled = true;
+ OpTime _firstOpTimeOfMyTerm;
};
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 27be3f605e3..cb7d44623b3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -939,8 +939,10 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
Lock::GlobalWrite globalWriteLock(opCtx);
lk.lock();
- // Exit drain mode when the buffer is empty in the current term and we're in Draining mode.
- if (_applierState != ApplierState::Draining || termWhenBufferIsEmpty != _topCoord->getTerm()) {
+ // Exit drain mode only if we're actually in draining mode, the apply buffer is empty in the
+ // current term, and we're allowed to become the write master.
+ if (_applierState != ApplierState::Draining ||
+ !_topCoord->canCompleteTransitionToPrimary(termWhenBufferIsEmpty)) {
return;
}
_applierState = ApplierState::Stopped;
@@ -961,9 +963,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* opCtx,
_updateLastCommittedOpTime_inlock();
// Update _canAcceptNonLocalWrites
- invariant(!_canAcceptNonLocalWrites);
_updateMemberStateFromTopologyCoordinator_inlock(opCtx);
- invariant(_canAcceptNonLocalWrites);
log() << "transition to primary complete; database writes are now permitted" << rsLog;
_drainFinishedCond.notify_all();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
index 424f8c88689..e449665a483 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp
@@ -135,12 +135,12 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
// Fake OpTime from initiate, or a write op.
- getExternalState()->setLastOpTime(OpTime{{0, 0}, 0});
+ getExternalState()->setLastOpTime(OpTime{{0, 0}, -1});
ASSERT(getReplCoord()->getMemberState().secondary())
<< getReplCoord()->getMemberState().toString();
- getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), -1));
NetworkInterfaceMock* net = getNet();
net->enterNetwork();
@@ -188,7 +188,7 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
// Fake OpTime from initiate, or a write op.
- getExternalState()->setLastOpTime(OpTime{{0, 0}, 0});
+ getExternalState()->setLastOpTime(OpTime{{0, 0}, -1});
ASSERT(getReplCoord()->getMemberState().primary())
<< getReplCoord()->getMemberState().toString();
@@ -222,8 +222,8 @@ TEST_F(ReplCoordElectTest, ElectionSucceedsWhenAllNodesVoteYea) {
<< "node3:12345")));
assertStartSuccess(configObj, HostAndPort("node1", 12345));
OperationContextNoop opCtx;
- getReplCoord()->setMyLastAppliedOpTime(OpTime{{100, 1}, 0});
- getExternalState()->setLastOpTime(OpTime{{100, 1}, 0});
+ getReplCoord()->setMyLastAppliedOpTime(OpTime{{100, 1}, -1});
+ getExternalState()->setLastOpTime(OpTime{{100, 1}, -1});
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
startCapturingLogMessages();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
index 3b6f9500378..fd4db006a74 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_elect_v1_test.cpp
@@ -171,7 +171,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm());
+ signalDrainComplete(&opCtx);
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -233,7 +233,7 @@ TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
- getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm());
+ signalDrainComplete(&opCtx);
getReplCoord()->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -2160,7 +2160,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) {
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2183,7 +2183,7 @@ TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2203,7 +2203,7 @@ TEST_F(PrimaryCatchUpTest, CatchupTimeout) {
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2228,7 +2228,7 @@ TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) {
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2253,7 +2253,7 @@ TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) {
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2352,7 +2352,7 @@ TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_FALSE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
- replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT(replCoord->getApplierState() == ApplierState::Stopped);
ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
@@ -2416,7 +2416,7 @@ TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
stopCapturingLogMessages();
ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2461,7 +2461,7 @@ TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) {
ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest"));
ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
@@ -2475,7 +2475,7 @@ TEST_F(PrimaryCatchUpTest, ZeroTimeout) {
stopCapturingLogMessages();
ASSERT_EQUALS(1, countLogLinesContaining("Skipping primary catchup"));
auto opCtx = makeOperationContext();
- getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+ signalDrainComplete(opCtx.get());
Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 1b02badec85..84ea786227a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -371,7 +371,20 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
return;
}
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(blockHeartbeatStepdown);
+ if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) {
+ // This log output is used in js tests so please leave it.
+ log() << "stepDown - blockHeartbeatStepdown fail point enabled. "
+ "Blocking until fail point is disabled.";
+
+ auto inShutdown = [&] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _inShutdown;
+ };
+
+ while (MONGO_FAIL_POINT(blockHeartbeatStepdown) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+ }
auto opCtx = cc().makeOperationContext();
Lock::GlobalWrite globalExclusiveLock(opCtx.get());
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index f493b053c09..2684f002eaa 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -102,20 +102,6 @@ struct OpTimeWithTermOne {
Timestamp timestamp;
};
-void runSingleNodeElection(ServiceContext::UniqueOperationContext opCtx,
- ReplicationCoordinatorImpl* replCoord,
- executor::NetworkInterfaceMock* net) {
- replCoord->setMyLastAppliedOpTime(OpTime(Timestamp(1, 0), 0));
- replCoord->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
- ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
- replCoord->waitForElectionFinish_forTest();
-
- ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
- ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
-
- replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm());
-}
-
/**
* Helper that kills an operation, taking the necessary locks.
*/
@@ -1597,6 +1583,54 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
}
}
+TEST_F(ReplCoordTest, DrainCompletionMidStepDown) {
+ init("mySet/test1:1234,test2:1234,test3:1234");
+
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version"
+ << 1
+ << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234")
+ << BSON("_id" << 2 << "host"
+ << "test3:1234"))
+ << "protocolVersion"
+ << 1),
+ HostAndPort("test1", 1234));
+ getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ simulateSuccessfulV1ElectionWithoutExitingDrainMode(
+ getReplCoord()->getElectionTimeout_forTest());
+
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // Now update term to trigger a stepdown.
+ TopologyCoordinator::UpdateTermResult termUpdated;
+ auto updateTermEvh = getReplCoord()->updateTerm_forTest(2, &termUpdated);
+ ASSERT(updateTermEvh.isValid());
+ ASSERT(termUpdated == TopologyCoordinator::UpdateTermResult::kTriggerStepDown);
+
+ // Now signal that replication applier is finished draining its buffer.
+ const auto opCtx = makeOperationContext();
+ getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
+
+ // Now wait for stepdown to complete
+ getReplExec()->waitForEvent(updateTermEvh);
+
+ // By now drain mode should be cancelled.
+ ASSERT_OK(getReplCoord()->waitForDrainFinish(Milliseconds(0)));
+
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ // ASSERT_EQUALS(2, getReplCoord()->getTerm()); // SERVER-28290
+}
+
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
const auto opCtx = makeOperationContext();
@@ -1824,8 +1858,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
- const auto opCtx = makeOperationContext();
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
ASSERT_OK(getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000)));
getNet()->enterNetwork(); // Must do this before inspecting the topocoord
@@ -1856,8 +1890,8 @@ TEST_F(
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
const auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
ASSERT_OK(getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000)));
getNet()->enterNetwork(); // Must do this before inspecting the topocoord
@@ -2857,15 +2891,14 @@ TEST_F(ReplCoordTest, IsMasterWithCommittedSnapshot) {
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
time_t lastWriteDate = 101;
OpTime opTime = OpTime(Timestamp(lastWriteDate, 2), 1);
time_t majorityWriteDate = 100;
OpTime majorityOpTime = OpTime(Timestamp(majorityWriteDate, 1), 1);
- auto opCtx = makeOperationContext();
-
getReplCoord()->setMyLastAppliedOpTime(opTime);
getReplCoord()->setMyLastDurableOpTime(opTime);
getReplCoord()->createSnapshot(opCtx.get(), majorityOpTime, SnapshotName(1));
@@ -4095,12 +4128,12 @@ TEST_F(ReplCoordTest, ReadAfterCommittedWhileShutdown) {
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
- auto opCtx = makeOperationContext();
shutdown(opCtx.get());
auto status = getReplCoord()->waitUntilOpTimeForRead(
@@ -4120,8 +4153,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedInterrupted) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
const auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));
@@ -4143,9 +4176,9 @@ TEST_F(ReplCoordTest, ReadAfterCommittedGreaterOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
-
auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
+
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 1));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 1));
getReplCoord()->createSnapshot(opCtx.get(), OpTime(Timestamp(100, 0), 1), SnapshotName(1));
@@ -4166,8 +4199,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedEqualOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
OpTime time(Timestamp(100, 0), 1);
getReplCoord()->setMyLastAppliedOpTime(time);
@@ -4189,7 +4222,9 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1));
OpTime committedOpTime(Timestamp(200, 0), 1);
@@ -4200,8 +4235,6 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredGreaterOpTime) {
getReplCoord()->createSnapshot(nullptr, committedOpTime, SnapshotName(1));
});
- auto opCtx = makeOperationContext();
-
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
opCtx.get(),
ReadConcernArgs(OpTime(Timestamp(100, 0), 1), ReadConcernLevel::kMajorityReadConcern)));
@@ -4218,7 +4251,8 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) {
<< "_id"
<< 0))),
HostAndPort("node1", 12345));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(0, 0), 1));
getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(0, 0), 1));
@@ -4231,8 +4265,6 @@ TEST_F(ReplCoordTest, ReadAfterCommittedDeferredEqualOpTime) {
getReplCoord()->createSnapshot(nullptr, opTimeToWait, SnapshotName(1));
});
- auto opCtx = makeOperationContext();
-
ASSERT_OK(getReplCoord()->waitUntilOpTimeForRead(
opCtx.get(), ReadConcernArgs(opTimeToWait, ReadConcernLevel::kMajorityReadConcern)));
pseudoLogOp.get();
@@ -4985,7 +5017,9 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -4993,7 +5027,6 @@ TEST_F(ReplCoordTest, AdvanceCommittedSnapshotToMostRecentSnapshotPriorToOpTimeW
OpTime time4(Timestamp(100, 4), 1);
OpTime time5(Timestamp(100, 5), 1);
OpTime time6(Timestamp(100, 6), 1);
- auto opCtx = makeOperationContext();
getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1));
getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2));
@@ -5019,7 +5052,8 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -5027,7 +5061,6 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAnOpTimeIsNewerThanOurLat
OpTime time4(Timestamp(100, 4), 1);
OpTime time5(Timestamp(100, 5), 1);
OpTime time6(Timestamp(100, 6), 1);
- auto opCtx = makeOperationContext();
getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1));
getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2));
@@ -5051,7 +5084,9 @@ TEST_F(ReplCoordTest,
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -5059,7 +5094,6 @@ TEST_F(ReplCoordTest,
OpTime time4(Timestamp(100, 4), 1);
OpTime time5(Timestamp(100, 5), 1);
OpTime time6(Timestamp(100, 6), 1);
- auto opCtx = makeOperationContext();
getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1));
getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2));
@@ -5085,7 +5119,9 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) {
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
@@ -5093,7 +5129,6 @@ TEST_F(ReplCoordTest, ZeroCommittedSnapshotWhenAllSnapshotsAreDropped) {
OpTime time4(Timestamp(100, 4), 1);
OpTime time5(Timestamp(100, 5), 1);
OpTime time6(Timestamp(100, 6), 1);
- auto opCtx = makeOperationContext();
getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1));
getReplCoord()->createSnapshot(opCtx.get(), time2, SnapshotName(2));
@@ -5115,11 +5150,12 @@ TEST_F(ReplCoordTest, DoNotAdvanceCommittedSnapshotWhenAppliedOpTimeChanges) {
<< BSON_ARRAY(BSON("_id" << 0 << "host"
<< "test1:1234"))),
HostAndPort("test1", 1234));
- runSingleNodeElection(makeOperationContext(), getReplCoord(), getNet());
+
+ auto opCtx = makeOperationContext();
+ runSingleNodeElection(opCtx.get());
OpTime time1(Timestamp(100, 1), 1);
OpTime time2(Timestamp(100, 2), 1);
- auto opCtx = makeOperationContext();
getReplCoord()->createSnapshot(opCtx.get(), time1, SnapshotName(1));
@@ -5510,7 +5546,7 @@ TEST_F(ReplCoordTest, WaitForDrainFinish) {
ASSERT_EQUALS(ErrorCodes::BadValue, replCoord->waitForDrainFinish(Milliseconds(-1)));
const auto opCtx = makeOperationContext();
- replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm());
+ signalDrainComplete(opCtx.get());
ASSERT_OK(replCoord->waitForDrainFinish(timeout));
// Zero timeout is fine.
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index ffc5af1de2d..abdee75ed34 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -319,7 +319,7 @@ void ReplCoordTest::simulateSuccessfulV1Election() {
simulateSuccessfulV1ElectionAt(electionTimeoutWhen);
}
-void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
+void ReplCoordTest::simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t electionTime) {
ReplicationCoordinatorImpl* replCoord = getReplCoord();
NetworkInterfaceMock* net = getNet();
@@ -374,11 +374,17 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();
+}
+void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) {
+ simulateSuccessfulV1ElectionWithoutExitingDrainMode(electionTime);
+ ReplicationCoordinatorImpl* replCoord = getReplCoord();
+
{
auto opCtx = makeOperationContext();
- replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm());
+ signalDrainComplete(opCtx.get());
}
ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped);
+ IsMasterResponse imResponse;
replCoord->fillIsMasterForReplSet(&imResponse);
ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
@@ -448,6 +454,24 @@ void ReplCoordTest::simulateSuccessfulElection() {
ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString();
}
+void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) {
+ getExternalState()->setFirstOpTimeOfMyTerm(OpTime(Timestamp(1, 1), getReplCoord()->getTerm()));
+ getReplCoord()->signalDrainComplete(opCtx, getReplCoord()->getTerm());
+}
+
+void ReplCoordTest::runSingleNodeElection(OperationContext* opCtx) {
+ getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(1, 0), 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(1, 0), 0));
+ ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->waitForElectionFinish_forTest();
+
+ ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining);
+ ASSERT(getReplCoord()->getMemberState().primary())
+ << getReplCoord()->getMemberState().toString();
+
+ signalDrainComplete(opCtx);
+}
+
void ReplCoordTest::shutdown(OperationContext* opCtx) {
invariant(_callShutdown);
_net->exitNetwork();
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h
index e1aaa9ca6d1..94b6a7ed6c6 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.h
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h
@@ -237,6 +237,23 @@ protected:
void simulateSuccessfulV1ElectionAt(Date_t electionTime);
/**
+ * When the test has been configured with a replica set config with a single member, use this
+ * to put that single member into state PRIMARY.
+ */
+ void runSingleNodeElection(OperationContext* opCtx);
+
+ /**
+ * Same as simulateSuccessfulV1ElectionAt, but stops short of signaling drain completion,
+ * so the node stays in drain mode.
+ */
+ void simulateSuccessfulV1ElectionWithoutExitingDrainMode(Date_t electionTime);
+
+ /**
+ * Transition the ReplicationCoordinator from drain mode to being fully primary/master.
+ */
+ void signalDrainComplete(OperationContext* opCtx);
+
+ /**
* Shuts down the objects under test.
*/
void shutdown(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 0716e04a546..a60318df57f 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -267,6 +267,11 @@ public:
virtual OpTime getLastCommittedOpTime() const = 0;
/**
+ * Returns true if it's safe to transition to LeaderMode::kMaster.
+ */
+ virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const = 0;
+
+ /**
* Called by the ReplicationCoordinator to signal that we have finished catchup and drain modes
* and are ready to fully become primary and start accepting writes.
* "firstOpTimeOfTerm" is a floor on the OpTimes this node will be allowed to consider committed
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index f6e1d24c5af..c47da370350 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -2886,9 +2886,26 @@ OpTime TopologyCoordinatorImpl::getLastCommittedOpTime() const {
return _lastCommittedOpTime;
}
+bool TopologyCoordinatorImpl::canCompleteTransitionToPrimary(
+ long long termWhenDrainCompleted) const {
+
+ if (termWhenDrainCompleted != _term) {
+ return false;
+ }
+ // Allow completing the transition to primary even when in the middle of a stepdown attempt,
+ // in case the stepdown attempt fails.
+ if (_leaderMode != LeaderMode::kLeaderElect && _leaderMode != LeaderMode::kAttemptingStepDown) {
+ return false;
+ }
+
+ return true;
+}
+
void TopologyCoordinatorImpl::completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm) {
- invariant(_leaderMode == LeaderMode::kLeaderElect);
- _setLeaderMode(LeaderMode::kMaster);
+ invariant(canCompleteTransitionToPrimary(firstOpTimeOfTerm.getTerm()));
+ if (_leaderMode == LeaderMode::kLeaderElect) {
+ _setLeaderMode(LeaderMode::kMaster);
+ }
_firstOpTimeOfMyTerm = firstOpTimeOfTerm;
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index aa30c194b1a..26dda7281db 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -168,6 +168,7 @@ public:
virtual bool updateLastCommittedOpTime();
virtual bool advanceLastCommittedOpTime(const OpTime& committedOpTime);
virtual OpTime getLastCommittedOpTime() const;
+ virtual bool canCompleteTransitionToPrimary(long long termWhenDrainCompleted) const override;
virtual void completeTransitionToPrimary(const OpTime& firstOpTimeOfTerm);
virtual void adjustMaintenanceCountBy(int inc);
virtual void prepareSyncFromResponse(const HostAndPort& target,
diff --git a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
index 421b5d8b4ca..cba99624ffe 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_v1_test.cpp
@@ -110,11 +110,11 @@ protected:
stdx::bind(stringContains, stdx::placeholders::_1, needle));
}
- void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0),
- const OpTime& firstOpTimeOfTerm = OpTime()) {
+ void makeSelfPrimary(const Timestamp& electionTimestamp = Timestamp(0, 0)) {
getTopoCoord().changeMemberState_forTest(MemberState::RS_PRIMARY, electionTimestamp);
getTopoCoord()._setCurrentPrimaryForTest(_selfIndex);
- getTopoCoord().completeTransitionToPrimary(firstOpTimeOfTerm);
+ OpTime dummyOpTime(Timestamp(1, 1), getTopoCoord().getTerm());
+ getTopoCoord().completeTransitionToPrimary(dummyOpTime);
}
void setMyOpTime(const OpTime& opTime) {