/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/repl_set_config.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/executor/network_interface_mock.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace repl {
namespace {
using executor::NetworkInterfaceMock;
using executor::RemoteCommandRequest;
using executor::RemoteCommandResponse;
using ApplierState = ReplicationCoordinator::ApplierState;
TEST_F(ReplCoordTest, RandomizedElectionOffsetWithinProperBounds) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345")));
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    // The randomized election offset must lie within
    // [0, electionTimeout * offsetLimitFraction].
    Milliseconds electionTimeout = config.getElectionTimeoutPeriod();
    long long randomOffsetUpperBound = durationCount<Milliseconds>(electionTimeout) *
        getExternalState()->getElectionTimeoutOffsetLimitFraction();

    Milliseconds randomOffset;
    // Verify for numerous rounds of random number generation.
    int rounds = 1000;
    for (int i = 0; i < rounds; i++) {
        randomOffset = getReplCoord()->getRandomizedElectionOffset_forTest();
        ASSERT_GREATER_THAN_OR_EQUALS(randomOffset, Milliseconds(0));
        ASSERT_LESS_THAN_OR_EQUALS(randomOffset, Milliseconds(randomOffsetUpperBound));
    }
}
TEST_F(ReplCoordTest, RandomizedElectionOffsetAvoidsDivideByZero) {
    // Configure the smallest possible election timeout: a single millisecond.
    BSONObj cfg = BSON("_id"
                       << "mySet"
                       << "version"
                       << 1
                       << "members"
                       << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                << "node1:12345")
                                     << BSON("_id" << 2 << "host"
                                                   << "node2:12345")
                                     << BSON("_id" << 3 << "host"
                                                   << "node3:12345"))
                       << "protocolVersion"
                       << 1
                       << "settings"
                       << BSON("electionTimeoutMillis" << 1));
    assertStartSuccess(cfg, HostAndPort("node1", 12345));

    // With a 1ms timeout the randomized offset range collapses; the generator
    // must not attempt to divide (or mod) by zero and should yield exactly zero.
    const Milliseconds offset = getReplCoord()->getRandomizedElectionOffset_forTest();
    ASSERT_EQ(Milliseconds(0), offset);
}
TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) {
    // node2 is non-voting, hidden, and priority 0, so node1 is the only member
    // that can stand for election -- it should win with just its own vote.
    assertStartSuccess(BSON("_id"
                            << "mySet"
                            << "version"
                            << 1
                            << "members"
                            << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                     << "node1:12345")
                                          << BSON("_id" << 2 << "host"
                                                        << "node2:12345"
                                                        << "votes"
                                                        << 0
                                                        << "hidden"
                                                        << true
                                                        << "priority"
                                                        << 0))
                            << "protocolVersion"
                            << 1),
                       HostAndPort("node1", 12345));

    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT(getReplCoord()->getMemberState().secondary())
        << getReplCoord()->getMemberState().toString();

    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), 0));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(10, 0), 0));

    // Becoming secondary should have armed an election timeout.
    auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
    ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
    log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";

    // Run the mock clock forward to the election timeout, black-holing any
    // outgoing requests (e.g. heartbeats) -- no responses are needed since this
    // node wins with only its own vote.
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (net->now() < electionTimeoutWhen) {
        net->runUntil(electionTimeoutWhen);
        if (!net->hasReadyRequests()) {
            continue;
        }
        auto noi = net->getNextReadyRequest();
        const auto& request = noi->getRequest();
        error() << "Black holing irrelevant request to " << request.target << ": "
                << request.cmdObj;
        net->blackHole(noi);
    }
    net->exitNetwork();

    // _startElectSelfV1 is called when election timeout expires, so election
    // finished event has been set.
    getReplCoord()->waitForElectionFinish_forTest();

    ASSERT(getReplCoord()->getMemberState().primary())
        << getReplCoord()->getMemberState().toString();
    simulateCatchUpAbort();
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);

    const auto opCtxPtr = makeOperationContext();
    auto& opCtx = *opCtxPtr;

    // Since we're still in drain mode, expect that we report ismaster: false, issecondary:true.
    IsMasterResponse imResponse;
    getReplCoord()->fillIsMasterForReplSet(&imResponse);
    ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
    ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();

    // Completing the drain makes the node report itself as a writable primary.
    getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm());
    getReplCoord()->fillIsMasterForReplSet(&imResponse);
    ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString();
    ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString();
}
TEST_F(ReplCoordTest, StartElectionDoesNotStartAnElectionWhenNodeIsRecovering) {
    assertStartSuccess(BSON("_id"
                            << "mySet"
                            << "version"
                            << 1
                            << "members"
                            << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                     << "node1:12345")
                                          << BSON("_id" << 2 << "host"
                                                        << "node2:12345"))
                            << "protocolVersion"
                            << 1),
                       HostAndPort("node1", 12345));

    // Put the node into RECOVERING, a state in which it may not run for primary.
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_RECOVERING));
    ASSERT(getReplCoord()->getMemberState().recovering())
        << getReplCoord()->getMemberState().toString();

    const OpTime initialOpTime(Timestamp(10, 0), 0);
    getReplCoord()->setMyLastAppliedOpTime(initialOpTime);
    getReplCoord()->setMyLastDurableOpTime(initialOpTime);
    simulateEnoughHeartbeatsForAllNodesUp();

    // While recovering, no election timeout may be scheduled at all.
    const auto scheduledTimeout = getReplCoord()->getElectionTimeout_forTest();
    ASSERT_EQUALS(Date_t(), scheduledTimeout);
}
TEST_F(ReplCoordTest, ElectionSucceedsWhenNodeIsTheOnlyNode) {
    startCapturingLogMessages();
    assertStartSuccess(BSON("_id"
                            << "mySet"
                            << "version"
                            << 1
                            << "members"
                            << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                     << "node1:12345"))
                            << "protocolVersion"
                            << 1),
                       HostAndPort("node1", 12345));

    const OpTime lastOpTime(Timestamp(10, 0), 0);
    getReplCoord()->setMyLastAppliedOpTime(lastOpTime);
    getReplCoord()->setMyLastDurableOpTime(lastOpTime);

    // A lone node elects itself as soon as it transitions to secondary.
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    getReplCoord()->waitForElectionFinish_forTest();
    ASSERT(getReplCoord()->getMemberState().primary())
        << getReplCoord()->getMemberState().toString();
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);

    const auto opCtxOwner = makeOperationContext();
    auto& txn = *opCtxOwner;

    // While still draining, isMaster must advertise the node as a secondary.
    IsMasterResponse isMasterReply;
    getReplCoord()->fillIsMasterForReplSet(&isMasterReply);
    ASSERT_FALSE(isMasterReply.isMaster()) << isMasterReply.toBSON().toString();
    ASSERT_TRUE(isMasterReply.isSecondary()) << isMasterReply.toBSON().toString();

    // After the drain completes, the node reports itself as master.
    getReplCoord()->signalDrainComplete(&txn, getReplCoord()->getTerm());
    getReplCoord()->fillIsMasterForReplSet(&isMasterReply);
    ASSERT_TRUE(isMasterReply.isMaster()) << isMasterReply.toBSON().toString();
    ASSERT_FALSE(isMasterReply.isSecondary()) << isMasterReply.toBSON().toString();
}
TEST_F(ReplCoordTest, ElectionSucceedsWhenAllNodesVoteYea) {
    BSONObj rsConfigObj = BSON("_id"
                               << "mySet"
                               << "version"
                               << 1
                               << "members"
                               << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                        << "node1:12345")
                                             << BSON("_id" << 2 << "host"
                                                           << "node2:12345")
                                             << BSON("_id" << 3 << "host"
                                                           << "node3:12345"))
                               << "protocolVersion"
                               << 1);
    assertStartSuccess(rsConfigObj, HostAndPort("node1", 12345));
    OperationContextNoop txn;

    const OpTime myOpTime(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(myOpTime);
    getReplCoord()->setMyLastDurableOpTime(myOpTime);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    startCapturingLogMessages();

    // Drive a full election in which every member grants its vote.
    simulateSuccessfulV1Election();
    getReplCoord()->waitForElectionFinish_forTest();

    // The durable last-vote document must record a term-1 vote for ourselves
    // (candidate index 0).
    auto lastVoteStatus = getExternalState()->loadLocalLastVoteDocument(nullptr);
    ASSERT(lastVoteStatus.isOK());
    ASSERT_EQ(0, lastVoteStatus.getValue().getCandidateIndex());
    ASSERT_EQ(1, lastVoteStatus.getValue().getTerm());

    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("election succeeded"));
}
TEST_F(ReplCoordTest, ElectionSucceedsWhenMaxSevenNodesVoteYea) {
    // Seven members is the maximum number of voting nodes in a replica set.
    BSONObj rsConfigObj = BSON("_id"
                               << "mySet"
                               << "version"
                               << 1
                               << "members"
                               << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                        << "node1:12345")
                                             << BSON("_id" << 2 << "host"
                                                           << "node2:12345")
                                             << BSON("_id" << 3 << "host"
                                                           << "node3:12345")
                                             << BSON("_id" << 4 << "host"
                                                           << "node4:12345")
                                             << BSON("_id" << 5 << "host"
                                                           << "node5:12345")
                                             << BSON("_id" << 6 << "host"
                                                           << "node6:12345")
                                             << BSON("_id" << 7 << "host"
                                                           << "node7:12345"))
                               << "protocolVersion"
                               << 1);
    assertStartSuccess(rsConfigObj, HostAndPort("node1", 12345));
    OperationContextNoop txn;

    const OpTime myOpTime(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(myOpTime);
    getReplCoord()->setMyLastDurableOpTime(myOpTime);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    startCapturingLogMessages();

    // Drive a full election in which every member grants its vote.
    simulateSuccessfulV1Election();
    getReplCoord()->waitForElectionFinish_forTest();

    // The durable last-vote document must record a term-1 vote for ourselves
    // (candidate index 0).
    auto lastVoteStatus = getExternalState()->loadLocalLastVoteDocument(nullptr);
    ASSERT(lastVoteStatus.isOK());
    ASSERT_EQ(0, lastVoteStatus.getValue().getCandidateIndex());
    ASSERT_EQ(1, lastVoteStatus.getValue().getTerm());

    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("election succeeded"));
}
TEST_F(ReplCoordTest, ElectionFailsWhenInsufficientVotesAreReceivedDuringDryRun) {
    startCapturingLogMessages();
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    OperationContextNoop opCtx;
    OpTime time1(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(time1);
    getReplCoord()->setMyLastDurableOpTime(time1);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    simulateEnoughHeartbeatsForAllNodesUp();

    // The election timeout must be armed before the dry run can fire.
    auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
    ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
    log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";

    // Deny both dry-run vote requests; black-hole everything else (e.g.
    // heartbeats) until two vote requests have been answered.
    int voteRequests = 0;
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (voteRequests < 2) {
        if (net->now() < electionTimeoutWhen) {
            net->runUntil(electionTimeoutWhen);
        }
        ASSERT_TRUE(net->hasReadyRequests());
        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
        const RemoteCommandRequest& request = noi->getRequest();
        log() << request.target.toString() << " processing " << request.cmdObj;
        if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") {
            net->blackHole(noi);
        } else {
            net->scheduleResponse(noi,
                                  net->now(),
                                  makeResponseStatus(BSON(
                                      "ok" << 1 << "term" << 0 << "voteGranted" << false << "reason"
                                           << "don't like him much")));
            voteRequests++;
        }
        net->runReadyNetworkOperations();
    }
    net->exitNetwork();
    stopCapturingLogMessages();

    // Losing a majority of dry-run votes must abort the real election.
    ASSERT_EQUALS(
        1, countLogLinesContaining("not running for primary, we received insufficient votes"));
}
TEST_F(ReplCoordTest, ElectionFailsWhenDryRunResponseContainsANewerTerm) {
    startCapturingLogMessages();
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    OperationContextNoop opCtx;
    OpTime time1(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(time1);
    getReplCoord()->setMyLastDurableOpTime(time1);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    simulateEnoughHeartbeatsForAllNodesUp();

    // The election timeout must be armed before the dry run can fire.
    auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
    ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
    log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";

    // Answer the first dry-run vote request with a term one greater than the
    // candidate's; black-hole anything else (e.g. heartbeats).
    int voteRequests = 0;
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (voteRequests < 1) {
        if (net->now() < electionTimeoutWhen) {
            net->runUntil(electionTimeoutWhen);
        }
        ASSERT_TRUE(net->hasReadyRequests());
        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
        const RemoteCommandRequest& request = noi->getRequest();
        log() << request.target.toString() << " processing " << request.cmdObj;
        if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") {
            net->blackHole(noi);
        } else {
            net->scheduleResponse(
                noi,
                net->now(),
                makeResponseStatus(BSON("ok" << 1 << "term" << request.cmdObj["term"].Long() + 1
                                             << "voteGranted"
                                             << false
                                             << "reason"
                                             << "quit living in the past")));
            voteRequests++;
        }
        net->runReadyNetworkOperations();
    }
    net->exitNetwork();
    getReplCoord()->waitForElectionFinish_forTest();
    stopCapturingLogMessages();

    // Discovering a newer term during the dry run must abort the election.
    ASSERT_EQUALS(
        1, countLogLinesContaining("not running for primary, we have been superceded already"));
}
TEST_F(ReplCoordTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) {
    // start up, receive reconfig via heartbeat while at the same time, become candidate.
    // candidate state should be cleared.
    OperationContextNoop opCtx;
    assertStartSuccess(BSON("_id"
                            << "mySet"
                            << "version"
                            << 2
                            << "members"
                            << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                     << "node1:12345")
                                          << BSON("_id" << 2 << "host"
                                                        << "node2:12345")
                                          << BSON("_id" << 3 << "host"
                                                        << "node3:12345")
                                          << BSON("_id" << 4 << "host"
                                                        << "node4:12345")
                                          << BSON("_id" << 5 << "host"
                                                        << "node5:12345"))
                            << "protocolVersion"
                            << 1),
                       HostAndPort("node1", 12345));
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 0));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 0));

    // Keep the heartbeat-driven reconfig from completing so the node stays in
    // the "configuration change in progress" state for the rest of the test.
    getGlobalFailPointRegistry()
        ->getFailPoint("blockHeartbeatReconfigFinish")
        ->setMode(FailPoint::alwaysOn);

    // hb reconfig: deliver a newer (version 3) config via a mocked heartbeat
    // response, which starts the (now blocked) heartbeat reconfig.
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    ReplSetHeartbeatResponse hbResp2;
    ReplSetConfig config;
    config
        .initialize(BSON("_id"
                         << "mySet"
                         << "version"
                         << 3
                         << "members"
                         << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                  << "node1:12345")
                                       << BSON("_id" << 2 << "host"
                                                     << "node2:12345"))
                         << "protocolVersion"
                         << 1))
        .transitional_ignore();
    hbResp2.setConfig(config);
    hbResp2.setConfigVersion(3);
    hbResp2.setSetName("mySet");
    hbResp2.setState(MemberState::RS_SECONDARY);
    net->runUntil(net->now() + Seconds(10));  // run until we've sent a heartbeat request
    const NetworkInterfaceMock::NetworkOperationIterator noi2 = net->getNextReadyRequest();
    net->scheduleResponse(noi2, net->now(), makeResponseStatus(hbResp2.toBSON(true)));
    net->runReadyNetworkOperations();
    getNet()->exitNetwork();

    // prepare candidacy; a user-initiated reconfig must be refused while the
    // heartbeat reconfig is still (artificially) in flight.
    BSONObjBuilder result;
    ReplicationCoordinator::ReplSetReconfigArgs args;
    args.force = false;
    args.newConfigObj = config.toBSON();
    ASSERT_EQUALS(ErrorCodes::ConfigurationInProgress,
                  getReplCoord()->processReplSetReconfig(&opCtx, args, &result));

    // Raise verbosity so the "Not standing for election" debug line is logged.
    logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(2));
    startCapturingLogMessages();

    // receive sufficient heartbeats to allow the node to see a majority.
    ReplicationCoordinatorImpl* replCoord = getReplCoord();
    ReplSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest();
    net->enterNetwork();
    for (int i = 0; i < 2; ++i) {
        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
        const RemoteCommandRequest& request = noi->getRequest();
        log() << request.target.toString() << " processing " << request.cmdObj;
        ReplSetHeartbeatArgsV1 hbArgs;
        if (hbArgs.initialize(request.cmdObj).isOK()) {
            // Mock a healthy secondary response matching our current config.
            ReplSetHeartbeatResponse hbResp;
            hbResp.setSetName(rsConfig.getReplSetName());
            hbResp.setState(MemberState::RS_SECONDARY);
            hbResp.setConfigVersion(rsConfig.getConfigVersion());
            BSONObjBuilder respObj;
            net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true)));
        } else {
            error() << "Black holing unexpected request to " << request.target << ": "
                    << request.cmdObj;
            net->blackHole(noi);
        }
        net->runReadyNetworkOperations();
    }
    net->exitNetwork();

    // Advance the simulator clock sufficiently to trigger an election.
    auto electionTimeoutWhen = getReplCoord()->getElectionTimeout_forTest();
    ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
    log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";

    net->enterNetwork();
    while (net->now() < electionTimeoutWhen) {
        net->runUntil(electionTimeoutWhen);
        if (!net->hasReadyRequests()) {
            continue;
        }
        net->blackHole(net->getNextReadyRequest());
    }
    net->exitNetwork();

    stopCapturingLogMessages();
    // ensure node does not stand for election
    ASSERT_EQUALS(1,
                  countLogLinesContaining("Not standing for election; processing "
                                          "a configuration change"));
    getGlobalFailPointRegistry()
        ->getFailPoint("blockHeartbeatReconfigFinish")
        ->setMode(FailPoint::off);
}
TEST_F(ReplCoordTest, ElectionFailsWhenInsufficientVotesAreReceivedDuringRequestVotes) {
    startCapturingLogMessages();
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    OperationContextNoop opCtx;
    OpTime time1(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(time1);
    getReplCoord()->setMyLastDurableOpTime(time1);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    simulateEnoughHeartbeatsForAllNodesUp();
    // Pass the dry run; the failure under test occurs in the real vote round.
    simulateSuccessfulDryRun();

    // Deny every real vote request; black-hole anything else (e.g. heartbeats).
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (net->hasReadyRequests()) {
        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
        const RemoteCommandRequest& request = noi->getRequest();
        log() << request.target.toString() << " processing " << request.cmdObj;
        if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") {
            net->blackHole(noi);
        } else {
            net->scheduleResponse(noi,
                                  net->now(),
                                  makeResponseStatus(BSON(
                                      "ok" << 1 << "term" << 1 << "voteGranted" << false << "reason"
                                           << "don't like him much")));
        }
        net->runReadyNetworkOperations();
    }
    net->exitNetwork();
    getReplCoord()->waitForElectionFinish_forTest();
    stopCapturingLogMessages();

    // Without a majority of real votes, the node must not become primary.
    ASSERT_EQUALS(1,
                  countLogLinesContaining("not becoming primary, we received insufficient votes"));
}
TEST_F(ReplCoordTest, TransitionToRollbackFailsWhenElectionInProgress) {
    BSONObj rsConfigObj = BSON("_id"
                               << "mySet"
                               << "version"
                               << 1
                               << "members"
                               << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                        << "node1:12345")
                                             << BSON("_id" << 2 << "host"
                                                           << "node2:12345")
                                             << BSON("_id" << 3 << "host"
                                                           << "node3:12345"))
                               << "protocolVersion"
                               << 1);
    assertStartSuccess(rsConfigObj, HostAndPort("node1", 12345));
    ReplSetConfig rsConfig = assertMakeRSConfig(rsConfigObj);
    OperationContextNoop txn;

    const OpTime myOpTime(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(myOpTime);
    getReplCoord()->setMyLastDurableOpTime(myOpTime);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    // Get as far as a completed dry run, leaving the real election in flight.
    simulateEnoughHeartbeatsForAllNodesUp();
    simulateSuccessfulDryRun();

    // With an election underway, transitioning to ROLLBACK must be rejected.
    ASSERT_EQUALS(ErrorCodes::ElectionInProgress,
                  getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK));
    ASSERT_FALSE(getReplCoord()->getMemberState().rollback());

    // We do not need to respond to any pending network operations because setFollowerMode() will
    // cancel the freshness checker and election command runner.
}
TEST_F(ReplCoordTest, ElectionFailsWhenVoteRequestResponseContainsANewerTerm) {
    startCapturingLogMessages();
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    OperationContextNoop opCtx;
    OpTime time1(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(time1);
    getReplCoord()->setMyLastDurableOpTime(time1);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    simulateEnoughHeartbeatsForAllNodesUp();
    // Pass the dry run; the failure under test occurs in the real vote round.
    simulateSuccessfulDryRun();

    // Answer each real vote request with a term one greater than the
    // candidate's; black-hole anything else (e.g. heartbeats).
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (net->hasReadyRequests()) {
        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
        const RemoteCommandRequest& request = noi->getRequest();
        log() << request.target.toString() << " processing " << request.cmdObj;
        if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") {
            net->blackHole(noi);
        } else {
            net->scheduleResponse(
                noi,
                net->now(),
                makeResponseStatus(BSON("ok" << 1 << "term" << request.cmdObj["term"].Long() + 1
                                             << "voteGranted"
                                             << false
                                             << "reason"
                                             << "quit living in the past")));
        }
        net->runReadyNetworkOperations();
    }
    net->exitNetwork();
    getReplCoord()->waitForElectionFinish_forTest();
    stopCapturingLogMessages();

    // Seeing a newer term in a vote response must abort the election.
    ASSERT_EQUALS(1,
                  countLogLinesContaining("not becoming primary, we have been superceded already"));
}
TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringDryRun) {
    startCapturingLogMessages();
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    OperationContextNoop opCtx;
    OpTime time1(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(time1);
    getReplCoord()->setMyLastDurableOpTime(time1);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    simulateEnoughHeartbeatsForAllNodesUp();

    // Callback invoked for each dry-run vote request: bump our own term to a
    // much larger value so the dry run concludes we have been passed by.
    auto onDryRunRequest = [this](const RemoteCommandRequest& request) {
        // Update to a future term before dry run completes.
        ASSERT_EQUALS(0, request.cmdObj.getIntField("candidateIndex"));
        ASSERT(getTopoCoord().updateTerm(1000, getNet()->now()) ==
               TopologyCoordinator::UpdateTermResult::kUpdatedTerm);
    };
    simulateSuccessfulDryRun(onDryRunRequest);

    stopCapturingLogMessages();
    // The term change during the dry run must abort the election attempt.
    ASSERT_EQUALS(
        1, countLogLinesContaining("not running for primary, we have been superceded already"));
}
TEST_F(ReplCoordTest, ElectionFailsWhenTermChangesDuringActualElection) {
    startCapturingLogMessages();
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    OperationContextNoop opCtx;
    OpTime time1(Timestamp(100, 1), 0);
    getReplCoord()->setMyLastAppliedOpTime(time1);
    getReplCoord()->setMyLastDurableOpTime(time1);
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

    simulateEnoughHeartbeatsForAllNodesUp();
    simulateSuccessfulDryRun();

    // update to a future term before the election completes
    getReplCoord()->updateTerm(&opCtx, 1000).transitional_ignore();

    // Grant every real vote request. Despite winning the vote, the election
    // must be abandoned because our term has moved past the candidacy's term.
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (net->hasReadyRequests()) {
        const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
        const RemoteCommandRequest& request = noi->getRequest();
        log() << request.target.toString() << " processing " << request.cmdObj;
        if (request.cmdObj.firstElement().fieldNameStringData() != "replSetRequestVotes") {
            net->blackHole(noi);
        } else {
            net->scheduleResponse(
                noi,
                net->now(),
                makeResponseStatus(BSON(
                    "ok" << 1 << "term" << request.cmdObj["term"].Long() << "voteGranted" << true
                         << "reason"
                         << "")));
        }
        net->runReadyNetworkOperations();
    }
    net->exitNetwork();
    getReplCoord()->waitForElectionFinish_forTest();
    stopCapturingLogMessages();
    ASSERT_EQUALS(1,
                  countLogLinesContaining("not becoming primary, we have been superceded already"));
}
/**
 * Fixture with helpers for tests that exercise catchup and priority takeovers.
 */
class TakeoverTest : public ReplCoordTest {
public:
    /*
     * Verify that a given priority takeover delay is valid. Takeover delays are
     * verified in terms of bounds since the delay value is randomized.
     */
    void assertValidPriorityTakeoverDelay(ReplSetConfig config,
                                          Date_t now,
                                          Date_t priorityTakeoverTime,
                                          int nodeIndex) {
        Milliseconds priorityTakeoverDelay = priorityTakeoverTime - now;
        Milliseconds electionTimeout = config.getElectionTimeoutPeriod();

        // The takeover fires at the node's base priority-takeover delay plus a
        // random offset in [0, electionTimeout * offsetLimitFraction].
        long long baseTakeoverDelay =
            durationCount<Milliseconds>(config.getPriorityTakeoverDelay(nodeIndex));
        long long randomOffsetUpperBound = durationCount<Milliseconds>(electionTimeout) *
            getExternalState()->getElectionTimeoutOffsetLimitFraction();

        auto takeoverDelayUpperBound = Milliseconds(baseTakeoverDelay + randomOffsetUpperBound);
        auto takeoverDelayLowerBound = Milliseconds(baseTakeoverDelay);

        ASSERT_GREATER_THAN_OR_EQUALS(priorityTakeoverDelay, takeoverDelayLowerBound);
        ASSERT_LESS_THAN_OR_EQUALS(priorityTakeoverDelay, takeoverDelayUpperBound);
    }

    /*
     * Processes and mocks responses to any pending PV1 heartbeat requests that have been
     * scheduled at or before 'until'. For any such scheduled heartbeat requests, the
     * heartbeat responses will be mocked at the same time the request was made. So,
     * for a heartbeat request made at time 't', the response will be mocked as
     * occurring at time 't'. This function will always run the clock forward to time
     * 'until'.
     *
     * The applied & durable optimes of the mocked response will be set to
     * 'otherNodesOpTime', and the primary set as 'primaryHostAndPort'.
     *
     * Returns the time that it ran until, which should always be equal to 'until'.
     */
    Date_t respondToHeartbeatsUntil(const ReplSetConfig& config,
                                    Date_t until,
                                    const HostAndPort& primaryHostAndPort,
                                    const OpTime& otherNodesOpTime) {
        auto net = getNet();
        net->enterNetwork();

        // If 'until' is equal to net->now(), process any currently queued requests and return,
        // without running the clock.
        if (net->now() == until) {
            _respondToHeartbeatsNow(config, primaryHostAndPort, otherNodesOpTime);
        } else {
            // Otherwise, run clock and process heartbeats along the way.
            while (net->now() < until) {
                // Run clock forward to time 'until', or until the time of the next queued request.
                net->runUntil(until);
                _respondToHeartbeatsNow(config, primaryHostAndPort, otherNodesOpTime);
            }
        }

        net->runReadyNetworkOperations();
        net->exitNetwork();

        ASSERT_EQ(net->now(), until);
        return net->now();
    }

    /*
     * Runs a successful priority-takeover election at 'priorityTakeoverTime' and
     * verifies that the node became primary, recorded a term-1 vote for itself,
     * and logged both the takeover start and the election success.
     */
    void performSuccessfulPriorityTakeover(Date_t priorityTakeoverTime) {
        startCapturingLogMessages();
        simulateSuccessfulV1ElectionAt(priorityTakeoverTime);
        getReplCoord()->waitForElectionFinish_forTest();
        stopCapturingLogMessages();

        ASSERT(getReplCoord()->getMemberState().primary());

        // Check last vote
        auto lastVote = getExternalState()->loadLocalLastVoteDocument(nullptr);
        ASSERT(lastVote.isOK());
        ASSERT_EQ(0, lastVote.getValue().getCandidateIndex());
        ASSERT_EQ(1, lastVote.getValue().getTerm());

        ASSERT_EQUALS(1, countLogLinesContaining("Starting an election for a priority takeover"));
        ASSERT_EQUALS(1, countLogLinesContaining("election succeeded"));
    }

private:
    /*
     * Processes and schedules mock responses to any PV1 heartbeat requests scheduled at or
     * before the current time. Assumes that the caller has already entered the network with
     * 'enterNetwork()'. It does not run the virtual clock.
     *
     * Intended as a helper function only.
     */
    void _respondToHeartbeatsNow(const ReplSetConfig& config,
                                 const HostAndPort& primaryHostAndPort,
                                 const OpTime& otherNodesOpTime) {
        auto replCoord = getReplCoord();
        auto net = getNet();

        // Process all requests queued at the present time.
        while (net->hasReadyRequests()) {
            auto noi = net->getNextReadyRequest();
            auto&& request = noi->getRequest();

            log() << request.target << " processing " << request.cmdObj;
            ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());

            // Make sure the heartbeat request is valid.
            ReplSetHeartbeatArgsV1 hbArgs;
            ASSERT_OK(hbArgs.initialize(request.cmdObj));

            // Build the mock heartbeat response: 'primaryHostAndPort' reports as
            // primary, everyone else as secondary, all at 'otherNodesOpTime'.
            ReplSetHeartbeatResponse hbResp;
            hbResp.setSetName(config.getReplSetName());
            if (request.target == primaryHostAndPort) {
                hbResp.setState(MemberState::RS_PRIMARY);
            } else {
                hbResp.setState(MemberState::RS_SECONDARY);
            }
            hbResp.setConfigVersion(config.getConfigVersion());
            hbResp.setTerm(replCoord->getTerm());
            hbResp.setAppliedOpTime(otherNodesOpTime);
            hbResp.setDurableOpTime(otherNodesOpTime);
            auto response = makeResponseStatus(hbResp.toBSON(replCoord->isV1ElectionProtocol()));
            net->scheduleResponse(noi, net->now(), response);
        }
    }
};
TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfNodeIsFresherThanCurrentPrimary) {
    BSONObj rsConfigObj = BSON("_id"
                               << "mySet"
                               << "version"
                               << 1
                               << "members"
                               << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                        << "node1:12345")
                                             << BSON("_id" << 2 << "host"
                                                           << "node2:12345")
                                             << BSON("_id" << 3 << "host"
                                                           << "node3:12345"))
                               << "protocolVersion"
                               << 1);
    assertStartSuccess(rsConfigObj, HostAndPort("node1", 12345));
    ReplSetConfig rsConfig = assertMakeRSConfig(rsConfigObj);

    auto coordinator = getReplCoord();
    auto now = getNet()->now();
    OperationContextNoop txn;

    // Our optime is ahead of what the simulated primary will report.
    const OpTime freshOpTime(Timestamp(200, 1), 0);
    coordinator->setMyLastAppliedOpTime(freshOpTime);
    coordinator->setMyLastDurableOpTime(freshOpTime);
    const OpTime staleOpTime(Timestamp(100, 1), 0);

    // Become secondary; no catchup takeover may be pending yet.
    ASSERT_OK(coordinator->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(coordinator->getCatchupTakeover_forTest());

    // One round of heartbeats reveals node2 as a primary that lags behind us,
    // which should cause a catchup takeover to be scheduled.
    now = respondToHeartbeatsUntil(rsConfig, now, HostAndPort("node2", 12345), staleOpTime);

    // The takeover must be scheduled exactly one catchup-takeover delay out.
    ASSERT(coordinator->getCatchupTakeover_forTest());
    auto scheduledTakeover = coordinator->getCatchupTakeover_forTest().get();
    Milliseconds takeoverDelay = scheduledTakeover - now;
    ASSERT_EQUALS(rsConfig.getCatchupTakeoverDelay(), takeoverDelay);
}
TEST_F(TakeoverTest, SchedulesCatchupTakeoverIfBothTakeoversAnOption) {
    // node1 (us) has priority 2, so a priority takeover is also conceivable.
    BSONObj rsConfigObj = BSON("_id"
                               << "mySet"
                               << "version"
                               << 1
                               << "members"
                               << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                        << "node1:12345"
                                                        << "priority"
                                                        << 2)
                                             << BSON("_id" << 2 << "host"
                                                           << "node2:12345")
                                             << BSON("_id" << 3 << "host"
                                                           << "node3:12345"))
                               << "protocolVersion"
                               << 1);
    assertStartSuccess(rsConfigObj, HostAndPort("node1", 12345));
    ReplSetConfig rsConfig = assertMakeRSConfig(rsConfigObj);

    auto coordinator = getReplCoord();
    auto now = getNet()->now();
    OperationContextNoop txn;

    // Our optime is ahead of what the simulated primary will report.
    const OpTime freshOpTime(Timestamp(200, 1), 0);
    coordinator->setMyLastAppliedOpTime(freshOpTime);
    coordinator->setMyLastDurableOpTime(freshOpTime);
    const OpTime staleOpTime(Timestamp(100, 1), 0);

    // Become secondary; nothing may be scheduled yet.
    ASSERT_OK(coordinator->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(coordinator->getCatchupTakeover_forTest());

    // One round of heartbeats shows node2 as a lagging primary. We are both
    // fresher and higher priority, so the catchup takeover must be chosen and
    // no priority takeover scheduled.
    now = respondToHeartbeatsUntil(rsConfig, now, HostAndPort("node2", 12345), staleOpTime);

    ASSERT(coordinator->getCatchupTakeover_forTest());
    ASSERT_FALSE(coordinator->getPriorityTakeover_forTest());
    auto scheduledTakeover = coordinator->getCatchupTakeover_forTest().get();
    Milliseconds takeoverDelay = scheduledTakeover - now;
    ASSERT_EQUALS(rsConfig.getCatchupTakeoverDelay(), takeoverDelay);
}
// Verifies that a second round of heartbeats showing the same stale primary does not
// reschedule the catchup takeover: both the scheduled time and the executor callback
// handle must be unchanged.
TEST_F(TakeoverTest, CatchupTakeoverNotScheduledTwice) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    // Our optime is ahead of the optime we will report for the primary below.
    OpTime currentOptime(Timestamp(200, 1), 0);
    replCoord->setMyLastAppliedOpTime(currentOptime);
    replCoord->setMyLastDurableOpTime(currentOptime);
    OpTime behindOptime(Timestamp(100, 1), 0);

    // Make sure we're secondary and that no catchup takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());

    // Mock a first round of heartbeat responses, which should give us enough information to know
    // that we are fresher than the current primary, prompting the scheduling of a catchup
    // takeover.
    now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);

    // Make sure that the catchup takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getCatchupTakeover_forTest());
    executor::TaskExecutor::CallbackHandle catchupTakeoverCbh =
        replCoord->getCatchupTakeoverCbh_forTest();
    auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
    Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
    ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);

    // Mock another round of heartbeat responses
    now = respondToHeartbeatsUntil(
        config, now + config.getHeartbeatInterval(), HostAndPort("node2", 12345), behindOptime);

    // Make sure another catchup takeover wasn't scheduled
    ASSERT_EQUALS(catchupTakeoverTime, replCoord->getCatchupTakeover_forTest().get());
    ASSERT_TRUE(catchupTakeoverCbh == replCoord->getCatchupTakeoverCbh_forTest());
}
// Verifies that while a catchup takeover is pending, becoming eligible for a priority
// takeover does not schedule one: only a single takeover may be outstanding at a time.
TEST_F(TakeoverTest, CatchupAndPriorityTakeoverNotScheduledAtSameTime) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345"
                                                      << "priority"
                                                      << 2)
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    // Our optime is ahead of the optime we will report for the primary below.
    OpTime currentOptime(Timestamp(200, 1), 0);
    replCoord->setMyLastAppliedOpTime(currentOptime);
    replCoord->setMyLastDurableOpTime(currentOptime);
    OpTime behindOptime(Timestamp(100, 1), 0);

    // Make sure we're secondary and that no catchup takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());

    // Mock a first round of heartbeat responses, which should give us enough information to know
    // that we are fresher than the current primary, prompting the scheduling of a catchup
    // takeover.
    now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);

    // Make sure that the catchup takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getCatchupTakeover_forTest());
    auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
    Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
    ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);

    // Mock another heartbeat where the primary is now up to date
    now = respondToHeartbeatsUntil(
        config, now + catchupTakeoverDelay / 2, HostAndPort("node2", 12345), currentOptime);

    // Since we are no longer ahead of the primary, we can't schedule a catchup
    // takeover anymore. But we are still higher priority than the primary, so
    // after the heartbeat we will try to schedule a priority takeover.
    // Because we can't schedule two takeovers at the same time and the
    // catchup takeover hasn't fired yet, make sure that we don't schedule a
    // priority takeover.
    ASSERT(replCoord->getCatchupTakeover_forTest());
    ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());
}
// Verifies that if the normal election timeout fires before a pending catchup takeover,
// the takeover callback is canceled and the node proceeds with a regular election.
TEST_F(TakeoverTest, CatchupTakeoverCallbackCanceledIfElectionTimeoutRuns) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    // Our optime is ahead of the optime we will report for the primary below.
    OpTime currentOptime(Timestamp(200, 1), 0);
    replCoord->setMyLastAppliedOpTime(currentOptime);
    replCoord->setMyLastDurableOpTime(currentOptime);
    OpTime behindOptime(Timestamp(100, 1), 0);

    // Make sure we're secondary and that no catchup takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());

    startCapturingLogMessages();

    // Mock a first round of heartbeat responses, which should give us enough information to know
    // that we are fresher than the current primary, prompting the scheduling of a catchup
    // takeover.
    now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);

    // Make sure that the catchup takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getCatchupTakeover_forTest());
    auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
    Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
    ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);

    // Fast forward clock to after electionTimeout and black hole all
    // heartbeat requests to make sure the election timeout runs.
    Date_t electionTimeout = replCoord->getElectionTimeout_forTest();
    auto net = getNet();
    net->enterNetwork();
    while (net->now() < electionTimeout) {
        net->runUntil(electionTimeout);
        while (net->hasReadyRequests()) {
            auto noi = net->getNextReadyRequest();
            net->blackHole(noi);
        }
    }
    ASSERT_EQUALS(electionTimeout, net->now());
    net->exitNetwork();

    stopCapturingLogMessages();

    ASSERT_EQUALS(1, countLogLinesContaining("Starting an election, since we've seen no PRIMARY"));

    // Make sure catchup takeover never happened and CatchupTakeover callback was canceled.
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
    ASSERT(replCoord->getMemberState().secondary());
    ASSERT_EQUALS(1, countLogLinesContaining("Canceling catchup takeover callback"));
    ASSERT_EQUALS(0, countLogLinesContaining("Starting an election for a catchup takeover"));
}
// Verifies that transitioning into ROLLBACK cancels a pending catchup takeover
// before it fires.
TEST_F(TakeoverTest, CatchupTakeoverCanceledIfTransitionToRollback) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    // Our optime is ahead of the optime we will report for the primary below.
    OpTime currentOptime(Timestamp(200, 1), 0);
    replCoord->setMyLastAppliedOpTime(currentOptime);
    replCoord->setMyLastDurableOpTime(currentOptime);
    OpTime behindOptime(Timestamp(100, 1), 0);

    // Make sure we're secondary and that no catchup takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());

    startCapturingLogMessages();

    // Mock a first round of heartbeat responses, which should give us enough information to know
    // that we are fresher than the current primary, prompting the scheduling of a catchup
    // takeover.
    now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), behindOptime);

    // Make sure that the catchup takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getCatchupTakeover_forTest());
    auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
    Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
    ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);

    // Transitioning to rollback state should cancel the takeover
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_ROLLBACK));
    ASSERT_TRUE(replCoord->getMemberState().rollback());

    stopCapturingLogMessages();

    // Make sure catchup takeover never happened and CatchupTakeover callback was canceled.
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());
    ASSERT_EQUALS(1, countLogLinesContaining("Canceling catchup takeover callback"));
    ASSERT_EQUALS(0, countLogLinesContaining("Starting an election for a catchup takeover"));
}
// Verifies that when the catchup takeover timer fires, the resulting election runs as a
// NOOP (the node stays secondary) — log-line check pins the [NOOP] path.
TEST_F(TakeoverTest, CatchupTakeoverElectionIsANoop) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345")
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    HostAndPort primaryHostAndPort("node2", 12345);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    // Our optime is ahead of the optime we will report for the primary below.
    OpTime currentOptime(Timestamp(100, 5000), 0);
    OpTime behindOptime(Timestamp(100, 4000), 0);
    replCoord->setMyLastAppliedOpTime(currentOptime);
    replCoord->setMyLastDurableOpTime(currentOptime);

    // Make sure we're secondary and that no takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getCatchupTakeover_forTest());

    // Mock a first round of heartbeat responses.
    now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, behindOptime);

    // Make sure that the catchup takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getCatchupTakeover_forTest());
    auto catchupTakeoverTime = replCoord->getCatchupTakeover_forTest().get();
    Milliseconds catchupTakeoverDelay = catchupTakeoverTime - now;
    ASSERT_EQUALS(config.getCatchupTakeoverDelay(), catchupTakeoverDelay);

    startCapturingLogMessages();

    // Run the network up to the takeover time so the catchup takeover fires.
    now = respondToHeartbeatsUntil(config, catchupTakeoverTime, primaryHostAndPort, behindOptime);

    stopCapturingLogMessages();

    // Make sure that the catchup takeover fired as a NOOP.
    ASSERT(replCoord->getMemberState().secondary());
    ASSERT_EQUALS(1, countLogLinesContaining("Starting an election for a catchup takeover [NOOP]"));
}
// Verifies that a secondary whose configured priority exceeds the current primary's
// schedules a priority takeover, and that observing a higher term cancels it.
TEST_F(TakeoverTest, SchedulesPriorityTakeoverIfNodeHasHigherPriorityThanCurrentPrimary) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345"
                                                      << "priority"
                                                      << 2)
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    OperationContextNoop opCtx;  // used below to drive updateTerm()

    OpTime myOptime(Timestamp(100, 1), 0);
    replCoord->setMyLastAppliedOpTime(myOptime);
    replCoord->setMyLastDurableOpTime(myOptime);

    // Make sure we're secondary and that no priority takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());

    // Mock a first round of heartbeat responses, which should give us enough information to know
    // that we supersede priorities of all other nodes, prompting the scheduling of a priority
    // takeover.
    now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), myOptime);

    // Make sure that the priority takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getPriorityTakeover_forTest());
    auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
    assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);

    // Also make sure that updating the term cancels the scheduled priority takeover.
    ASSERT_EQUALS(ErrorCodes::StaleTerm, replCoord->updateTerm(&opCtx, replCoord->getTerm() + 1));
    ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());
}
// Verifies an end-to-end priority takeover: a higher-priority, caught-up secondary
// schedules the takeover and wins the resulting election.
TEST_F(TakeoverTest, SuccessfulPriorityTakeover) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345"
                                                      << "priority"
                                                      << 2)
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);

    auto replCoord = getReplCoord();
    auto now = getNet()->now();

    OpTime myOptime(Timestamp(100, 1), 0);
    replCoord->setMyLastAppliedOpTime(myOptime);
    replCoord->setMyLastDurableOpTime(myOptime);

    // Make sure we're secondary and that no priority takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());

    // Mock a first round of heartbeat responses, which should give us enough information to know
    // that we supersede priorities of all other nodes, prompting the scheduling of a priority
    // takeover.
    now = respondToHeartbeatsUntil(config, now, HostAndPort("node2", 12345), myOptime);

    // Make sure that the priority takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getPriorityTakeover_forTest());
    auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
    assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);

    // The priority takeover might be scheduled at a time later than one election
    // timeout after our initial heartbeat responses, so mock another round of
    // heartbeat responses to prevent a normal election timeout.
    Milliseconds halfElectionTimeout = config.getElectionTimeoutPeriod() / 2;
    now = respondToHeartbeatsUntil(
        config, now + halfElectionTimeout, HostAndPort("node2", 12345), myOptime);

    performSuccessfulPriorityTakeover(priorityTakeoverTime);
}
// Verifies that a higher-priority node lagging the primary within the same second (but
// beyond the allowed gap) declines its priority takeover, reschedules, and succeeds once
// it is close enough to the most up-to-date member.
TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedSameSecond) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345"
                                                      << "priority"
                                                      << 2)
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    HostAndPort primaryHostAndPort("node2", 12345);

    auto replCoord = getReplCoord();
    auto timeZero = getNet()->now();
    auto now = getNet()->now();

    // All optimes share the same seconds value; only the increment differs.
    OpTime currentOpTime(Timestamp(100, 5000), 0);
    OpTime behindOpTime(Timestamp(100, 3999), 0);
    OpTime closeEnoughOpTime(Timestamp(100, 4000), 0);
    replCoord->setMyLastAppliedOpTime(behindOpTime);
    replCoord->setMyLastDurableOpTime(behindOpTime);

    // Make sure we're secondary and that no priority takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());

    // Mock a first round of heartbeat responses.
    now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, currentOpTime);

    // Make sure that the priority takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getPriorityTakeover_forTest());
    auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
    assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);

    // At this point the other nodes are all ahead of the current node, so it can't call for
    // priority takeover.
    startCapturingLogMessages();
    now = respondToHeartbeatsUntil(config, priorityTakeoverTime, primaryHostAndPort, currentOpTime);
    stopCapturingLogMessages();

    ASSERT(replCoord->getMemberState().secondary());
    ASSERT_EQUALS(1,
                  countLogLinesContaining("Not standing for election because member is not "
                                          "caught up enough to the most up-to-date member to "
                                          "call for priority takeover"));

    // Mock another round of heartbeat responses that occur after the previous
    // 'priorityTakeoverTime', which should schedule a new priority takeover
    Milliseconds halfElectionTimeout = config.getElectionTimeoutPeriod() / 2;
    now = respondToHeartbeatsUntil(
        config, timeZero + halfElectionTimeout * 3, primaryHostAndPort, currentOpTime);

    // Make sure that a new priority takeover has been scheduled and at the
    // correct time.
    ASSERT(replCoord->getPriorityTakeover_forTest());
    priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
    assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);

    // Now make us caught up enough to call for priority takeover to succeed.
    replCoord->setMyLastAppliedOpTime(closeEnoughOpTime);
    replCoord->setMyLastDurableOpTime(closeEnoughOpTime);

    performSuccessfulPriorityTakeover(priorityTakeoverTime);
}
// Same as the previous test, but the lag spans whole seconds: the node declines the
// takeover while too far behind, reschedules, then succeeds once within range.
TEST_F(TakeoverTest, DontCallForPriorityTakeoverWhenLaggedDifferentSecond) {
    BSONObj configObj = BSON("_id"
                             << "mySet"
                             << "version"
                             << 1
                             << "members"
                             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                      << "node1:12345"
                                                      << "priority"
                                                      << 2)
                                           << BSON("_id" << 2 << "host"
                                                         << "node2:12345")
                                           << BSON("_id" << 3 << "host"
                                                         << "node3:12345"))
                             << "protocolVersion"
                             << 1);
    assertStartSuccess(configObj, HostAndPort("node1", 12345));
    ReplSetConfig config = assertMakeRSConfig(configObj);
    HostAndPort primaryHostAndPort("node2", 12345);

    auto replCoord = getReplCoord();
    auto timeZero = getNet()->now();
    auto now = getNet()->now();

    // The optimes differ in the seconds component this time.
    OpTime currentOpTime(Timestamp(100, 0), 0);
    OpTime behindOpTime(Timestamp(97, 0), 0);
    OpTime closeEnoughOpTime(Timestamp(98, 0), 0);
    replCoord->setMyLastAppliedOpTime(behindOpTime);
    replCoord->setMyLastDurableOpTime(behindOpTime);

    // Make sure we're secondary and that no priority takeover has been scheduled.
    ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
    ASSERT_FALSE(replCoord->getPriorityTakeover_forTest());

    now = respondToHeartbeatsUntil(config, now, primaryHostAndPort, currentOpTime);

    // Make sure that the priority takeover has actually been scheduled and at the
    // correct time.
    ASSERT(replCoord->getPriorityTakeover_forTest());
    auto priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
    assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);

    // At this point the other nodes are all ahead of the current node, so it can't call for
    // priority takeover.
    startCapturingLogMessages();
    now = respondToHeartbeatsUntil(config, priorityTakeoverTime, primaryHostAndPort, currentOpTime);
    stopCapturingLogMessages();

    ASSERT(replCoord->getMemberState().secondary());
    ASSERT_EQUALS(1,
                  countLogLinesContaining("Not standing for election because member is not "
                                          "caught up enough to the most up-to-date member to "
                                          "call for priority takeover"));

    // Mock another round of heartbeat responses that occur after the previous
    // 'priorityTakeoverTime', which should schedule a new priority takeover
    Milliseconds halfElectionTimeout = config.getElectionTimeoutPeriod() / 2;
    now = respondToHeartbeatsUntil(
        config, timeZero + halfElectionTimeout * 3, primaryHostAndPort, currentOpTime);

    // Make sure that a new priority takeover has been scheduled and at the
    // correct time.
    ASSERT(replCoord->getPriorityTakeover_forTest());
    priorityTakeoverTime = replCoord->getPriorityTakeover_forTest().get();
    assertValidPriorityTakeoverDelay(config, now, priorityTakeoverTime, 0);

    // Now make us caught up enough to call for priority takeover to succeed.
    replCoord->setMyLastAppliedOpTime(closeEnoughOpTime);
    replCoord->setMyLastDurableOpTime(closeEnoughOpTime);

    performSuccessfulPriorityTakeover(priorityTakeoverTime);
}
// Verifies that a replSetReconfig arriving while the node is a candidate in the
// dry-run vote phase cancels the in-progress election and returns it to follower.
TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringDryRun) {
    // Start up and become electable.
    assertStartSuccess(BSON("_id"
                            << "mySet"
                            << "version"
                            << 2
                            << "members"
                            << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                     << "node1:12345")
                                          << BSON("_id" << 3 << "host"
                                                        << "node3:12345")
                                          << BSON("_id" << 2 << "host"
                                                        << "node2:12345"))
                            << "settings"
                            << BSON("heartbeatIntervalMillis" << 100)),
                       HostAndPort("node1", 12345));
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 0));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 0));
    simulateEnoughHeartbeatsForAllNodesUp();

    // Advance to dry run vote request phase.
    NetworkInterfaceMock* net = getNet();
    net->enterNetwork();
    while (TopologyCoordinator::Role::candidate != getTopoCoord().getRole()) {
        net->runUntil(net->now() + Seconds(1));
        if (!net->hasReadyRequests()) {
            continue;
        }
        // Black hole requests (dry-run vote requests) so the election stays in progress.
        net->blackHole(net->getNextReadyRequest());
    }
    net->exitNetwork();
    ASSERT(TopologyCoordinator::Role::candidate == getTopoCoord().getRole());

    // Submit a reconfig and confirm it cancels the election.
    ReplicationCoordinatorImpl::ReplSetReconfigArgs config = {
        BSON("_id"
             << "mySet"
             << "version"
             << 4
             << "members"
             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                      << "node1:12345")
                           << BSON("_id" << 2 << "host"
                                         << "node2:12345"))),
        true};  // force = true

    BSONObjBuilder result;
    const auto opCtx = makeOperationContext();
    ASSERT_OK(getReplCoord()->processReplSetReconfig(opCtx.get(), config, &result));
    // Wait until election cancels.
    net->enterNetwork();
    net->runReadyNetworkOperations();
    net->exitNetwork();
    ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
// Verifies that a replSetReconfig arriving during the real (post-dry-run) vote phase
// likewise cancels the in-progress election.
TEST_F(ReplCoordTest, NodeCancelsElectionUponReceivingANewConfigDuringVotePhase) {
    // Start up and become electable.
    assertStartSuccess(BSON("_id"
                            << "mySet"
                            << "version"
                            << 2
                            << "members"
                            << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                     << "node1:12345")
                                          << BSON("_id" << 3 << "host"
                                                        << "node3:12345")
                                          << BSON("_id" << 2 << "host"
                                                        << "node2:12345"))
                            << "settings"
                            << BSON("heartbeatIntervalMillis" << 100)),
                       HostAndPort("node1", 12345));
    ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
    getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 0));
    getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 0));
    simulateEnoughHeartbeatsForAllNodesUp();
    // Pass the dry run so the node is a candidate in the actual vote phase.
    simulateSuccessfulDryRun();
    ASSERT(TopologyCoordinator::Role::candidate == getTopoCoord().getRole());

    // Submit a reconfig and confirm it cancels the election.
    ReplicationCoordinatorImpl::ReplSetReconfigArgs config = {
        BSON("_id"
             << "mySet"
             << "version"
             << 4
             << "members"
             << BSON_ARRAY(BSON("_id" << 1 << "host"
                                      << "node1:12345")
                           << BSON("_id" << 2 << "host"
                                         << "node2:12345"))),
        true};  // force = true

    BSONObjBuilder result;
    const auto opCtx = makeOperationContext();
    ASSERT_OK(getReplCoord()->processReplSetReconfig(opCtx.get(), config, &result));
    // Wait until election cancels.
    getNet()->enterNetwork();
    getNet()->runReadyNetworkOperations();
    getNet()->exitNetwork();
    ASSERT(TopologyCoordinator::Role::follower == getTopoCoord().getRole());
}
// Fixture for primary catch-up tests: helpers to win an election via mocked votes and
// then drive the new primary's catch-up phase with scripted heartbeat responses.
class PrimaryCatchUpTest : public ReplCoordTest {
protected:
    using NetworkOpIter = NetworkInterfaceMock::NetworkOperationIterator;
    // Handler invoked for each mocked heartbeat request. NOTE(review): the template
    // argument was missing in the checked-in line ("stdx::function;"), which cannot
    // compile; restored to match how callers invoke the handler with a NetworkOpIter.
    using NetworkRequestFn = stdx::function<void(const NetworkOpIter)>;

    const Timestamp smallTimestamp{1, 1};

    // Builds a successful heartbeat response from a SECONDARY reporting 'opTime' as
    // both its applied and durable optime.
    executor::RemoteCommandResponse makeHeartbeatResponse(OpTime opTime) {
        ReplSetConfig rsConfig = getReplCoord()->getReplicaSetConfig_forTest();
        ReplSetHeartbeatResponse hbResp;
        hbResp.setSetName(rsConfig.getReplSetName());
        hbResp.setState(MemberState::RS_SECONDARY);
        hbResp.setConfigVersion(rsConfig.getConfigVersion());
        hbResp.setAppliedOpTime(opTime);
        hbResp.setDurableOpTime(opTime);
        return makeResponseStatus(hbResp.toBSON(true));
    }

    // Runs the mocked network, granting votes and answering heartbeats, until this
    // node transitions to PRIMARY. Exits early on any unexpected request so the
    // caller can handle it.
    void simulateSuccessfulV1Voting() {
        ReplicationCoordinatorImpl* replCoord = getReplCoord();
        NetworkInterfaceMock* net = getNet();

        auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest();
        ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
        log() << "Election timeout scheduled at " << electionTimeoutWhen << " (simulator time)";

        ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString();
        // Process requests until we're primary but leave the heartbeats for the notification
        // of election win. Exit immediately on unexpected requests.
        while (!replCoord->getMemberState().primary()) {
            log() << "Waiting on network in state " << replCoord->getMemberState();
            net->enterNetwork();
            if (net->now() < electionTimeoutWhen) {
                net->runUntil(electionTimeoutWhen);
            }
            // Peek the next request, don't consume it yet.
            const NetworkOpIter noi = net->getFrontOfUnscheduledQueue();
            const RemoteCommandRequest& request = noi->getRequest();
            log() << request.target.toString() << " processing " << request.cmdObj;
            if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
                OpTime opTime(Timestamp(), getReplCoord()->getTerm());
                net->scheduleResponse(
                    net->getNextReadyRequest(), net->now(), makeHeartbeatResponse(opTime));
            } else if (request.cmdObj.firstElement().fieldNameStringData() ==
                       "replSetRequestVotes") {
                // Grant the vote in the requester's own term.
                net->scheduleResponse(net->getNextReadyRequest(),
                                      net->now(),
                                      makeResponseStatus(BSON("ok" << 1 << "reason"
                                                                   << ""
                                                                   << "term"
                                                                   << request.cmdObj["term"].Long()
                                                                   << "voteGranted"
                                                                   << true)));
            } else {
                // Stop the loop and let the caller handle unexpected requests.
                net->exitNetwork();
                break;
            }
            net->runReadyNetworkOperations();
            net->exitNetwork();
        }
    }

    // Starts a 3-node set with the given catchUpTimeoutMillis, wins the election as
    // node1 at 'opTime', and asserts the node is still in drain (not yet master).
    ReplSetConfig setUp3NodeReplSetAndRunForElection(OpTime opTime, long long timeout = 5000) {
        BSONObj configObj = BSON("_id"
                                 << "mySet"
                                 << "version"
                                 << 1
                                 << "members"
                                 << BSON_ARRAY(BSON("_id" << 1 << "host"
                                                          << "node1:12345")
                                               << BSON("_id" << 2 << "host"
                                                             << "node2:12345")
                                               << BSON("_id" << 3 << "host"
                                                             << "node3:12345"))
                                 << "protocolVersion"
                                 << 1
                                 << "settings"
                                 << BSON("heartbeatTimeoutSecs" << 1 << "catchUpTimeoutMillis"
                                                                << timeout));
        assertStartSuccess(configObj, HostAndPort("node1", 12345));
        ReplSetConfig config = assertMakeRSConfig(configObj);

        getReplCoord()->setMyLastAppliedOpTime(opTime);
        getReplCoord()->setMyLastDurableOpTime(opTime);
        ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));

        simulateSuccessfulV1Voting();
        // We have won the election but have not exited drain mode yet, so isMaster
        // still reports secondary.
        IsMasterResponse imResponse;
        getReplCoord()->fillIsMasterForReplSet(&imResponse);
        ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString();
        ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString();

        return config;
    }

    executor::RemoteCommandResponse makeFreshnessScanResponse(OpTime opTime) {
        // OpTime part of replSetGetStatus.
        return makeResponseStatus(BSON("optimes" << BSON("appliedOpTime" << opTime)));
    }

    // Consumes every currently-ready request, forwarding heartbeats to the handler
    // and black-holing anything else.
    void processHeartbeatRequests(NetworkRequestFn onHeartbeatRequest) {
        NetworkInterfaceMock* net = getNet();
        net->enterNetwork();
        while (net->hasReadyRequests()) {
            const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
            const RemoteCommandRequest& request = noi->getRequest();
            log() << request.target.toString() << " processing " << request.cmdObj;
            if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
                onHeartbeatRequest(noi);
            } else {
                log() << "Black holing unexpected request to " << request.target << ": "
                      << request.cmdObj;
                net->blackHole(noi);
            }
            net->runReadyNetworkOperations();
        }
        net->exitNetwork();
    }

    // Response heartbeats with opTime until the given time. Exit if it sees any other request.
    void replyHeartbeatsAndRunUntil(Date_t until, NetworkRequestFn onHeartbeatRequest) {
        auto net = getNet();
        net->enterNetwork();
        while (net->now() < until) {
            while (net->hasReadyRequests()) {
                // Peek the next request
                auto noi = net->getFrontOfUnscheduledQueue();
                auto& request = noi->getRequest();
                log() << request.target << " at " << net->now() << " processing " << request.cmdObj;
                if (ReplSetHeartbeatArgsV1().initialize(request.cmdObj).isOK()) {
                    // Consume the next request
                    onHeartbeatRequest(net->getNextReadyRequest());
                } else {
                    // Cannot consume other requests than heartbeats.
                    net->exitNetwork();
                    return;
                }
            }
            net->runUntil(until);
        }
        net->exitNetwork();
    }

    // Simulate the work done by bgsync and applier threads. setMyLastAppliedOpTime() will signal
    // the optime waiter.
    void advanceMyLastAppliedOpTime(OpTime opTime) {
        getReplCoord()->setMyLastAppliedOpTime(opTime);
        getNet()->enterNetwork();
        getNet()->runReadyNetworkOperations();
        getNet()->exitNetwork();
    }
};
// The first round of heartbeats indicates we are the most up-to-date.
// The new primary should detect this immediately, skip catch-up, and enter drain mode.
TEST_F(PrimaryCatchUpTest, PrimaryDoesNotNeedToCatchUp) {
    startCapturingLogMessages();
    OpTime time1(Timestamp(100, 1), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);

    int count = 0;
    processHeartbeatRequests([this, time1, &count](const NetworkOpIter noi) {
        count++;
        auto net = getNet();
        // The old primary accepted one more op and all nodes caught up after voting for me.
        net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time1));
    });

    // Get 2 heartbeats from secondaries.
    ASSERT_EQUALS(2, count);
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
    // Complete the drain and confirm we can now accept writes.
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// Heartbeats set a future target OpTime and we reached that successfully.
// The primary stays in catch-up (applier Running) until its applied optime reaches the
// target, then transitions to drain mode.
TEST_F(PrimaryCatchUpTest, CatchupSucceeds) {
    startCapturingLogMessages();
    OpTime time1(Timestamp(100, 1), 0);
    OpTime time2(Timestamp(100, 2), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);

    processHeartbeatRequests([this, time2](const NetworkOpIter noi) {
        auto net = getNet();
        // The old primary accepted one more op and all nodes caught up after voting for me.
        net->scheduleResponse(noi, net->now(), makeHeartbeatResponse(time2));
    });
    // Still catching up: the applier keeps running until we reach time2.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    advanceMyLastAppliedOpTime(time2);
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest known optime successfully"));
    // Complete the drain and confirm we can now accept writes.
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// Verifies that if the catch-up target is never reached, catch-up gives up after
// catchUpTimeoutMillis and the primary proceeds to drain mode anyway.
TEST_F(PrimaryCatchUpTest, CatchupTimeout) {
    startCapturingLogMessages();
    OpTime time1(Timestamp(100, 1), 0);
    OpTime time2(Timestamp(100, 2), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);

    auto catchupTimeoutTime = getNet()->now() + config.getCatchUpTimeoutPeriod();
    replyHeartbeatsAndRunUntil(catchupTimeoutTime, [this, time2](const NetworkOpIter noi) {
        // Other nodes are ahead of me.
        getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time2));
    });
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Catchup timed out"));
    // Complete the drain and confirm we can now accept writes.
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// Catchup can complete even when one secondary is unreachable, as long as the
// reachable node shows we already have the latest known optime.
TEST_F(PrimaryCatchUpTest, CannotSeeAllNodes) {
    startCapturingLogMessages();
    OpTime commonOpTime(Timestamp(100, 1), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(commonOpTime);
    // We should get caught up by the timeout time.
    auto deadline = getNet()->now() + config.getCatchUpTimeoutPeriod();
    replyHeartbeatsAndRunUntil(deadline, [this, commonOpTime](const NetworkOpIter noi) {
        const RemoteCommandRequest& request = noi->getRequest();
        if (request.target.host() == "node2") {
            // One secondary is down; its heartbeats fail outright.
            auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
            getNet()->scheduleResponse(noi, getNet()->now(), status);
        } else {
            getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(commonOpTime));
        }
    });
    // The visible node is at our optime, so no catchup work is needed.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// Like CannotSeeAllNodes, but the unreachable node silently drops heartbeats
// (black hole) instead of returning an error.
TEST_F(PrimaryCatchUpTest, HeartbeatTimeout) {
    startCapturingLogMessages();
    OpTime commonOpTime(Timestamp(100, 1), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(commonOpTime);
    // We should get caught up by the timeout time.
    auto deadline = getNet()->now() + config.getCatchUpTimeoutPeriod();
    replyHeartbeatsAndRunUntil(deadline, [this, commonOpTime](const NetworkOpIter noi) {
        const RemoteCommandRequest& request = noi->getRequest();
        if (request.target.host() == "node2") {
            // Never answer node2's heartbeats, simulating a heartbeat timeout.
            log() << "Black holing heartbeat from " << request.target.host();
            getNet()->blackHole(noi);
        } else {
            getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(commonOpTime));
        }
    });
    // The responsive node is at our optime, so we are already caught up.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest optime known via heartbeats"));
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// A primary that steps down before the first round of catchup heartbeats exits
// catch-up mode cleanly without ever establishing a catchup target.
// Fix: removed the unused local `OpTime time2` that the original declared.
TEST_F(PrimaryCatchUpTest, PrimaryStepsDownBeforeHeartbeatRefreshing) {
    startCapturingLogMessages();
    OpTime time1(Timestamp(100, 1), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
    // Step down immediately.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    TopologyCoordinator::UpdateTermResult updateTermResult;
    // Learning of a higher term (2) forces the primary to step down.
    auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
    ASSERT_TRUE(evh.isValid());
    getReplExec()->waitForEvent(evh);
    ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
    // Catchup was aborted rather than completed, so the applier never drains.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
    ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest"));
    ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
    auto opCtx = makeOperationContext();
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    // As a secondary we must reject writes.
    ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// A primary that steps down partway through catchup abandons the catchup target
// and never enters drain mode.
TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringCatchUp) {
    startCapturingLogMessages();
    OpTime initialOpTime(Timestamp(100, 1), 0);
    OpTime aheadOpTime(Timestamp(100, 2), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(initialOpTime);
    // Step down in the middle of catchup.
    auto stepDownTime = getNet()->now() + config.getCatchUpTimeoutPeriod() / 2;
    replyHeartbeatsAndRunUntil(stepDownTime, [this, aheadOpTime](const NetworkOpIter noi) {
        // Other nodes are ahead of me.
        getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(aheadOpTime));
    });
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    // A higher term (2) forces the primary to step down mid-catchup.
    TopologyCoordinator::UpdateTermResult updateTermResult;
    auto evh = getReplCoord()->updateTerm_forTest(2, &updateTermResult);
    ASSERT_TRUE(evh.isValid());
    getReplExec()->waitForEvent(evh);
    ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
    // Catchup was abandoned, so the applier never transitions to Draining.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
    ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest"));
    ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
    auto opCtx = makeOperationContext();
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_FALSE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// Stepping down during drain mode, then winning a new election, skips catchup
// (we are already at the latest optime) and re-enters drain mode directly.
TEST_F(PrimaryCatchUpTest, PrimaryStepsDownDuringDrainMode) {
    startCapturingLogMessages();
    OpTime initialOpTime(Timestamp(100, 1), 0);
    OpTime laterOpTime(Timestamp(100, 2), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(initialOpTime);
    processHeartbeatRequests([this, laterOpTime](const NetworkOpIter noi) {
        auto network = getNet();
        // The old primary accepted one more op and all nodes caught up after voting for me.
        network->scheduleResponse(noi, network->now(), makeHeartbeatResponse(laterOpTime));
    });
    ReplicationCoordinatorImpl* replCoord = getReplCoord();
    ASSERT(replCoord->getApplierState() == ApplierState::Running);
    advanceMyLastAppliedOpTime(laterOpTime);
    ASSERT(replCoord->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Caught up to the latest"));
    // Step down during drain mode.
    TopologyCoordinator::UpdateTermResult updateTermResult;
    auto evh = replCoord->updateTerm_forTest(2, &updateTermResult);
    ASSERT_TRUE(evh.isValid());
    getReplExec()->waitForEvent(evh);
    ASSERT_TRUE(replCoord->getMemberState().secondary());
    // Step up again.
    ASSERT(replCoord->getApplierState() == ApplierState::Running);
    simulateSuccessfulV1Voting();
    ASSERT_TRUE(replCoord->getMemberState().primary());
    // No need to catch up, so we enter drain mode.
    processHeartbeatRequests([this, laterOpTime](const NetworkOpIter noi) {
        auto network = getNet();
        network->scheduleResponse(noi, network->now(), makeHeartbeatResponse(laterOpTime));
    });
    ASSERT(replCoord->getApplierState() == ApplierState::Draining);
    auto opCtx = makeOperationContext();
    {
        // Writes are still rejected while draining.
        Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
        ASSERT_FALSE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
    }
    replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT(replCoord->getApplierState() == ApplierState::Stopped);
    ASSERT_TRUE(replCoord->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// The catchup target is not fixed at election time: if a fresher node becomes
// reachable mid-catchup, heartbeats raise the target to that node's optime, and
// catchup continues until the (new) target is applied locally.
TEST_F(PrimaryCatchUpTest, FreshestNodeBecomesAvailableLater) {
    OpTime time1(Timestamp(100, 1), 0);
    OpTime time2(Timestamp(200, 1), 0);
    OpTime time3(Timestamp(300, 1), 0);
    OpTime time4(Timestamp(400, 1), 0);
    // 1) The primary is at time 1 at the beginning.
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(time1);
    // 2) It cannot see all nodes. It learns of time 3 from one node, but the other isn't available.
    // So the target optime is time 3.
    startCapturingLogMessages();
    auto oneThirdOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() / 3;
    replyHeartbeatsAndRunUntil(oneThirdOfTimeout, [this, time3](const NetworkOpIter noi) {
        const RemoteCommandRequest& request = noi->getRequest();
        if (request.target.host() == "node2") {
            // node2 (the freshest node) is unreachable for now.
            auto status = Status(ErrorCodes::HostUnreachable, "Can't reach remote host");
            getNet()->scheduleResponse(noi, getNet()->now(), status);
        } else {
            getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
        }
    });
    // The node is still in catchup mode, but the target optime has been set.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    stopCapturingLogMessages();
    ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
    // 3) Advancing its applied optime to time 2 isn't enough.
    advanceMyLastAppliedOpTime(time2);
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    // 4) After a while, the other node at time 4 becomes available. Time 4 becomes the new target.
    startCapturingLogMessages();
    auto twoThirdsOfTimeout = getNet()->now() + config.getCatchUpTimeoutPeriod() * 2 / 3;
    replyHeartbeatsAndRunUntil(twoThirdsOfTimeout, [this, time3, time4](const NetworkOpIter noi) {
        const RemoteCommandRequest& request = noi->getRequest();
        if (request.target.host() == "node2") {
            // node2 is now reachable and reports the freshest optime (time 4).
            getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time4));
        } else {
            getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(time3));
        }
    });
    // The node is still in catchup mode, but the target optime has been updated.
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    stopCapturingLogMessages();
    ASSERT_EQ(1, countLogLinesContaining("Heartbeats updated catchup target optime"));
    // 5) Advancing to time 3 isn't enough now.
    advanceMyLastAppliedOpTime(time3);
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    // 6) The node catches up time 4 eventually.
    startCapturingLogMessages();
    advanceMyLastAppliedOpTime(time4);
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQ(1, countLogLinesContaining("Caught up to the latest"));
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// With an infinite catchup timeout the primary stays in catchup indefinitely;
// only an explicit abort (e.g. user-initiated) moves it into drain mode.
TEST_F(PrimaryCatchUpTest, InfiniteTimeoutAndAbort) {
    startCapturingLogMessages();
    OpTime initialOpTime(Timestamp(100, 1), 0);
    OpTime aheadOpTime(Timestamp(100, 2), 0);
    auto infiniteTimeout = ReplSetConfig::kInfiniteCatchUpTimeout.count();
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(initialOpTime, infiniteTimeout);
    // Run time far forward and ensure we are still in catchup mode.
    // This is an arbitrary time 'far' into the future.
    auto farFuture = getNet()->now() + config.getElectionTimeoutPeriod() * 10;
    replyHeartbeatsAndRunUntil(farFuture, [this, &config, aheadOpTime](const NetworkOpIter noi) {
        // Other nodes are ahead of me.
        getNet()->scheduleResponse(noi, getNet()->now(), makeHeartbeatResponse(aheadOpTime));
        // Simulate the heartbeats from secondaries to primary to update liveness info.
        // TODO(sz): Remove this after merging liveness info and heartbeats.
        const RemoteCommandRequest& request = noi->getRequest();
        ReplSetHeartbeatArgsV1 hbArgs;
        hbArgs.setConfigVersion(config.getConfigVersion());
        hbArgs.setSetName(config.getReplSetName());
        hbArgs.setSenderHost(request.target);
        hbArgs.setSenderId(config.findMemberByHostAndPort(request.target)->getId());
        hbArgs.setTerm(getReplCoord()->getTerm());
        ASSERT(hbArgs.isInitialized());
        ReplSetHeartbeatResponse response;
        ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response));
    });
    // Despite the long wait, we remain primary and still in catchup.
    ASSERT_TRUE(getReplCoord()->getMemberState().primary());
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Running);
    // Simulate a user initiated abort.
    ASSERT_OK(getReplCoord()->abortCatchupIfNeeded());
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Exited primary catch-up mode"));
    ASSERT_EQUALS(0, countLogLinesContaining("Caught up to the latest"));
    ASSERT_EQUALS(0, countLogLinesContaining("Catchup timed out"));
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
// A catchup timeout of zero skips primary catchup entirely and drains immediately.
TEST_F(PrimaryCatchUpTest, ZeroTimeout) {
    startCapturingLogMessages();
    OpTime initialOpTime(Timestamp(100, 1), 0);
    ReplSetConfig config = setUp3NodeReplSetAndRunForElection(initialOpTime, 0);
    ASSERT(getReplCoord()->getApplierState() == ApplierState::Draining);
    stopCapturingLogMessages();
    ASSERT_EQUALS(1, countLogLinesContaining("Skipping primary catchup"));
    auto opCtx = makeOperationContext();
    getReplCoord()->signalDrainComplete(opCtx.get(), getReplCoord()->getTerm());
    Lock::GlobalLock lock(opCtx.get(), MODE_IX, UINT_MAX);
    ASSERT_TRUE(getReplCoord()->canAcceptWritesForDatabase(opCtx.get(), "test"));
}
} // namespace
} // namespace repl
} // namespace mongo