summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/replication_coordinator_impl_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_test.cpp')
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp4106
1 files changed, 2104 insertions, 2002 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index b625f45fe07..452c07519e1 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -46,7 +46,7 @@
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replica_set_config.h"
-#include "mongo/db/repl/replication_coordinator.h" // ReplSetReconfigArgs
+#include "mongo/db/repl/replication_coordinator.h" // ReplSetReconfigArgs
#include "mongo/db/repl/replication_coordinator_external_state_mock.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/replication_coordinator_test_fixture.h"
@@ -67,2039 +67,2141 @@ namespace mongo {
namespace repl {
namespace {
- using executor::NetworkInterfaceMock;
- typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
- Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
+using executor::NetworkInterfaceMock;
+typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
+Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
+
+// Helper class to wrap Timestamp as an OpTime with term 0.
+struct OpTimeWithTermZero {
+ OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
+ operator OpTime() const {
+ return OpTime(timestamp, 0);
+ }
+
+ Timestamp timestamp;
+};
+
+TEST_F(ReplCoordTest, StartupWithValidLocalConfig) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345"))),
+ HostAndPort("node1", 12345));
+}
+
+TEST_F(ReplCoordTest, StartupWithConfigMissingSelf) {
+ startCapturingLogMessages();
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node2:54321"))),
+ HostAndPort("node3", 12345));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("NodeNotFound"));
+}
+
+TEST_F(ReplCoordTest, StartupWithLocalConfigSetNameMismatch) {
+ init("mySet");
+ startCapturingLogMessages();
+ assertStartSuccess(BSON("_id"
+ << "notMySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345"))),
+ HostAndPort("node1", 12345));
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1, countLogLinesContaining("reports set name of notMySet,"));
+}
+
+TEST_F(ReplCoordTest, StartupWithNoLocalConfig) {
+ startCapturingLogMessages();
+ start();
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(2, countLogLinesContaining("Did not find local "));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, InitiateFailsWithEmptyConfig) {
+ OperationContextNoop txn;
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ BSONObjBuilder result;
+ ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
+ getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, InitiateSucceedsWithOneNodeConfig) {
+ OperationContextNoop txn;
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ // Starting uninitialized, show that we can perform the initiate behavior.
+ BSONObjBuilder result1;
+ ASSERT_OK(
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result1));
+ ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
+
+ // Show that initiate fails after it has already succeeded.
+ BSONObjBuilder result2;
+ ASSERT_EQUALS(
+ ErrorCodes::AlreadyInitialized,
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result2));
+
+ // Still in repl set mode, even after failed reinitiate.
+ ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
+}
+
+TEST_F(ReplCoordTest, InitiateSucceedsAfterFailing) {
+ OperationContextNoop txn;
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ BSONObjBuilder result;
+ ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
+ getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ // Having failed to initiate once, show that we can now initiate.
+ BSONObjBuilder result1;
+ ASSERT_OK(
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result1));
+ ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
+}
+
+TEST_F(ReplCoordTest, InitiateFailsIfAlreadyInitialized) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345"))),
+ HostAndPort("node1", 12345));
+ BSONObjBuilder result;
+ ASSERT_EQUALS(
+ ErrorCodes::AlreadyInitialized,
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 1 << "host"
+ << "node1:12345"))),
+ &result));
+}
+
+TEST_F(ReplCoordTest, InitiateFailsIfSelfMissing) {
+ OperationContextNoop txn;
+ BSONObjBuilder result;
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidReplicaSetConfig,
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node4"))),
+ &result));
+}
+
+void doReplSetInitiate(ReplicationCoordinatorImpl* replCoord, Status* status) {
+ OperationContextNoop txn;
+ BSONObjBuilder garbage;
+ *status =
+ replCoord->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345")
+ << BSON("_id" << 1 << "host"
+ << "node2:54321"))),
+ &garbage);
+}
+
+TEST_F(ReplCoordTest, InitiateFailsIfQuorumNotMet) {
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setSetName("mySet");
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setConfigVersion(1);
+ hbArgs.setCheckEmpty(true);
+ hbArgs.setSenderHost(HostAndPort("node1", 12345));
+ hbArgs.setSenderId(0);
+
+ Status status(ErrorCodes::InternalError, "Not set");
+ stdx::thread prsiThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status));
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
+ ASSERT_EQUALS(HostAndPort("node2", 54321), noi->getRequest().target);
+ ASSERT_EQUALS("admin", noi->getRequest().dbname);
+ ASSERT_EQUALS(hbArgs.toBSON(), noi->getRequest().cmdObj);
+ getNet()->scheduleResponse(
+ noi, startDate + Milliseconds(10), ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
+ prsiThread.join();
+ ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, InitiatePassesIfQuorumMet) {
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ ReplSetHeartbeatArgs hbArgs;
+ hbArgs.setSetName("mySet");
+ hbArgs.setProtocolVersion(1);
+ hbArgs.setConfigVersion(1);
+ hbArgs.setCheckEmpty(true);
+ hbArgs.setSenderHost(HostAndPort("node1", 12345));
+ hbArgs.setSenderId(0);
+
+ Status status(ErrorCodes::InternalError, "Not set");
+ stdx::thread prsiThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status));
+ const Date_t startDate = getNet()->now();
+ getNet()->enterNetwork();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
+ ASSERT_EQUALS(HostAndPort("node2", 54321), noi->getRequest().target);
+ ASSERT_EQUALS("admin", noi->getRequest().dbname);
+ ASSERT_EQUALS(hbArgs.toBSON(), noi->getRequest().cmdObj);
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setConfigVersion(0);
+ getNet()->scheduleResponse(
+ noi,
+ startDate + Milliseconds(10),
+ ResponseStatus(RemoteCommandResponse(hbResp.toBSON(false), Milliseconds(8))));
+ getNet()->runUntil(startDate + Milliseconds(10));
+ getNet()->exitNetwork();
+ ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
+ prsiThread.join();
+ ASSERT_OK(status);
+ ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
+}
+
+TEST_F(ReplCoordTest, InitiateFailsWithSetNameMismatch) {
+ OperationContextNoop txn;
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ BSONObjBuilder result1;
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidReplicaSetConfig,
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "wrongSet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result1));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, InitiateFailsWithoutReplSetFlag) {
+ OperationContextNoop txn;
+ init("");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ BSONObjBuilder result1;
+ ASSERT_EQUALS(
+ ErrorCodes::NoReplicationEnabled,
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result1));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, InitiateFailsWhileStoringLocalConfigDocument) {
+ OperationContextNoop txn;
+ init("mySet");
+ start(HostAndPort("node1", 12345));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+
+ BSONObjBuilder result1;
+ getExternalState()->setStoreLocalConfigDocumentStatus(
+ Status(ErrorCodes::OutOfDiskSpace, "The test set this"));
+ ASSERT_EQUALS(
+ ErrorCodes::OutOfDiskSpace,
+ getReplCoord()->processReplSetInitiate(&txn,
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"))),
+ &result1));
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, CheckReplEnabledForCommandNotRepl) {
+ // pass in settings to avoid having a replSet
+ ReplSettings settings;
+ init(settings);
+ start();
+
+ // check status NoReplicationEnabled and empty result
+ BSONObjBuilder result;
+ Status status = getReplCoord()->checkReplEnabledForCommand(&result);
+ ASSERT_EQUALS(status, ErrorCodes::NoReplicationEnabled);
+ ASSERT_TRUE(result.obj().isEmpty());
+}
+
+TEST_F(ReplCoordTest, checkReplEnabledForCommandConfigSvr) {
+ ReplSettings settings;
+ serverGlobalParams.configsvr = true;
+ init(settings);
+ start();
+
+ // check status NoReplicationEnabled and result mentions configsrv
+ BSONObjBuilder result;
+ Status status = getReplCoord()->checkReplEnabledForCommand(&result);
+ ASSERT_EQUALS(status, ErrorCodes::NoReplicationEnabled);
+ ASSERT_EQUALS(result.obj()["info"].String(), "configsvr");
+ serverGlobalParams.configsvr = false;
+}
+
+TEST_F(ReplCoordTest, checkReplEnabledForCommandNoConfig) {
+ start();
+
+ // check status NotYetInitialized and result mentions rs.initiate
+ BSONObjBuilder result;
+ Status status = getReplCoord()->checkReplEnabledForCommand(&result);
+ ASSERT_EQUALS(status, ErrorCodes::NotYetInitialized);
+ ASSERT_TRUE(result.obj()["info"].String().find("rs.initiate") != std::string::npos);
+}
+
+TEST_F(ReplCoordTest, checkReplEnabledForCommandWorking) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ // check status OK and result is empty
+ BSONObjBuilder result;
+ Status status = getReplCoord()->checkReplEnabledForCommand(&result);
+ ASSERT_EQUALS(status, Status::OK());
+ ASSERT_TRUE(result.obj().isEmpty());
+}
+
+TEST_F(ReplCoordTest, BasicRBIDUsage) {
+ start();
+ BSONObjBuilder result;
+ getReplCoord()->processReplSetGetRBID(&result);
+ long long initialValue = result.obj()["rbid"].Int();
+ getReplCoord()->incrementRollbackID();
+
+ BSONObjBuilder result2;
+ getReplCoord()->processReplSetGetRBID(&result2);
+ long long incrementedValue = result2.obj()["rbid"].Int();
+ ASSERT_EQUALS(incrementedValue, initialValue + 1);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationNoReplEnabled) {
+ init("");
+ OperationContextNoop txn;
+ OpTimeWithTermZero time(100, 1);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ writeConcern.wNumNodes = 2;
+
+ // Because we didn't set ReplSettings.replSet, it will think we're a standalone so
+ // awaitReplication will always work.
+ ReplicationCoordinator::StatusAndDuration statusAndDur =
+ getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationMasterSlaveMajorityBaseCase) {
+ ReplSettings settings;
+ settings.master = true;
+ init(settings);
+ OperationContextNoop txn;
+ OpTimeWithTermZero time(100, 1);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ writeConcern.wNumNodes = 2;
+
+
+ writeConcern.wNumNodes = 0;
+ writeConcern.wMode = WriteConcernOptions::kMajority;
+ // w:majority always works on master/slave
+ ReplicationCoordinator::StatusAndDuration statusAndDur =
+ getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationReplSetBaseCases) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+
+ OperationContextNoop txn;
+ OpTimeWithTermZero time(100, 1);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ writeConcern.wNumNodes = 0; // Waiting for 0 nodes always works
+ writeConcern.wMode = "";
+
+ // Should fail when not primary
+ ReplicationCoordinator::StatusAndDuration statusAndDur =
+ getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
+
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesNonBlocking) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2) << BSON("host"
+ << "node4:12345"
+ << "_id" << 3))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ writeConcern.wNumNodes = 1;
+
+ // 1 node waiting for time 1
+ ReplicationCoordinator::StatusAndDuration statusAndDur =
+ getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ getReplCoord()->setMyLastOptime(time1);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+
+ // 2 nodes waiting for time1
+ writeConcern.wNumNodes = 2;
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+
+ // 2 nodes waiting for time2
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ getReplCoord()->setMyLastOptime(time2);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, time2));
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+
+ // 3 nodes waiting for time2
+ writeConcern.wNumNodes = 3;
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time2));
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
+ ASSERT_OK(statusAndDur.status);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationNamedModesNonBlocking) {
+ OperationContextNoop txn;
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node0"
+ << "tags" << BSON("dc"
+ << "NA"
+ << "rack"
+ << "rackNA1"))
+ << BSON("_id" << 1 << "host"
+ << "node1"
+ << "tags" << BSON("dc"
+ << "NA"
+ << "rack"
+ << "rackNA2"))
+ << BSON("_id" << 2 << "host"
+ << "node2"
+ << "tags" << BSON("dc"
+ << "NA"
+ << "rack"
+ << "rackNA3"))
+ << BSON("_id" << 3 << "host"
+ << "node3"
+ << "tags" << BSON("dc"
+ << "EU"
+ << "rack"
+ << "rackEU1"))
+ << BSON("_id" << 4 << "host"
+ << "node4"
+ << "tags" << BSON("dc"
+ << "EU"
+ << "rack"
+ << "rackEU2"))) << "settings"
+ << BSON("getLastErrorModes" << BSON("multiDC" << BSON("dc" << 2) << "multiDCAndRack"
+ << BSON("dc" << 2 << "rack" << 3)))),
+ HostAndPort("node0"));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ // Test invalid write concern
+ WriteConcernOptions invalidWriteConcern;
+ invalidWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ invalidWriteConcern.wMode = "fakemode";
+
+ ReplicationCoordinator::StatusAndDuration statusAndDur =
+ getReplCoord()->awaitReplication(&txn, time1, invalidWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::UnknownReplWriteConcern, statusAndDur.status);
+
+
+ // Set up valid write concerns for the rest of the test
+ WriteConcernOptions majorityWriteConcern;
+ majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
+
+ WriteConcernOptions multiDCWriteConcern;
+ multiDCWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ multiDCWriteConcern.wMode = "multiDC";
+
+ WriteConcernOptions multiRackWriteConcern;
+ multiRackWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ multiRackWriteConcern.wMode = "multiDCAndRack";
+
+
+ // Nothing satisfied
+ getReplCoord()->setMyLastOptime(time1);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, majorityWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiDCWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiRackWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+
+ // Majority satisfied but not either custom mode
+ getReplCoord()->setLastOptime_forTest(2, 1, time1);
+ getReplCoord()->setLastOptime_forTest(2, 2, time1);
+
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, majorityWriteConcern);
+ ASSERT_OK(statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiDCWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiRackWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+
+ // All modes satisfied
+ getReplCoord()->setLastOptime_forTest(2, 3, time1);
+
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, majorityWriteConcern);
+ ASSERT_OK(statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiDCWriteConcern);
+ ASSERT_OK(statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiRackWriteConcern);
+ ASSERT_OK(statusAndDur.status);
+
+ // multiDC satisfied but not majority or multiRack
+ getReplCoord()->setMyLastOptime(time2);
+ getReplCoord()->setLastOptime_forTest(2, 3, time2);
+
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, majorityWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, multiDCWriteConcern);
+ ASSERT_OK(statusAndDur.status);
+ statusAndDur = getReplCoord()->awaitReplication(&txn, time2, multiRackWriteConcern);
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+}
- // Helper class to wrap Timestamp as an OpTime with term 0.
- struct OpTimeWithTermZero {
- OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) { }
- operator OpTime() const { return OpTime(timestamp, 0); }
-
- Timestamp timestamp;
- };
-
- TEST_F(ReplCoordTest, StartupWithValidLocalConfig) {
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))),
- HostAndPort("node1", 12345));
- }
-
- TEST_F(ReplCoordTest, StartupWithConfigMissingSelf) {
- startCapturingLogMessages();
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345") <<
- BSON("_id" << 2 << "host" << "node2:54321"))),
- HostAndPort("node3", 12345));
- stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("NodeNotFound"));
- }
-
- TEST_F(ReplCoordTest, StartupWithLocalConfigSetNameMismatch) {
- init("mySet");
- startCapturingLogMessages();
- assertStartSuccess(
- BSON("_id" << "notMySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))),
- HostAndPort("node1", 12345));
- stopCapturingLogMessages();
- ASSERT_EQUALS(1, countLogLinesContaining("reports set name of notMySet,"));
- }
-
- TEST_F(ReplCoordTest, StartupWithNoLocalConfig) {
- startCapturingLogMessages();
- start();
- stopCapturingLogMessages();
- ASSERT_EQUALS(2, countLogLinesContaining("Did not find local "));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
-
- TEST_F(ReplCoordTest, InitiateFailsWithEmptyConfig) {
- OperationContextNoop txn;
- init("mySet");
- start(HostAndPort("node1", 12345));
- BSONObjBuilder result;
- ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
-
- TEST_F(ReplCoordTest, InitiateSucceedsWithOneNodeConfig) {
- OperationContextNoop txn;
- init("mySet");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- // Starting uninitialized, show that we can perform the initiate behavior.
- BSONObjBuilder result1;
- ASSERT_OK(getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345"))),
- &result1));
- ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
-
- // Show that initiate fails after it has already succeeded.
- BSONObjBuilder result2;
- ASSERT_EQUALS(ErrorCodes::AlreadyInitialized,
- getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345"))),
- &result2));
-
- // Still in repl set mode, even after failed reinitiate.
- ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
- }
-
- TEST_F(ReplCoordTest, InitiateSucceedsAfterFailing) {
- OperationContextNoop txn;
- init("mySet");
- start(HostAndPort("node1", 12345));
- BSONObjBuilder result;
- ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(&txn, BSONObj(), &result));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- // Having failed to initiate once, show that we can now initiate.
- BSONObjBuilder result1;
- ASSERT_OK(getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345"))),
- &result1));
- ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
- }
-
- TEST_F(ReplCoordTest, InitiateFailsIfAlreadyInitialized) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))),
- HostAndPort("node1", 12345));
- BSONObjBuilder result;
- ASSERT_EQUALS(ErrorCodes::AlreadyInitialized,
- getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 1 <<
- "host" << "node1:12345"))),
- &result));
- }
-
- TEST_F(ReplCoordTest, InitiateFailsIfSelfMissing) {
- OperationContextNoop txn;
- BSONObjBuilder result;
- init("mySet");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node4"))),
- &result));
- }
-
- void doReplSetInitiate(ReplicationCoordinatorImpl* replCoord, Status* status) {
- OperationContextNoop txn;
- BSONObjBuilder garbage;
- *status = replCoord->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345") <<
- BSON("_id" << 1 << "host" << "node2:54321"))),
- &garbage);
- }
-
- TEST_F(ReplCoordTest, InitiateFailsIfQuorumNotMet) {
- init("mySet");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- ReplSetHeartbeatArgs hbArgs;
- hbArgs.setSetName("mySet");
- hbArgs.setProtocolVersion(1);
- hbArgs.setConfigVersion(1);
- hbArgs.setCheckEmpty(true);
- hbArgs.setSenderHost(HostAndPort("node1", 12345));
- hbArgs.setSenderId(0);
-
- Status status(ErrorCodes::InternalError, "Not set");
- stdx::thread prsiThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status));
- const Date_t startDate = getNet()->now();
- getNet()->enterNetwork();
- const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
- ASSERT_EQUALS(HostAndPort("node2", 54321), noi->getRequest().target);
- ASSERT_EQUALS("admin", noi->getRequest().dbname);
- ASSERT_EQUALS(hbArgs.toBSON(), noi->getRequest().cmdObj);
- getNet()->scheduleResponse(noi, startDate + Milliseconds(10),
- ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
- getNet()->runUntil(startDate + Milliseconds(10));
- getNet()->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
- prsiThread.join();
- ASSERT_EQUALS(ErrorCodes::NodeNotFound, status);
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
-
- TEST_F(ReplCoordTest, InitiatePassesIfQuorumMet) {
- init("mySet");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- ReplSetHeartbeatArgs hbArgs;
- hbArgs.setSetName("mySet");
- hbArgs.setProtocolVersion(1);
- hbArgs.setConfigVersion(1);
- hbArgs.setCheckEmpty(true);
- hbArgs.setSenderHost(HostAndPort("node1", 12345));
- hbArgs.setSenderId(0);
-
- Status status(ErrorCodes::InternalError, "Not set");
- stdx::thread prsiThread(stdx::bind(doReplSetInitiate, getReplCoord(), &status));
- const Date_t startDate = getNet()->now();
- getNet()->enterNetwork();
- const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
- ASSERT_EQUALS(HostAndPort("node2", 54321), noi->getRequest().target);
- ASSERT_EQUALS("admin", noi->getRequest().dbname);
- ASSERT_EQUALS(hbArgs.toBSON(), noi->getRequest().cmdObj);
- ReplSetHeartbeatResponse hbResp;
- hbResp.setConfigVersion(0);
- getNet()->scheduleResponse(
- noi,
- startDate + Milliseconds(10),
- ResponseStatus(RemoteCommandResponse(hbResp.toBSON(false), Milliseconds(8))));
- getNet()->runUntil(startDate + Milliseconds(10));
- getNet()->exitNetwork();
- ASSERT_EQUALS(startDate + Milliseconds(10), getNet()->now());
- prsiThread.join();
- ASSERT_OK(status);
- ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
- }
-
- TEST_F(ReplCoordTest, InitiateFailsWithSetNameMismatch) {
- OperationContextNoop txn;
- init("mySet");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- BSONObjBuilder result1;
- ASSERT_EQUALS(
- ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "wrongSet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345"))),
- &result1));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
-
- TEST_F(ReplCoordTest, InitiateFailsWithoutReplSetFlag) {
- OperationContextNoop txn;
- init("");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- BSONObjBuilder result1;
- ASSERT_EQUALS(
- ErrorCodes::NoReplicationEnabled,
- getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345"))),
- &result1));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
-
- TEST_F(ReplCoordTest, InitiateFailsWhileStoringLocalConfigDocument) {
- OperationContextNoop txn;
- init("mySet");
- start(HostAndPort("node1", 12345));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
-
- BSONObjBuilder result1;
- getExternalState()->setStoreLocalConfigDocumentStatus(Status(ErrorCodes::OutOfDiskSpace,
- "The test set this"));
- ASSERT_EQUALS(
- ErrorCodes::OutOfDiskSpace,
- getReplCoord()->processReplSetInitiate(
- &txn,
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345"))),
- &result1));
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
-
- TEST_F(ReplCoordTest, CheckReplEnabledForCommandNotRepl) {
- // pass in settings to avoid having a replSet
- ReplSettings settings;
- init(settings);
- start();
-
- // check status NoReplicationEnabled and empty result
- BSONObjBuilder result;
- Status status = getReplCoord()->checkReplEnabledForCommand(&result);
- ASSERT_EQUALS(status, ErrorCodes::NoReplicationEnabled);
- ASSERT_TRUE(result.obj().isEmpty());
- }
-
- TEST_F(ReplCoordTest, checkReplEnabledForCommandConfigSvr) {
- ReplSettings settings;
- serverGlobalParams.configsvr = true;
- init(settings);
- start();
-
- // check status NoReplicationEnabled and result mentions configsrv
- BSONObjBuilder result;
- Status status = getReplCoord()->checkReplEnabledForCommand(&result);
- ASSERT_EQUALS(status, ErrorCodes::NoReplicationEnabled);
- ASSERT_EQUALS(result.obj()["info"].String(), "configsvr");
- serverGlobalParams.configsvr = false;
- }
-
- TEST_F(ReplCoordTest, checkReplEnabledForCommandNoConfig) {
- start();
-
- // check status NotYetInitialized and result mentions rs.initiate
- BSONObjBuilder result;
- Status status = getReplCoord()->checkReplEnabledForCommand(&result);
- ASSERT_EQUALS(status, ErrorCodes::NotYetInitialized);
- ASSERT_TRUE(result.obj()["info"].String().find("rs.initiate") != std::string::npos);
- }
-
- TEST_F(ReplCoordTest, checkReplEnabledForCommandWorking) {
- assertStartSuccess(BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" <<
- "_id" << 0 ))),
- HostAndPort("node1", 12345));
-
- // check status OK and result is empty
- BSONObjBuilder result;
- Status status = getReplCoord()->checkReplEnabledForCommand(&result);
- ASSERT_EQUALS(status, Status::OK());
- ASSERT_TRUE(result.obj().isEmpty());
- }
-
- TEST_F(ReplCoordTest, BasicRBIDUsage) {
- start();
- BSONObjBuilder result;
- getReplCoord()->processReplSetGetRBID(&result);
- long long initialValue = result.obj()["rbid"].Int();
- getReplCoord()->incrementRollbackID();
-
- BSONObjBuilder result2;
- getReplCoord()->processReplSetGetRBID(&result2);
- long long incrementedValue = result2.obj()["rbid"].Int();
- ASSERT_EQUALS(incrementedValue, initialValue + 1);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationNoReplEnabled) {
- init("");
- OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- writeConcern.wNumNodes = 2;
-
- // Because we didn't set ReplSettings.replSet, it will think we're a standalone so
- // awaitReplication will always work.
- ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time, writeConcern);
- ASSERT_OK(statusAndDur.status);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationMasterSlaveMajorityBaseCase) {
- ReplSettings settings;
- settings.master = true;
- init(settings);
- OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- writeConcern.wNumNodes = 2;
-
-
- writeConcern.wNumNodes = 0;
- writeConcern.wMode = WriteConcernOptions::kMajority;
- // w:majority always works on master/slave
- ReplicationCoordinator::StatusAndDuration statusAndDur = getReplCoord()->awaitReplication(
- &txn, time, writeConcern);
- ASSERT_OK(statusAndDur.status);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationReplSetBaseCases) {
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
-
- OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- writeConcern.wNumNodes = 0; // Waiting for 0 nodes always works
- writeConcern.wMode = "";
-
- // Should fail when not primary
- ReplicationCoordinator::StatusAndDuration statusAndDur = getReplCoord()->awaitReplication(
- &txn, time, writeConcern);
- ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
-
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- statusAndDur = getReplCoord()->awaitReplication(&txn, time, writeConcern);
- ASSERT_OK(statusAndDur.status);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesNonBlocking) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2) <<
- BSON("host" << "node4:12345" << "_id" << 3))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- writeConcern.wNumNodes = 1;
-
- // 1 node waiting for time 1
- ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time1, writeConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- getReplCoord()->setMyLastOptime(time1);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
- ASSERT_OK(statusAndDur.status);
-
- // 2 nodes waiting for time1
- writeConcern.wNumNodes = 2;
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, writeConcern);
- ASSERT_OK(statusAndDur.status);
-
- // 2 nodes waiting for time2
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- getReplCoord()->setMyLastOptime(time2);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, time2));
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
- ASSERT_OK(statusAndDur.status);
-
- // 3 nodes waiting for time2
- writeConcern.wNumNodes = 3;
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time2));
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, writeConcern);
- ASSERT_OK(statusAndDur.status);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationNamedModesNonBlocking) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 <<
- "host" << "node0" <<
- "tags" << BSON("dc" << "NA" <<
- "rack" << "rackNA1")) <<
- BSON("_id" << 1 <<
- "host" << "node1" <<
- "tags" << BSON("dc" << "NA" <<
- "rack" << "rackNA2")) <<
- BSON("_id" << 2 <<
- "host" << "node2" <<
- "tags" << BSON("dc" << "NA" <<
- "rack" << "rackNA3")) <<
- BSON("_id" << 3 <<
- "host" << "node3" <<
- "tags" << BSON("dc" << "EU" <<
- "rack" << "rackEU1")) <<
- BSON("_id" << 4 <<
- "host" << "node4" <<
- "tags" << BSON("dc" << "EU" <<
- "rack" << "rackEU2"))) <<
- "settings" << BSON("getLastErrorModes" <<
- BSON("multiDC" << BSON("dc" << 2) <<
- "multiDCAndRack" << BSON("dc" << 2 << "rack" << 3)))),
- HostAndPort("node0"));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- // Test invalid write concern
- WriteConcernOptions invalidWriteConcern;
- invalidWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- invalidWriteConcern.wMode = "fakemode";
-
- ReplicationCoordinator::StatusAndDuration statusAndDur =
- getReplCoord()->awaitReplication(&txn, time1, invalidWriteConcern);
- ASSERT_EQUALS(ErrorCodes::UnknownReplWriteConcern, statusAndDur.status);
-
-
- // Set up valid write concerns for the rest of the test
- WriteConcernOptions majorityWriteConcern;
- majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
-
- WriteConcernOptions multiDCWriteConcern;
- multiDCWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- multiDCWriteConcern.wMode = "multiDC";
-
- WriteConcernOptions multiRackWriteConcern;
- multiRackWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- multiRackWriteConcern.wMode = "multiDCAndRack";
-
-
- // Nothing satisfied
- getReplCoord()->setMyLastOptime(time1);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, majorityWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiDCWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiRackWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
-
- // Majority satisfied but not either custom mode
- getReplCoord()->setLastOptime_forTest(2, 1, time1);
- getReplCoord()->setLastOptime_forTest(2, 2, time1);
-
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, majorityWriteConcern);
- ASSERT_OK(statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiDCWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiRackWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
-
- // All modes satisfied
- getReplCoord()->setLastOptime_forTest(2, 3, time1);
-
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, majorityWriteConcern);
- ASSERT_OK(statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiDCWriteConcern);
- ASSERT_OK(statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time1, multiRackWriteConcern);
- ASSERT_OK(statusAndDur.status);
-
- // multiDC satisfied but not majority or multiRack
- getReplCoord()->setMyLastOptime(time2);
- getReplCoord()->setLastOptime_forTest(2, 3, time2);
-
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, majorityWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, multiDCWriteConcern);
- ASSERT_OK(statusAndDur.status);
- statusAndDur = getReplCoord()->awaitReplication(&txn, time2, multiRackWriteConcern);
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- }
-
- /**
- * Used to wait for replication in a separate thread without blocking execution of the test.
- * To use, set the optime and write concern to be passed to awaitReplication and then call
- * start(), which will spawn a thread that calls awaitReplication. No calls may be made
- * on the ReplicationAwaiter instance between calling start and getResult(). After returning
- * from getResult(), you can call reset() to allow the awaiter to be reused for another
- * awaitReplication call.
- */
- class ReplicationAwaiter {
- public:
-
- ReplicationAwaiter(ReplicationCoordinatorImpl* replCoord, OperationContext* txn) :
- _replCoord(replCoord), _finished(false),
- _result(ReplicationCoordinator::StatusAndDuration(
- Status::OK(), Milliseconds(0))) {}
-
- void setOpTime(const OpTime& ot) {
- _optime = ot;
- }
-
- void setWriteConcern(const WriteConcernOptions& wc) {
- _writeConcern = wc;
- }
-
- // may block
- ReplicationCoordinator::StatusAndDuration getResult() {
- _thread->join();
- ASSERT(_finished);
- return _result;
- }
-
- void start(OperationContext* txn) {
- ASSERT(!_finished);
- _thread.reset(new stdx::thread(stdx::bind(&ReplicationAwaiter::_awaitReplication,
- this,
- txn)));
- }
-
- void reset() {
- ASSERT(_finished);
- _finished = false;
- _result = ReplicationCoordinator::StatusAndDuration(
- Status::OK(), Milliseconds(0));
- }
-
- private:
-
- void _awaitReplication(OperationContext* txn) {
- _result = _replCoord->awaitReplication(txn, _optime, _writeConcern);
- _finished = true;
- }
-
- ReplicationCoordinatorImpl* _replCoord;
- bool _finished;
- OpTime _optime;
- WriteConcernOptions _writeConcern;
- ReplicationCoordinator::StatusAndDuration _result;
- std::unique_ptr<stdx::thread> _thread;
- };
-
- TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesBlocking) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wNumNodes = 2;
-
- // 2 nodes waiting for time1
- awaiter.setOpTime(time1);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
- getReplCoord()->setMyLastOptime(time1);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_OK(statusAndDur.status);
- awaiter.reset();
-
- // 2 nodes waiting for time2
- awaiter.setOpTime(time2);
- awaiter.start(&txn);
- getReplCoord()->setMyLastOptime(time2);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time2));
- statusAndDur = awaiter.getResult();
- ASSERT_OK(statusAndDur.status);
- awaiter.reset();
-
- // 3 nodes waiting for time2
- writeConcern.wNumNodes = 3;
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time2));
- statusAndDur = awaiter.getResult();
- ASSERT_OK(statusAndDur.status);
- awaiter.reset();
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationTimeout) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = 50;
- writeConcern.wNumNodes = 2;
-
- // 2 nodes waiting for time2
- awaiter.setOpTime(time2);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
- getReplCoord()->setMyLastOptime(time2);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
- awaiter.reset();
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationShutdown) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wNumNodes = 2;
-
- // 2 nodes waiting for time2
- awaiter.setOpTime(time2);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
- shutdown();
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, statusAndDur.status);
- awaiter.reset();
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationStepDown) {
- // Test that a thread blocked in awaitReplication will be woken up and return NotMaster
- // if the node steps down while it is waiting.
- OperationContextReplMock txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wNumNodes = 2;
-
- // 2 nodes waiting for time2
- awaiter.setOpTime(time2);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
- getReplCoord()->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000));
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
- awaiter.reset();
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationInterrupt) {
- // Tests that a thread blocked in awaitReplication can be killed by a killOp operation
- const unsigned int opID = 100;
- OperationContextReplMock txn{opID};
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "node1") <<
- BSON("_id" << 1 << "host" << "node2") <<
- BSON("_id" << 2 << "host" << "node3"))),
- HostAndPort("node1"));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wNumNodes = 2;
-
-
- // 2 nodes waiting for time2
- awaiter.setOpTime(time2);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
-
- txn.setCheckForInterruptStatus(kInterruptedStatus);
- getReplCoord()->interrupt(opID);
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
- awaiter.reset();
- }
-
- class StepDownTest : public ReplCoordTest {
- protected:
- OID myRid;
- OID rid2;
- OID rid3;
-
- private:
- virtual void setUp() {
- ReplCoordTest::setUp();
- init("mySet/test1:1234,test2:1234,test3:1234");
-
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
- BSON("_id" << 1 << "host" << "test2:1234") <<
- BSON("_id" << 2 << "host" << "test3:1234"))),
- HostAndPort("test1", 1234));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- myRid = getReplCoord()->getMyRID();
- }
- };
-
- TEST_F(ReplCoordTest, UpdateTerm) {
+/**
+ * Used to wait for replication in a separate thread without blocking execution of the test.
+ * To use, set the optime and write concern to be passed to awaitReplication and then call
+ * start(), which will spawn a thread that calls awaitReplication. No calls may be made
+ * on the ReplicationAwaiter instance between calling start and getResult(). After returning
+ * from getResult(), you can call reset() to allow the awaiter to be reused for another
+ * awaitReplication call.
+ */
+class ReplicationAwaiter {
+public:
+ ReplicationAwaiter(ReplicationCoordinatorImpl* replCoord, OperationContext* txn)
+ : _replCoord(replCoord),
+ _finished(false),
+ _result(ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0))) {}
+
+ void setOpTime(const OpTime& ot) {
+ _optime = ot;
+ }
+
+ void setWriteConcern(const WriteConcernOptions& wc) {
+ _writeConcern = wc;
+ }
+
+ // may block
+ ReplicationCoordinator::StatusAndDuration getResult() {
+ _thread->join();
+ ASSERT(_finished);
+ return _result;
+ }
+
+ void start(OperationContext* txn) {
+ ASSERT(!_finished);
+ _thread.reset(
+ new stdx::thread(stdx::bind(&ReplicationAwaiter::_awaitReplication, this, txn)));
+ }
+
+ void reset() {
+ ASSERT(_finished);
+ _finished = false;
+ _result = ReplicationCoordinator::StatusAndDuration(Status::OK(), Milliseconds(0));
+ }
+
+private:
+ void _awaitReplication(OperationContext* txn) {
+ _result = _replCoord->awaitReplication(txn, _optime, _writeConcern);
+ _finished = true;
+ }
+
+ ReplicationCoordinatorImpl* _replCoord;
+ bool _finished;
+ OpTime _optime;
+ WriteConcernOptions _writeConcern;
+ ReplicationCoordinator::StatusAndDuration _result;
+ std::unique_ptr<stdx::thread> _thread;
+};
+
+TEST_F(ReplCoordTest, AwaitReplicationNumberOfNodesBlocking) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 2;
+
+ // 2 nodes waiting for time1
+ awaiter.setOpTime(time1);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ getReplCoord()->setMyLastOptime(time1);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_OK(statusAndDur.status);
+ awaiter.reset();
+
+ // 2 nodes waiting for time2
+ awaiter.setOpTime(time2);
+ awaiter.start(&txn);
+ getReplCoord()->setMyLastOptime(time2);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time2));
+ statusAndDur = awaiter.getResult();
+ ASSERT_OK(statusAndDur.status);
+ awaiter.reset();
+
+ // 3 nodes waiting for time2
+ writeConcern.wNumNodes = 3;
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time2));
+ statusAndDur = awaiter.getResult();
+ ASSERT_OK(statusAndDur.status);
+ awaiter.reset();
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationTimeout) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = 50;
+ writeConcern.wNumNodes = 2;
+
+ // 2 nodes waiting for time2
+ awaiter.setOpTime(time2);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ getReplCoord()->setMyLastOptime(time2);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
+ awaiter.reset();
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationShutdown) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 2;
+
+ // 2 nodes waiting for time2
+ awaiter.setOpTime(time2);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
+ shutdown();
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, statusAndDur.status);
+ awaiter.reset();
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationStepDown) {
+ // Test that a thread blocked in awaitReplication will be woken up and return NotMaster
+ // if the node steps down while it is waiting.
+ OperationContextReplMock txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 2;
+
+ // 2 nodes waiting for time2
+ awaiter.setOpTime(time2);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
+ getReplCoord()->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000));
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
+ awaiter.reset();
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationInterrupt) {
+ // Tests that a thread blocked in awaitReplication can be killed by a killOp operation
+ const unsigned int opID = 100;
+ OperationContextReplMock txn{opID};
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1")
+ << BSON("_id" << 1 << "host"
+ << "node2") << BSON("_id" << 2 << "host"
+ << "node3"))),
+ HostAndPort("node1"));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 2;
+
+
+ // 2 nodes waiting for time2
+ awaiter.setOpTime(time2);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time1));
+
+ txn.setCheckForInterruptStatus(kInterruptedStatus);
+ getReplCoord()->interrupt(opID);
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_EQUALS(ErrorCodes::Interrupted, statusAndDur.status);
+ awaiter.reset();
+}
+
+class StepDownTest : public ReplCoordTest {
+protected:
+ OID myRid;
+ OID rid2;
+ OID rid3;
+
+private:
+ virtual void setUp() {
ReplCoordTest::setUp();
init("mySet/test1:1234,test2:1234,test3:1234");
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
- BSON("_id" << 1 << "host" << "test2:1234") <<
- BSON("_id" << 2 << "host" << "test3:1234")) <<
- "protocolVersion" << 1),
- HostAndPort("test1", 1234));
- getReplCoord()->setMyLastOptime(OpTime(Timestamp (100, 1), 0));
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234")
+ << BSON("_id" << 2 << "host"
+ << "test3:1234"))),
+ HostAndPort("test1", 1234));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- simulateSuccessfulV1Election();
-
- ASSERT_EQUALS(1, getReplCoord()->getTerm());
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
-
- // lower term, no change
- getReplCoord()->updateTerm(0);
- ASSERT_EQUALS(1, getReplCoord()->getTerm());
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
-
- // same term, no change
- getReplCoord()->updateTerm(1);
- ASSERT_EQUALS(1, getReplCoord()->getTerm());
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
-
- // higher term, step down and change term
- Handle cbHandle;
- getReplCoord()->updateTerm_forTest(2);
- ASSERT_EQUALS(2, getReplCoord()->getTerm());
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- }
-
- TEST_F(StepDownTest, StepDownNotPrimary) {
- OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- // All nodes are caught up
- getReplCoord()->setMyLastOptime(optime1);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
-
- Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(0));
- ASSERT_EQUALS(ErrorCodes::NotMaster, status);
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- }
-
- TEST_F(StepDownTest, StepDownTimeoutAcquiringGlobalLock) {
- OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- // All nodes are caught up
- getReplCoord()->setMyLastOptime(optime1);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
-
- simulateSuccessfulElection();
-
- // Make sure stepDown cannot grab the global shared lock
- Lock::GlobalWrite lk(txn.lockState());
-
- Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000));
- ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- }
-
- TEST_F(StepDownTest, StepDownNoWaiting) {
- OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- // All nodes are caught up
- getReplCoord()->setMyLastOptime(optime1);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
-
- simulateSuccessfulElection();
-
- enterNetwork();
- getNet()->runUntil(getNet()->now() + Seconds(2));
- ASSERT(getNet()->hasReadyRequests());
- NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
- RemoteCommandRequest request = noi->getRequest();
- log() << request.target.toString() << " processing " << request.cmdObj;
- ReplSetHeartbeatArgs hbArgs;
- if (hbArgs.initialize(request.cmdObj).isOK()) {
- ReplSetHeartbeatResponse hbResp;
- hbResp.setSetName(hbArgs.getSetName());
- hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setConfigVersion(hbArgs.getConfigVersion());
- hbResp.setOpTime(optime1);
- BSONObjBuilder respObj;
- respObj << "ok" << 1;
- hbResp.addToBSON(&respObj, false);
- getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
- }
- while (getNet()->hasReadyRequests()) {
- getNet()->blackHole(getNet()->getNextReadyRequest());
- }
- getNet()->runReadyNetworkOperations();
- exitNetwork();
-
-
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT_OK(getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000)));
- enterNetwork(); // So we can safely inspect the topology coordinator
- ASSERT_EQUALS(getNet()->now() + Seconds(1), getTopoCoord().getStepDownTime());
- ASSERT_TRUE(getTopoCoord().getMemberState().secondary());
- exitNetwork();
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- }
-
- TEST_F(ReplCoordTest, StepDownAndBackUpSingleNode) {
- init("mySet");
-
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234"))),
- HostAndPort("test1", 1234));
- OperationContextReplMock txn;
- getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
-
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- ASSERT_OK(getReplCoord()->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000)));
- getNet()->enterNetwork(); // Must do this before inspecting the topocoord
- Date_t stepdownUntil = getNet()->now() + Seconds(1);
- ASSERT_EQUALS(stepdownUntil, getTopoCoord().getStepDownTime());
- ASSERT_TRUE(getTopoCoord().getMemberState().secondary());
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- // Now run time forward and make sure that the node becomes primary again when the stepdown
- // period ends.
- getNet()->runUntil(stepdownUntil);
- ASSERT_EQUALS(stepdownUntil, getNet()->now());
- ASSERT_TRUE(getTopoCoord().getMemberState().primary());
- getNet()->exitNetwork();
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- }
-
- /**
- * Used to run wait for stepDown() to finish in a separate thread without blocking execution of
- * the test. To use, set the values of "force", "waitTime", and "stepDownTime", which will be
- * used as the arguments passed to stepDown, and then call
- * start(), which will spawn a thread that calls stepDown. No calls may be made
- * on the StepDownRunner instance between calling start and getResult(). After returning
- * from getResult(), you can call reset() to allow the StepDownRunner to be reused for another
- * stepDown call.
- */
- class StepDownRunner {
- public:
-
- StepDownRunner(ReplicationCoordinatorImpl* replCoord) :
- _replCoord(replCoord), _finished(false), _result(Status::OK()), _force(false),
- _waitTime(0), _stepDownTime(0) {}
-
- // may block
- Status getResult() {
- _thread->join();
- ASSERT(_finished);
- return _result;
- }
-
- void start(OperationContext* txn) {
- ASSERT(!_finished);
- _thread.reset(new stdx::thread(stdx::bind(&StepDownRunner::_stepDown,
- this,
- txn)));
- }
-
- void reset() {
- ASSERT(_finished);
- _finished = false;
- _result = Status(ErrorCodes::InternalError, "Result Status never set");
- }
-
- void setForce(bool force) {
- _force = force;
- }
-
- void setWaitTime(const Milliseconds& waitTime) {
- _waitTime = waitTime;
- }
-
- void setStepDownTime(const Milliseconds& stepDownTime) {
- _stepDownTime = stepDownTime;
- }
-
- private:
-
- void _stepDown(OperationContext* txn) {
- _result = _replCoord->stepDown(txn, _force, _waitTime, _stepDownTime);
- _finished = true;
- }
-
- ReplicationCoordinatorImpl* _replCoord;
- bool _finished;
- Status _result;
- std::unique_ptr<stdx::thread> _thread;
- bool _force;
- Milliseconds _waitTime;
- Milliseconds _stepDownTime;
- };
-
- TEST_F(StepDownTest, StepDownNotCaughtUp) {
- OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
- // No secondary is caught up
- getReplCoord()->setMyLastOptime(optime2);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
-
- // Try to stepDown but time out because no secondaries are caught up
- StepDownRunner runner(getReplCoord());
- runner.setForce(false);
- runner.setWaitTime(Milliseconds(0));
- runner.setStepDownTime(Milliseconds(1000));
-
- simulateSuccessfulElection();
-
- runner.start(&txn);
- Status status = runner.getResult();
- ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
-
- // Now use "force" to force it to step down even though no one is caught up
- runner.reset();
- getNet()->enterNetwork();
- const Date_t startDate = getNet()->now();
- while (startDate + Milliseconds(1000) < getNet()->now()) {
- while (getNet()->hasReadyRequests()) {
- getNet()->blackHole(getNet()->getNextReadyRequest());
- }
- getNet()->runUntil(startDate + Milliseconds(1000));
- }
- getNet()->exitNetwork();
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- runner.setForce(true);
- runner.start(&txn);
- status = runner.getResult();
- ASSERT_OK(status);
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- }
-
- TEST_F(StepDownTest, StepDownCatchUp) {
- OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
- // No secondary is caught up
- getReplCoord()->setMyLastOptime(optime2);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
-
- // stepDown where the secondary actually has to catch up before the stepDown can succeed
- StepDownRunner runner(getReplCoord());
- runner.setForce(false);
- runner.setWaitTime(Milliseconds(10000));
- runner.setStepDownTime(Milliseconds(60000));
-
- simulateSuccessfulElection();
-
- runner.start(&txn);
-
- // Make a secondary actually catch up
- enterNetwork();
- getNet()->runUntil(getNet()->now() + Milliseconds(2000));
- ASSERT(getNet()->hasReadyRequests());
- NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
- RemoteCommandRequest request = noi->getRequest();
- log() << request.target.toString() << " processing " << request.cmdObj;
- ReplSetHeartbeatArgs hbArgs;
- if (hbArgs.initialize(request.cmdObj).isOK()) {
- ReplSetHeartbeatResponse hbResp;
- hbResp.setSetName(hbArgs.getSetName());
- hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setConfigVersion(hbArgs.getConfigVersion());
- hbResp.setOpTime(optime2);
- BSONObjBuilder respObj;
- respObj << "ok" << 1;
- hbResp.addToBSON(&respObj, false);
- getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
- }
- while (getNet()->hasReadyRequests()) {
- getNet()->blackHole(getNet()->getNextReadyRequest());
- }
- getNet()->runReadyNetworkOperations();
- exitNetwork();
-
- ASSERT_OK(runner.getResult());
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- }
-
- TEST_F(StepDownTest, InterruptStepDown) {
- const unsigned int opID = 100;
- OperationContextReplMock txn{opID};
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
- // No secondary is caught up
- getReplCoord()->setMyLastOptime(optime2);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
-
- // stepDown where the secondary actually has to catch up before the stepDown can succeed
- StepDownRunner runner(getReplCoord());
- runner.setForce(false);
- runner.setWaitTime(Milliseconds(10000));
- runner.setStepDownTime(Milliseconds(60000));
-
- simulateSuccessfulElection();
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
-
- runner.start(&txn);
-
- txn.setCheckForInterruptStatus(kInterruptedStatus);
- getReplCoord()->interrupt(opID);
-
- ASSERT_EQUALS(ErrorCodes::Interrupted, runner.getResult());
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- }
+ myRid = getReplCoord()->getMyRID();
+ }
+};
+
+TEST_F(ReplCoordTest, UpdateTerm) {
+ ReplCoordTest::setUp();
+ init("mySet/test1:1234,test2:1234,test3:1234");
+
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234") << BSON("_id" << 2 << "host"
+ << "test3:1234"))
+ << "protocolVersion" << 1),
+ HostAndPort("test1", 1234));
+ getReplCoord()->setMyLastOptime(OpTime(Timestamp(100, 1), 0));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ simulateSuccessfulV1Election();
+
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // lower term, no change
+ getReplCoord()->updateTerm(0);
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // same term, no change
+ getReplCoord()->updateTerm(1);
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ // higher term, step down and change term
+ Handle cbHandle;
+ getReplCoord()->updateTerm_forTest(2);
+ ASSERT_EQUALS(2, getReplCoord()->getTerm());
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+}
+
+TEST_F(StepDownTest, StepDownNotPrimary) {
+ OperationContextReplMock txn;
+ OpTimeWithTermZero optime1(100, 1);
+ // All nodes are caught up
+ getReplCoord()->setMyLastOptime(optime1);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
+
+ Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(0));
+ ASSERT_EQUALS(ErrorCodes::NotMaster, status);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+}
+
+TEST_F(StepDownTest, StepDownTimeoutAcquiringGlobalLock) {
+ OperationContextReplMock txn;
+ OpTimeWithTermZero optime1(100, 1);
+ // All nodes are caught up
+ getReplCoord()->setMyLastOptime(optime1);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
+
+ simulateSuccessfulElection();
+
+ // Make sure stepDown cannot grab the global shared lock
+ Lock::GlobalWrite lk(txn.lockState());
+
+ Status status = getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000));
+ ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+}
+
+TEST_F(StepDownTest, StepDownNoWaiting) {
+ OperationContextReplMock txn;
+ OpTimeWithTermZero optime1(100, 1);
+ // All nodes are caught up
+ getReplCoord()->setMyLastOptime(optime1);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
+
+ simulateSuccessfulElection();
+
+ enterNetwork();
+ getNet()->runUntil(getNet()->now() + Seconds(2));
+ ASSERT(getNet()->hasReadyRequests());
+ NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
+ RemoteCommandRequest request = noi->getRequest();
+ log() << request.target.toString() << " processing " << request.cmdObj;
+ ReplSetHeartbeatArgs hbArgs;
+ if (hbArgs.initialize(request.cmdObj).isOK()) {
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName(hbArgs.getSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(hbArgs.getConfigVersion());
+ hbResp.setOpTime(optime1);
+ BSONObjBuilder respObj;
+ respObj << "ok" << 1;
+ hbResp.addToBSON(&respObj, false);
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
+ }
+ while (getNet()->hasReadyRequests()) {
+ getNet()->blackHole(getNet()->getNextReadyRequest());
+ }
+ getNet()->runReadyNetworkOperations();
+ exitNetwork();
+
+
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+ ASSERT_OK(getReplCoord()->stepDown(&txn, false, Milliseconds(0), Milliseconds(1000)));
+ enterNetwork(); // So we can safely inspect the topology coordinator
+ ASSERT_EQUALS(getNet()->now() + Seconds(1), getTopoCoord().getStepDownTime());
+ ASSERT_TRUE(getTopoCoord().getMemberState().secondary());
+ exitNetwork();
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+}
+
+TEST_F(ReplCoordTest, StepDownAndBackUpSingleNode) {
+ init("mySet");
+
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234"))),
+ HostAndPort("test1", 1234));
+ OperationContextReplMock txn;
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+ ASSERT_OK(getReplCoord()->stepDown(&txn, true, Milliseconds(0), Milliseconds(1000)));
+ getNet()->enterNetwork(); // Must do this before inspecting the topocoord
+ Date_t stepdownUntil = getNet()->now() + Seconds(1);
+ ASSERT_EQUALS(stepdownUntil, getTopoCoord().getStepDownTime());
+ ASSERT_TRUE(getTopoCoord().getMemberState().secondary());
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // Now run time forward and make sure that the node becomes primary again when the stepdown
+ // period ends.
+ getNet()->runUntil(stepdownUntil);
+ ASSERT_EQUALS(stepdownUntil, getNet()->now());
+ ASSERT_TRUE(getTopoCoord().getMemberState().primary());
+ getNet()->exitNetwork();
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+}
- TEST_F(ReplCoordTest, GetReplicationModeNone) {
- init();
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- }
+/**
+ * Used to run wait for stepDown() to finish in a separate thread without blocking execution of
+ * the test. To use, set the values of "force", "waitTime", and "stepDownTime", which will be
+ * used as the arguments passed to stepDown, and then call
+ * start(), which will spawn a thread that calls stepDown. No calls may be made
+ * on the StepDownRunner instance between calling start and getResult(). After returning
+ * from getResult(), you can call reset() to allow the StepDownRunner to be reused for another
+ * stepDown call.
+ */
+class StepDownRunner {
+public:
+ StepDownRunner(ReplicationCoordinatorImpl* replCoord)
+ : _replCoord(replCoord),
+ _finished(false),
+ _result(Status::OK()),
+ _force(false),
+ _waitTime(0),
+ _stepDownTime(0) {}
- TEST_F(ReplCoordTest, GetReplicationModeMaster) {
- // modeMasterSlave if master set
- ReplSettings settings;
- settings.master = true;
- init(settings);
- ASSERT_EQUALS(ReplicationCoordinator::modeMasterSlave,
- getReplCoord()->getReplicationMode());
+ // may block
+ Status getResult() {
+ _thread->join();
+ ASSERT(_finished);
+ return _result;
}
- TEST_F(ReplCoordTest, GetReplicationModeSlave) {
- // modeMasterSlave if the slave flag was set
- ReplSettings settings;
- settings.slave = SimpleSlave;
- init(settings);
- ASSERT_EQUALS(ReplicationCoordinator::modeMasterSlave,
- getReplCoord()->getReplicationMode());
+ void start(OperationContext* txn) {
+ ASSERT(!_finished);
+ _thread.reset(new stdx::thread(stdx::bind(&StepDownRunner::_stepDown, this, txn)));
}
- TEST_F(ReplCoordTest, GetReplicationModeRepl) {
- // modeReplSet if the set name was supplied.
- ReplSettings settings;
- settings.replSet = "mySet/node1:12345";
- init(settings);
- ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
- ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0 ))),
- HostAndPort("node1", 12345));
+ void reset() {
+ ASSERT(_finished);
+ _finished = false;
+ _result = Status(ErrorCodes::InternalError, "Result Status never set");
}
- TEST_F(ReplCoordTest, TestPrepareReplSetUpdatePositionCommand) {
- OperationContextNoop txn;
- init("mySet/test1:1234,test2:1234,test3:1234");
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
- BSON("_id" << 1 << "host" << "test2:1234") <<
- BSON("_id" << 2 << "host" << "test3:1234"))),
- HostAndPort("test1", 1234));
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
- OpTimeWithTermZero optime3(2, 1);
- getReplCoord()->setMyLastOptime(optime1);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime2));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime3));
-
- // Check that the proper BSON is generated for the replSetUpdatePositionCommand
- BSONObjBuilder cmdBuilder;
- getReplCoord()->prepareReplSetUpdatePositionCommand(&cmdBuilder);
- BSONObj cmd = cmdBuilder.done();
-
- ASSERT_EQUALS(2, cmd.nFields());
- ASSERT_EQUALS("replSetUpdatePosition", cmd.firstElement().fieldNameStringData());
-
- std::set<long long> memberIds;
- BSONForEach(entryElement, cmd["optimes"].Obj()) {
- BSONObj entry = entryElement.Obj();
- long long memberId = entry["memberId"].Number();
- memberIds.insert(memberId);
- if (memberId == 0) {
- // TODO(siyuan) Update when we change replSetUpdatePosition format
- ASSERT_EQUALS(optime1.timestamp, entry["optime"].timestamp());
- } else if (memberId == 1) {
- ASSERT_EQUALS(optime2.timestamp, entry["optime"].timestamp());
- } else {
- ASSERT_EQUALS(2, memberId);
- ASSERT_EQUALS(optime3.timestamp, entry["optime"].timestamp());
- }
- }
- ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes
+ void setForce(bool force) {
+ _force = force;
}
- TEST_F(ReplCoordTest, SetMaintenanceMode) {
- init("mySet/test1:1234,test2:1234,test3:1234");
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 1 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "test1:1234") <<
- BSON("_id" << 1 << "host" << "test2:1234") <<
- BSON("_id" << 2 << "host" << "test3:1234"))),
- HostAndPort("test2", 1234));
- OperationContextNoop txn;
- getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
-
- // Can't unset maintenance mode if it was never set to begin with.
- Status status = getReplCoord()->setMaintenanceMode(false);
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- // valid set
- ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
- ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
-
- // If we go into rollback while in maintenance mode, our state changes to RS_ROLLBACK.
- getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK);
- ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
-
- // When we go back to SECONDARY, we still observe RECOVERING because of maintenance mode.
- getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
-
- // Can set multiple times
- ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
- ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
-
- // Need to unset the number of times you set
- ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
- ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
- ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
- status = getReplCoord()->setMaintenanceMode(false);
- // fourth one fails b/c we only set three times
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
- // Unsetting maintenance mode changes our state to secondary if maintenance mode was
- // the only thinking keeping us out of it.
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- // From rollback, entering and exiting maintenance mode doesn't change perceived
- // state.
- getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK);
- ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
- ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
- ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
- ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
- ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
-
- // Rollback is sticky even if entered while in maintenance mode.
- getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
- ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
- ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
- getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK);
- ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
- ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
- ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
- getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- // Can't modify maintenance mode when PRIMARY
- simulateSuccessfulElection();
-
- status = getReplCoord()->setMaintenanceMode(true);
- ASSERT_EQUALS(ErrorCodes::NotSecondary, status);
- ASSERT_TRUE(getReplCoord()->getMemberState().primary());
-
- simulateStepDownOnIsolation();
-
- status = getReplCoord()->setMaintenanceMode(false);
- ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
- ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
- ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+ void setWaitTime(const Milliseconds& waitTime) {
+ _waitTime = waitTime;
}
- TEST_F(ReplCoordTest, GetHostsWrittenToReplSet) {
- HostAndPort myHost("node1:12345");
- HostAndPort client1Host("node2:12345");
- HostAndPort client2Host("node3:12345") ;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << myHost.toString()) <<
- BSON("_id" << 1 << "host" << client1Host.toString()) <<
- BSON("_id" << 2 << "host" << client2Host.toString()))),
- HostAndPort("node1", 12345));
- OperationContextNoop txn;
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- getReplCoord()->setMyLastOptime(time2);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
-
- std::vector<HostAndPort> caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
- ASSERT_EQUALS(1U, caughtUpHosts.size());
- ASSERT_EQUALS(myHost, caughtUpHosts[0]);
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time2));
- caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
- ASSERT_EQUALS(2U, caughtUpHosts.size());
- if (myHost == caughtUpHosts[0]) {
- ASSERT_EQUALS(client2Host, caughtUpHosts[1]);
- }
- else {
- ASSERT_EQUALS(client2Host, caughtUpHosts[0]);
- ASSERT_EQUALS(myHost, caughtUpHosts[1]);
- }
+ void setStepDownTime(const Milliseconds& stepDownTime) {
+ _stepDownTime = stepDownTime;
}
- TEST_F(ReplCoordTest, GetHostsWrittenToMasterSlave) {
- ReplSettings settings;
- settings.master = true;
- init(settings);
- HostAndPort clientHost("node2:12345");
- OperationContextNoop txn;
-
- OID client = OID::gen();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
-
- getExternalState()->setClientHostAndPort(clientHost);
- HandshakeArgs handshake;
- ASSERT_OK(handshake.initialize(BSON("handshake" << client)));
- ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake));
-
- getReplCoord()->setMyLastOptime(time2);
- ASSERT_OK(getReplCoord()->setLastOptimeForSlave(client, time1.timestamp));
-
- std::vector<HostAndPort> caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
- ASSERT_EQUALS(0U, caughtUpHosts.size()); // self doesn't get included in master-slave
-
- ASSERT_OK(getReplCoord()->setLastOptimeForSlave(client, time2.timestamp));
- caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
- ASSERT_EQUALS(1U, caughtUpHosts.size());
- ASSERT_EQUALS(clientHost, caughtUpHosts[0]);
+private:
+ void _stepDown(OperationContext* txn) {
+ _result = _replCoord->stepDown(txn, _force, _waitTime, _stepDownTime);
+ _finished = true;
}
- TEST_F(ReplCoordTest, GetOtherNodesInReplSetNoConfig) {
- start();
- ASSERT_EQUALS(0U, getReplCoord()->getOtherNodesInReplSet().size());
- }
+ ReplicationCoordinatorImpl* _replCoord;
+ bool _finished;
+ Status _result;
+ std::unique_ptr<stdx::thread> _thread;
+ bool _force;
+ Milliseconds _waitTime;
+ Milliseconds _stepDownTime;
+};
- TEST_F(ReplCoordTest, GetOtherNodesInReplSet) {
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << "h1") <<
- BSON("_id" << 1 << "host" << "h2") <<
- BSON("_id" << 2 <<
- "host" << "h3" <<
- "priority" << 0 <<
- "hidden" << true))),
- HostAndPort("h1"));
-
- std::vector<HostAndPort> otherNodes = getReplCoord()->getOtherNodesInReplSet();
- ASSERT_EQUALS(2U, otherNodes.size());
- if (otherNodes[0] == HostAndPort("h2")) {
- ASSERT_EQUALS(HostAndPort("h3"), otherNodes[1]);
- }
- else {
- ASSERT_EQUALS(HostAndPort("h3"), otherNodes[0]);
- ASSERT_EQUALS(HostAndPort("h2"), otherNodes[0]);
- }
- }
+TEST_F(StepDownTest, StepDownNotCaughtUp) {
+ OperationContextReplMock txn;
+ OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermZero optime2(100, 2);
+ // No secondary is caught up
+ getReplCoord()->setMyLastOptime(optime2);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
- TEST_F(ReplCoordTest, IsMasterNoConfig) {
- start();
- IsMasterResponse response;
+ // Try to stepDown but time out because no secondaries are caught up
+ StepDownRunner runner(getReplCoord());
+ runner.setForce(false);
+ runner.setWaitTime(Milliseconds(0));
+ runner.setStepDownTime(Milliseconds(1000));
- getReplCoord()->fillIsMasterForReplSet(&response);
- ASSERT_FALSE(response.isConfigSet());
- BSONObj responseObj = response.toBSON();
- ASSERT_FALSE(responseObj["ismaster"].Bool());
- ASSERT_FALSE(responseObj["secondary"].Bool());
- ASSERT_TRUE(responseObj["isreplicaset"].Bool());
- ASSERT_EQUALS("Does not have a valid replica set config", responseObj["info"].String());
+ simulateSuccessfulElection();
- IsMasterResponse roundTripped;
- ASSERT_OK(roundTripped.initialize(response.toBSON()));
- }
+ runner.start(&txn);
+ Status status = runner.getResult();
+ ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, status);
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
- TEST_F(ReplCoordTest, IsMaster) {
- HostAndPort h1("h1");
- HostAndPort h2("h2");
- HostAndPort h3("h3");
- HostAndPort h4("h4");
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("_id" << 0 << "host" << h1.toString()) <<
- BSON("_id" << 1 << "host" << h2.toString()) <<
- BSON("_id" << 2 <<
- "host" << h3.toString() <<
- "arbiterOnly" << true) <<
- BSON("_id" << 3 <<
- "host" << h4.toString() <<
- "priority" << 0 <<
- "tags" << BSON("key1" << "value1" <<
- "key2" << "value2")))),
- h4);
- getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
-
- IsMasterResponse response;
- getReplCoord()->fillIsMasterForReplSet(&response);
-
- ASSERT_EQUALS("mySet", response.getReplSetName());
- ASSERT_EQUALS(2, response.getReplSetVersion());
- ASSERT_FALSE(response.isMaster());
- ASSERT_TRUE(response.isSecondary());
- // TODO(spencer): test that response includes current primary when there is one.
- ASSERT_FALSE(response.isArbiterOnly());
- ASSERT_TRUE(response.isPassive());
- ASSERT_FALSE(response.isHidden());
- ASSERT_TRUE(response.shouldBuildIndexes());
- ASSERT_EQUALS(Seconds(0), response.getSlaveDelay());
- ASSERT_EQUALS(h4, response.getMe());
-
- std::vector<HostAndPort> hosts = response.getHosts();
- ASSERT_EQUALS(2U, hosts.size());
- if (hosts[0] == h1) {
- ASSERT_EQUALS(h2, hosts[1]);
- }
- else {
- ASSERT_EQUALS(h2, hosts[0]);
- ASSERT_EQUALS(h1, hosts[1]);
+ // Now use "force" to force it to step down even though no one is caught up
+ runner.reset();
+ getNet()->enterNetwork();
+ const Date_t startDate = getNet()->now();
+ while (startDate + Milliseconds(1000) < getNet()->now()) {
+ while (getNet()->hasReadyRequests()) {
+ getNet()->blackHole(getNet()->getNextReadyRequest());
}
- std::vector<HostAndPort> passives = response.getPassives();
- ASSERT_EQUALS(1U, passives.size());
- ASSERT_EQUALS(h4, passives[0]);
- std::vector<HostAndPort> arbiters = response.getArbiters();
- ASSERT_EQUALS(1U, arbiters.size());
- ASSERT_EQUALS(h3, arbiters[0]);
-
- unordered_map<std::string, std::string> tags = response.getTags();
- ASSERT_EQUALS(2U, tags.size());
- ASSERT_EQUALS("value1", tags["key1"]);
- ASSERT_EQUALS("value2", tags["key2"]);
-
- IsMasterResponse roundTripped;
- ASSERT_OK(roundTripped.initialize(response.toBSON()));
- }
-
- TEST_F(ReplCoordTest, ShutDownBeforeStartUpFinished) {
- init();
- startCapturingLogMessages();
- getReplCoord()->shutdown();
- stopCapturingLogMessages();
- ASSERT_EQUALS(1,
- countLogLinesContaining("shutdown() called before startReplication() finished"));
- }
-
- TEST_F(ReplCoordTest, UpdatePositionWithConfigVersionAndMemberIdTest) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- simulateSuccessfulElection();
-
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
- getReplCoord()->setMyLastOptime(time1);
-
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- writeConcern.wNumNodes = 1;
-
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
-
- // receive updatePosition containing ourself, should not process the update for self
- UpdatePositionArgs args;
- ASSERT_OK(args.initialize(BSON("replSetUpdatePosition" << 1 <<
- "optimes" << BSON_ARRAY(
- BSON("cfgver" << 2 <<
- "memberId" << 0 <<
- "optime" << time2.timestamp)))));
-
- ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
-
- // receive updatePosition with incorrect config version
- UpdatePositionArgs args2;
- ASSERT_OK(args2.initialize(BSON("replSetUpdatePosition" << 1 <<
- "optimes" << BSON_ARRAY(
- BSON("cfgver" << 3 <<
- "memberId" << 1 <<
- "optime" << time2.timestamp)))));
-
- long long cfgver;
- ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
- getReplCoord()->processReplSetUpdatePosition(args2, &cfgver));
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
-
- // receive updatePosition with nonexistent member id
- UpdatePositionArgs args3;
- ASSERT_OK(args3.initialize(BSON("replSetUpdatePosition" << 1 <<
- "optimes" << BSON_ARRAY(
- BSON("cfgver" << 2 <<
- "memberId" << 9 <<
- "optime" << time2.timestamp)))));
-
- ASSERT_EQUALS(ErrorCodes::NodeNotFound,
- getReplCoord()->processReplSetUpdatePosition(args3, 0));
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
-
- // receive a good update position
- getReplCoord()->setMyLastOptime(time2);
- UpdatePositionArgs args4;
- ASSERT_OK(args4.initialize(BSON("replSetUpdatePosition" << 1 <<
- "optimes" << BSON_ARRAY(
- BSON("cfgver" << 2 <<
- "memberId" << 1 <<
- "optime" << time2.timestamp) <<
- BSON("cfgver" << 2 <<
- "memberId" << 2 <<
- "optime" << time2.timestamp)))));
-
- ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args4, 0));
- ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
-
- writeConcern.wNumNodes = 3;
- ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
- }
-
- void doReplSetReconfig(ReplicationCoordinatorImpl* replCoord, Status* status) {
- OperationContextNoop txn;
- BSONObjBuilder garbage;
- ReplSetReconfigArgs args;
- args.force = false;
- args.newConfigObj = BSON("_id" << "mySet" <<
- "version" << 3 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 <<
- "host" << "node1:12345" <<
- "priority" << 3) <<
- BSON("_id" << 1 << "host" << "node2:12345") <<
- BSON("_id" << 2 << "host" << "node3:12345")));
- *status = replCoord->processReplSetReconfig(&txn, args, &garbage);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationReconfigSimple) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 2));
- simulateSuccessfulElection();
-
- OpTimeWithTermZero time(100, 2);
-
- // 3 nodes waiting for time
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wNumNodes = 3;
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
- awaiter.setOpTime(time);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
-
- // reconfig
- Status status(ErrorCodes::InternalError, "Not Set");
- stdx::thread reconfigThread(stdx::bind(doReplSetReconfig, getReplCoord(), &status));
-
- NetworkInterfaceMock* net = getNet();
- getNet()->enterNetwork();
- const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- const RemoteCommandRequest& request = noi->getRequest();
- repl::ReplSetHeartbeatArgs hbArgs;
- ASSERT_OK(hbArgs.initialize(request.cmdObj));
- repl::ReplSetHeartbeatResponse hbResp;
- hbResp.setSetName("mySet");
- hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setConfigVersion(2);
- BSONObjBuilder respObj;
- respObj << "ok" << 1;
- hbResp.addToBSON(&respObj, false);
- net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
- net->runReadyNetworkOperations();
- getNet()->exitNetwork();
- reconfigThread.join();
- ASSERT_OK(status);
-
- // satisfy write concern
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(3, 0, time));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(3, 1, time));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(3, 2, time));
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_OK(statusAndDur.status);
- awaiter.reset();
- }
-
- void doReplSetReconfigToFewer(ReplicationCoordinatorImpl* replCoord, Status* status) {
- OperationContextNoop txn;
- BSONObjBuilder garbage;
- ReplSetReconfigArgs args;
- args.force = false;
- args.newConfigObj = BSON("_id" << "mySet" <<
- "version" << 3 <<
- "members" << BSON_ARRAY(
- BSON("_id" << 0 << "host" << "node1:12345") <<
- BSON("_id" << 2 << "host" << "node3:12345")));
- *status = replCoord->processReplSetReconfig(&txn, args, &garbage);
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationReconfigNodeCountExceedsNumberOfNodes) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 2));
- simulateSuccessfulElection();
-
- OpTimeWithTermZero time(100, 2);
-
- // 3 nodes waiting for time
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wNumNodes = 3;
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
- awaiter.setOpTime(time);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
-
- // reconfig to fewer nodes
- Status status(ErrorCodes::InternalError, "Not Set");
- stdx::thread reconfigThread(stdx::bind(doReplSetReconfigToFewer, getReplCoord(), &status));
-
- NetworkInterfaceMock* net = getNet();
- getNet()->enterNetwork();
- const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- const RemoteCommandRequest& request = noi->getRequest();
- repl::ReplSetHeartbeatArgs hbArgs;
- ASSERT_OK(hbArgs.initialize(request.cmdObj));
- repl::ReplSetHeartbeatResponse hbResp;
- hbResp.setSetName("mySet");
- hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setConfigVersion(2);
- BSONObjBuilder respObj;
- respObj << "ok" << 1;
- hbResp.addToBSON(&respObj, false);
- net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
- net->runReadyNetworkOperations();
- getNet()->exitNetwork();
- reconfigThread.join();
- ASSERT_OK(status);
- std::cout << "asdf" << std::endl;
-
- // writeconcern feasability should be reevaluated and an error should be returned
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_EQUALS(ErrorCodes::CannotSatisfyWriteConcern, statusAndDur.status);
- awaiter.reset();
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationReconfigToSmallerMajority) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2) <<
- BSON("host" << "node4:12345" << "_id" << 3) <<
- BSON("host" << "node5:12345" << "_id" << 4))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 1));
- simulateSuccessfulElection();
-
- OpTimeWithTermZero time(100, 2);
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time));
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time));
-
-
- // majority nodes waiting for time
- WriteConcernOptions writeConcern;
- writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
- writeConcern.wMode = WriteConcernOptions::kMajority;
-
- ReplicationAwaiter awaiter(getReplCoord(), &txn);
- awaiter.setOpTime(time);
- awaiter.setWriteConcern(writeConcern);
- awaiter.start(&txn);
-
- // demonstrate that majority cannot currently be satisfied
- WriteConcernOptions writeConcern2;
- writeConcern2.wTimeout = WriteConcernOptions::kNoWaiting;
- writeConcern2.wMode = WriteConcernOptions::kMajority;
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, writeConcern2).status);
-
- // reconfig to three nodes
- Status status(ErrorCodes::InternalError, "Not Set");
- stdx::thread reconfigThread(stdx::bind(doReplSetReconfig, getReplCoord(), &status));
-
- NetworkInterfaceMock* net = getNet();
- getNet()->enterNetwork();
- const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
- const RemoteCommandRequest& request = noi->getRequest();
- repl::ReplSetHeartbeatArgs hbArgs;
- ASSERT_OK(hbArgs.initialize(request.cmdObj));
- repl::ReplSetHeartbeatResponse hbResp;
- hbResp.setSetName("mySet");
+ getNet()->runUntil(startDate + Milliseconds(1000));
+ }
+ getNet()->exitNetwork();
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+ runner.setForce(true);
+ runner.start(&txn);
+ status = runner.getResult();
+ ASSERT_OK(status);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+}
+
+TEST_F(StepDownTest, StepDownCatchUp) {
+ OperationContextReplMock txn;
+ OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermZero optime2(100, 2);
+ // No secondary is caught up
+ getReplCoord()->setMyLastOptime(optime2);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
+
+ // stepDown where the secondary actually has to catch up before the stepDown can succeed
+ StepDownRunner runner(getReplCoord());
+ runner.setForce(false);
+ runner.setWaitTime(Milliseconds(10000));
+ runner.setStepDownTime(Milliseconds(60000));
+
+ simulateSuccessfulElection();
+
+ runner.start(&txn);
+
+ // Make a secondary actually catch up
+ enterNetwork();
+ getNet()->runUntil(getNet()->now() + Milliseconds(2000));
+ ASSERT(getNet()->hasReadyRequests());
+ NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest();
+ RemoteCommandRequest request = noi->getRequest();
+ log() << request.target.toString() << " processing " << request.cmdObj;
+ ReplSetHeartbeatArgs hbArgs;
+ if (hbArgs.initialize(request.cmdObj).isOK()) {
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName(hbArgs.getSetName());
hbResp.setState(MemberState::RS_SECONDARY);
- hbResp.setConfigVersion(2);
+ hbResp.setConfigVersion(hbArgs.getConfigVersion());
+ hbResp.setOpTime(optime2);
BSONObjBuilder respObj;
respObj << "ok" << 1;
hbResp.addToBSON(&respObj, false);
- net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
- net->runReadyNetworkOperations();
- getNet()->exitNetwork();
- reconfigThread.join();
- ASSERT_OK(status);
-
- // writeconcern feasability should be reevaluated and be satisfied
- ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
- ASSERT_OK(statusAndDur.status);
- awaiter.reset();
- }
-
- TEST_F(ReplCoordTest, AwaitReplicationMajority) {
- // Test that we can satisfy majority write concern can only be
- // statisfied by voting data-bearing members.
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2) <<
- BSON("host" << "node4:12345" <<
- "_id" << 3 <<
- "votes" << 0) <<
- BSON("host" << "node5:12345" <<
- "_id" << 4 <<
- "arbiterOnly" << true))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- OpTimeWithTermZero time(100, 0);
- getReplCoord()->setMyLastOptime(time);
- simulateSuccessfulElection();
-
- WriteConcernOptions majorityWriteConcern;
- majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
- majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
-
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time));
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
-
- // this member does not vote and as a result should not count towards write concern
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, time));
- ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
- getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time));
- ASSERT_OK(getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
- }
-
- TEST_F(ReplCoordTest, LastCommittedOpTime) {
- // Test that the commit level advances properly.
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0) <<
- BSON("host" << "node2:12345" << "_id" << 1) <<
- BSON("host" << "node3:12345" << "_id" << 2) <<
- BSON("host" << "node4:12345" <<
- "_id" << 3 <<
- "votes" << 0) <<
- BSON("host" << "node5:12345" <<
- "_id" << 4 <<
- "arbiterOnly" << true))),
- HostAndPort("node1", 12345));
- ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- OpTimeWithTermZero zero(0, 0);
- OpTimeWithTermZero time(100, 0);
- getReplCoord()->setMyLastOptime(time);
- simulateSuccessfulElection();
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time));
- ASSERT_EQUALS((OpTime)zero, getReplCoord()->getLastCommittedOpTime());
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, time));
- ASSERT_EQUALS((OpTime)zero, getReplCoord()->getLastCommittedOpTime());
-
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time));
- ASSERT_EQUALS((OpTime)time, getReplCoord()->getLastCommittedOpTime());
-
-
- // Set a new, later OpTime.
- OpTimeWithTermZero newTime = OpTimeWithTermZero(100, 1);
- getReplCoord()->setMyLastOptime(newTime);
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, newTime));
- ASSERT_EQUALS((OpTime)time, getReplCoord()->getLastCommittedOpTime());
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, newTime));
- // Reached majority of voting nodes with newTime.
- ASSERT_EQUALS((OpTime)newTime, getReplCoord()->getLastCommittedOpTime());
- ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, newTime));
- ASSERT_EQUALS((OpTime)newTime, getReplCoord()->getLastCommittedOpTime());
- }
-
- TEST_F(ReplCoordTest, CantUseReadAfterIfNotReplSet) {
- init(ReplSettings());
- OperationContextNoop txn;
- auto result = getReplCoord()->waitUntilOpTime(&txn,
- ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
-
- ASSERT_FALSE(result.didWait());
- ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterWhileShutdown) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(10, 0));
-
- shutdown();
-
- auto result = getReplCoord()->waitUntilOpTime(&txn,
- ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
-
- ASSERT_TRUE(result.didWait());
- ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterInterrupted) {
- OperationContextReplMock txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(10, 0));
-
- txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
-
- auto result = getReplCoord()->waitUntilOpTime(&txn,
- ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
-
- ASSERT_TRUE(result.didWait());
- ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterNoOpTime) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
- auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs());
-
- ASSERT_FALSE(result.didWait());
- ASSERT_OK(result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterGreaterOpTime) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
- auto result = getReplCoord()->waitUntilOpTime(&txn,
- ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
-
- ASSERT_TRUE(result.didWait());
- ASSERT_OK(result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterEqualOpTime) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
-
- OpTimeWithTermZero time(100, 0);
- getReplCoord()->setMyLastOptime(time);
- auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(time));
-
- ASSERT_TRUE(result.didWait());
- ASSERT_OK(result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterDeferredGreaterOpTime) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(0, 0));
-
- auto pseudoLogOp = std::async(std::launch::async, [this]() {
- // Not guaranteed to be scheduled after waitUnitl blocks...
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(200, 0));
- });
-
- auto result = getReplCoord()->waitUntilOpTime(&txn,
- ReadAfterOpTimeArgs(OpTimeWithTermZero(100, 0)));
- pseudoLogOp.get();
-
- ASSERT_TRUE(result.didWait());
- ASSERT_OK(result.getStatus());
- }
-
- TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) {
- OperationContextNoop txn;
- assertStartSuccess(
- BSON("_id" << "mySet" <<
- "version" << 2 <<
- "members" << BSON_ARRAY(BSON("host" << "node1:12345" << "_id" << 0))),
- HostAndPort("node1", 12345));
-
- getReplCoord()->setMyLastOptime(OpTimeWithTermZero(0, 0));
-
- OpTimeWithTermZero opTimeToWait(100, 0);
-
- auto pseudoLogOp = std::async(std::launch::async, [this, &opTimeToWait]() {
- // Not guaranteed to be scheduled after waitUnitl blocks...
- getReplCoord()->setMyLastOptime(opTimeToWait);
- });
-
- auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(opTimeToWait));
- pseudoLogOp.get();
-
- ASSERT_TRUE(result.didWait());
- ASSERT_OK(result.getStatus());
+ getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus(respObj.obj()));
+ }
+ while (getNet()->hasReadyRequests()) {
+ getNet()->blackHole(getNet()->getNextReadyRequest());
+ }
+ getNet()->runReadyNetworkOperations();
+ exitNetwork();
+
+ ASSERT_OK(runner.getResult());
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+}
+
+TEST_F(StepDownTest, InterruptStepDown) {
+ const unsigned int opID = 100;
+ OperationContextReplMock txn{opID};
+ OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermZero optime2(100, 2);
+ // No secondary is caught up
+ getReplCoord()->setMyLastOptime(optime2);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime1));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime1));
+
+ // stepDown where the secondary actually has to catch up before the stepDown can succeed
+ StepDownRunner runner(getReplCoord());
+ runner.setForce(false);
+ runner.setWaitTime(Milliseconds(10000));
+ runner.setStepDownTime(Milliseconds(60000));
+
+ simulateSuccessfulElection();
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ runner.start(&txn);
+
+ txn.setCheckForInterruptStatus(kInterruptedStatus);
+ getReplCoord()->interrupt(opID);
+
+ ASSERT_EQUALS(ErrorCodes::Interrupted, runner.getResult());
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+}
+
+TEST_F(ReplCoordTest, GetReplicationModeNone) {
+ init();
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+}
+
+TEST_F(ReplCoordTest, GetReplicationModeMaster) {
+ // modeMasterSlave if master set
+ ReplSettings settings;
+ settings.master = true;
+ init(settings);
+ ASSERT_EQUALS(ReplicationCoordinator::modeMasterSlave, getReplCoord()->getReplicationMode());
+}
+
+TEST_F(ReplCoordTest, GetReplicationModeSlave) {
+ // modeMasterSlave if the slave flag was set
+ ReplSettings settings;
+ settings.slave = SimpleSlave;
+ init(settings);
+ ASSERT_EQUALS(ReplicationCoordinator::modeMasterSlave, getReplCoord()->getReplicationMode());
+}
+
+TEST_F(ReplCoordTest, GetReplicationModeRepl) {
+ // modeReplSet if the set name was supplied.
+ ReplSettings settings;
+ settings.replSet = "mySet/node1:12345";
+ init(settings);
+ ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, getReplCoord()->getReplicationMode());
+ ASSERT_EQUALS(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s);
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+}
+
+TEST_F(ReplCoordTest, TestPrepareReplSetUpdatePositionCommand) {
+ OperationContextNoop txn;
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234") << BSON("_id" << 2 << "host"
+ << "test3:1234"))),
+ HostAndPort("test1", 1234));
+ OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermZero optime2(100, 2);
+ OpTimeWithTermZero optime3(2, 1);
+ getReplCoord()->setMyLastOptime(optime1);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 1, optime2));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(1, 2, optime3));
+
+ // Check that the proper BSON is generated for the replSetUpdatePositionCommand
+ BSONObjBuilder cmdBuilder;
+ getReplCoord()->prepareReplSetUpdatePositionCommand(&cmdBuilder);
+ BSONObj cmd = cmdBuilder.done();
+
+ ASSERT_EQUALS(2, cmd.nFields());
+ ASSERT_EQUALS("replSetUpdatePosition", cmd.firstElement().fieldNameStringData());
+
+ std::set<long long> memberIds;
+ BSONForEach(entryElement, cmd["optimes"].Obj()) {
+ BSONObj entry = entryElement.Obj();
+ long long memberId = entry["memberId"].Number();
+ memberIds.insert(memberId);
+ if (memberId == 0) {
+ // TODO(siyuan) Update when we change replSetUpdatePosition format
+ ASSERT_EQUALS(optime1.timestamp, entry["optime"].timestamp());
+ } else if (memberId == 1) {
+ ASSERT_EQUALS(optime2.timestamp, entry["optime"].timestamp());
+ } else {
+ ASSERT_EQUALS(2, memberId);
+ ASSERT_EQUALS(optime3.timestamp, entry["optime"].timestamp());
+ }
}
-
- // TODO(schwerin): Unit test election id updating
+ ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes
+}
+
+TEST_F(ReplCoordTest, SetMaintenanceMode) {
+ init("mySet/test1:1234,test2:1234,test3:1234");
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 1 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "test1:1234")
+ << BSON("_id" << 1 << "host"
+ << "test2:1234") << BSON("_id" << 2 << "host"
+ << "test3:1234"))),
+ HostAndPort("test2", 1234));
+ OperationContextNoop txn;
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+
+ // Can't unset maintenance mode if it was never set to begin with.
+ Status status = getReplCoord()->setMaintenanceMode(false);
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // valid set
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
+ ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
+
+ // If we go into rollback while in maintenance mode, our state changes to RS_ROLLBACK.
+ getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK);
+ ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
+
+ // When we go back to SECONDARY, we still observe RECOVERING because of maintenance mode.
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+ ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
+
+ // Can set multiple times
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
+
+ // Need to unset the number of times you set
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+ status = getReplCoord()->setMaintenanceMode(false);
+ // fourth one fails b/c we only set three times
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
+ // Unsetting maintenance mode changes our state to secondary if maintenance mode was
+ // the only thinking keeping us out of it.
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // From rollback, entering and exiting maintenance mode doesn't change perceived
+ // state.
+ getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK);
+ ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
+ ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+ ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
+
+ // Rollback is sticky even if entered while in maintenance mode.
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
+ ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
+ getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK);
+ ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+ ASSERT_TRUE(getReplCoord()->getMemberState().rollback());
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ // Can't modify maintenance mode when PRIMARY
+ simulateSuccessfulElection();
+
+ status = getReplCoord()->setMaintenanceMode(true);
+ ASSERT_EQUALS(ErrorCodes::NotSecondary, status);
+ ASSERT_TRUE(getReplCoord()->getMemberState().primary());
+
+ simulateStepDownOnIsolation();
+
+ status = getReplCoord()->setMaintenanceMode(false);
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, status);
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
+ ASSERT_OK(getReplCoord()->setMaintenanceMode(false));
+}
+
+TEST_F(ReplCoordTest, GetHostsWrittenToReplSet) {
+ HostAndPort myHost("node1:12345");
+ HostAndPort client1Host("node2:12345");
+ HostAndPort client2Host("node3:12345");
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host" << myHost.toString())
+ << BSON("_id" << 1 << "host" << client1Host.toString())
+ << BSON("_id" << 2 << "host" << client2Host.toString()))),
+ HostAndPort("node1", 12345));
+ OperationContextNoop txn;
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ getReplCoord()->setMyLastOptime(time2);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time1));
+
+ std::vector<HostAndPort> caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
+ ASSERT_EQUALS(1U, caughtUpHosts.size());
+ ASSERT_EQUALS(myHost, caughtUpHosts[0]);
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time2));
+ caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
+ ASSERT_EQUALS(2U, caughtUpHosts.size());
+ if (myHost == caughtUpHosts[0]) {
+ ASSERT_EQUALS(client2Host, caughtUpHosts[1]);
+ } else {
+ ASSERT_EQUALS(client2Host, caughtUpHosts[0]);
+ ASSERT_EQUALS(myHost, caughtUpHosts[1]);
+ }
+}
+
+TEST_F(ReplCoordTest, GetHostsWrittenToMasterSlave) {
+ ReplSettings settings;
+ settings.master = true;
+ init(settings);
+ HostAndPort clientHost("node2:12345");
+ OperationContextNoop txn;
+
+ OID client = OID::gen();
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+
+ getExternalState()->setClientHostAndPort(clientHost);
+ HandshakeArgs handshake;
+ ASSERT_OK(handshake.initialize(BSON("handshake" << client)));
+ ASSERT_OK(getReplCoord()->processHandshake(&txn, handshake));
+
+ getReplCoord()->setMyLastOptime(time2);
+ ASSERT_OK(getReplCoord()->setLastOptimeForSlave(client, time1.timestamp));
+
+ std::vector<HostAndPort> caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
+ ASSERT_EQUALS(0U, caughtUpHosts.size()); // self doesn't get included in master-slave
+
+ ASSERT_OK(getReplCoord()->setLastOptimeForSlave(client, time2.timestamp));
+ caughtUpHosts = getReplCoord()->getHostsWrittenTo(time2);
+ ASSERT_EQUALS(1U, caughtUpHosts.size());
+ ASSERT_EQUALS(clientHost, caughtUpHosts[0]);
+}
+
+TEST_F(ReplCoordTest, GetOtherNodesInReplSetNoConfig) {
+ start();
+ ASSERT_EQUALS(0U, getReplCoord()->getOtherNodesInReplSet().size());
+}
+
+TEST_F(ReplCoordTest, GetOtherNodesInReplSet) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "h1")
+ << BSON("_id" << 1 << "host"
+ << "h2")
+ << BSON("_id" << 2 << "host"
+ << "h3"
+ << "priority" << 0 << "hidden" << true))),
+ HostAndPort("h1"));
+
+ std::vector<HostAndPort> otherNodes = getReplCoord()->getOtherNodesInReplSet();
+ ASSERT_EQUALS(2U, otherNodes.size());
+ if (otherNodes[0] == HostAndPort("h2")) {
+ ASSERT_EQUALS(HostAndPort("h3"), otherNodes[1]);
+ } else {
+ ASSERT_EQUALS(HostAndPort("h3"), otherNodes[0]);
+ ASSERT_EQUALS(HostAndPort("h2"), otherNodes[0]);
+ }
+}
+
+TEST_F(ReplCoordTest, IsMasterNoConfig) {
+ start();
+ IsMasterResponse response;
+
+ getReplCoord()->fillIsMasterForReplSet(&response);
+ ASSERT_FALSE(response.isConfigSet());
+ BSONObj responseObj = response.toBSON();
+ ASSERT_FALSE(responseObj["ismaster"].Bool());
+ ASSERT_FALSE(responseObj["secondary"].Bool());
+ ASSERT_TRUE(responseObj["isreplicaset"].Bool());
+ ASSERT_EQUALS("Does not have a valid replica set config", responseObj["info"].String());
+
+ IsMasterResponse roundTripped;
+ ASSERT_OK(roundTripped.initialize(response.toBSON()));
+}
+
+TEST_F(ReplCoordTest, IsMaster) {
+ HostAndPort h1("h1");
+ HostAndPort h2("h2");
+ HostAndPort h3("h3");
+ HostAndPort h4("h4");
+ assertStartSuccess(
+ BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host" << h1.toString())
+ << BSON("_id" << 1 << "host" << h2.toString())
+ << BSON("_id" << 2 << "host" << h3.toString() << "arbiterOnly" << true)
+ << BSON("_id" << 3 << "host" << h4.toString() << "priority" << 0
+ << "tags" << BSON("key1"
+ << "value1"
+ << "key2"
+ << "value2")))),
+ h4);
+ getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
+ ASSERT_TRUE(getReplCoord()->getMemberState().secondary());
+
+ IsMasterResponse response;
+ getReplCoord()->fillIsMasterForReplSet(&response);
+
+ ASSERT_EQUALS("mySet", response.getReplSetName());
+ ASSERT_EQUALS(2, response.getReplSetVersion());
+ ASSERT_FALSE(response.isMaster());
+ ASSERT_TRUE(response.isSecondary());
+ // TODO(spencer): test that response includes current primary when there is one.
+ ASSERT_FALSE(response.isArbiterOnly());
+ ASSERT_TRUE(response.isPassive());
+ ASSERT_FALSE(response.isHidden());
+ ASSERT_TRUE(response.shouldBuildIndexes());
+ ASSERT_EQUALS(Seconds(0), response.getSlaveDelay());
+ ASSERT_EQUALS(h4, response.getMe());
+
+ std::vector<HostAndPort> hosts = response.getHosts();
+ ASSERT_EQUALS(2U, hosts.size());
+ if (hosts[0] == h1) {
+ ASSERT_EQUALS(h2, hosts[1]);
+ } else {
+ ASSERT_EQUALS(h2, hosts[0]);
+ ASSERT_EQUALS(h1, hosts[1]);
+ }
+ std::vector<HostAndPort> passives = response.getPassives();
+ ASSERT_EQUALS(1U, passives.size());
+ ASSERT_EQUALS(h4, passives[0]);
+ std::vector<HostAndPort> arbiters = response.getArbiters();
+ ASSERT_EQUALS(1U, arbiters.size());
+ ASSERT_EQUALS(h3, arbiters[0]);
+
+ unordered_map<std::string, std::string> tags = response.getTags();
+ ASSERT_EQUALS(2U, tags.size());
+ ASSERT_EQUALS("value1", tags["key1"]);
+ ASSERT_EQUALS("value2", tags["key2"]);
+
+ IsMasterResponse roundTripped;
+ ASSERT_OK(roundTripped.initialize(response.toBSON()));
+}
+
+TEST_F(ReplCoordTest, ShutDownBeforeStartUpFinished) {
+ init();
+ startCapturingLogMessages();
+ getReplCoord()->shutdown();
+ stopCapturingLogMessages();
+ ASSERT_EQUALS(1,
+ countLogLinesContaining("shutdown() called before startReplication() finished"));
+}
+
+TEST_F(ReplCoordTest, UpdatePositionWithConfigVersionAndMemberIdTest) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ simulateSuccessfulElection();
+
+ OpTimeWithTermZero time1(100, 1);
+ OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermZero staleTime(10, 0);
+ getReplCoord()->setMyLastOptime(time1);
+
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ writeConcern.wNumNodes = 1;
+
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+
+ // receive updatePosition containing ourself, should not process the update for self
+ UpdatePositionArgs args;
+ ASSERT_OK(args.initialize(BSON("replSetUpdatePosition"
+ << 1 << "optimes"
+ << BSON_ARRAY(BSON("cfgver" << 2 << "memberId" << 0 << "optime"
+ << time2.timestamp)))));
+
+ ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args, 0));
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+
+ // receive updatePosition with incorrect config version
+ UpdatePositionArgs args2;
+ ASSERT_OK(args2.initialize(BSON("replSetUpdatePosition"
+ << 1 << "optimes"
+ << BSON_ARRAY(BSON("cfgver" << 3 << "memberId" << 1 << "optime"
+ << time2.timestamp)))));
+
+ long long cfgver;
+ ASSERT_EQUALS(ErrorCodes::InvalidReplicaSetConfig,
+ getReplCoord()->processReplSetUpdatePosition(args2, &cfgver));
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+
+ // receive updatePosition with nonexistent member id
+ UpdatePositionArgs args3;
+ ASSERT_OK(args3.initialize(BSON("replSetUpdatePosition"
+ << 1 << "optimes"
+ << BSON_ARRAY(BSON("cfgver" << 2 << "memberId" << 9 << "optime"
+ << time2.timestamp)))));
+
+ ASSERT_EQUALS(ErrorCodes::NodeNotFound, getReplCoord()->processReplSetUpdatePosition(args3, 0));
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+
+ // receive a good update position
+ getReplCoord()->setMyLastOptime(time2);
+ UpdatePositionArgs args4;
+ ASSERT_OK(args4.initialize(
+ BSON("replSetUpdatePosition"
+ << 1 << "optimes"
+ << BSON_ARRAY(
+ BSON("cfgver" << 2 << "memberId" << 1 << "optime" << time2.timestamp)
+ << BSON("cfgver" << 2 << "memberId" << 2 << "optime" << time2.timestamp)))));
+
+ ASSERT_OK(getReplCoord()->processReplSetUpdatePosition(args4, 0));
+ ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+
+ writeConcern.wNumNodes = 3;
+ ASSERT_OK(getReplCoord()->awaitReplication(&txn, time2, writeConcern).status);
+}
+
+void doReplSetReconfig(ReplicationCoordinatorImpl* replCoord, Status* status) {
+ OperationContextNoop txn;
+ BSONObjBuilder garbage;
+ ReplSetReconfigArgs args;
+ args.force = false;
+ args.newConfigObj = BSON("_id"
+ << "mySet"
+ << "version" << 3 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345"
+ << "priority" << 3)
+ << BSON("_id" << 1 << "host"
+ << "node2:12345")
+ << BSON("_id" << 2 << "host"
+ << "node3:12345")));
+ *status = replCoord->processReplSetReconfig(&txn, args, &garbage);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationReconfigSimple) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 2));
+ simulateSuccessfulElection();
+
+ OpTimeWithTermZero time(100, 2);
+
+ // 3 nodes waiting for time
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 3;
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+ awaiter.setOpTime(time);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+
+ // reconfig
+ Status status(ErrorCodes::InternalError, "Not Set");
+ stdx::thread reconfigThread(stdx::bind(doReplSetReconfig, getReplCoord(), &status));
+
+ NetworkInterfaceMock* net = getNet();
+ getNet()->enterNetwork();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ const RemoteCommandRequest& request = noi->getRequest();
+ repl::ReplSetHeartbeatArgs hbArgs;
+ ASSERT_OK(hbArgs.initialize(request.cmdObj));
+ repl::ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(2);
+ BSONObjBuilder respObj;
+ respObj << "ok" << 1;
+ hbResp.addToBSON(&respObj, false);
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
+ net->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+ reconfigThread.join();
+ ASSERT_OK(status);
+
+ // satisfy write concern
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(3, 0, time));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(3, 1, time));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(3, 2, time));
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_OK(statusAndDur.status);
+ awaiter.reset();
+}
+
+void doReplSetReconfigToFewer(ReplicationCoordinatorImpl* replCoord, Status* status) {
+ OperationContextNoop txn;
+ BSONObjBuilder garbage;
+ ReplSetReconfigArgs args;
+ args.force = false;
+ args.newConfigObj = BSON("_id"
+ << "mySet"
+ << "version" << 3 << "members"
+ << BSON_ARRAY(BSON("_id" << 0 << "host"
+ << "node1:12345")
+ << BSON("_id" << 2 << "host"
+ << "node3:12345")));
+ *status = replCoord->processReplSetReconfig(&txn, args, &garbage);
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationReconfigNodeCountExceedsNumberOfNodes) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 2));
+ simulateSuccessfulElection();
+
+ OpTimeWithTermZero time(100, 2);
+
+ // 3 nodes waiting for time
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wNumNodes = 3;
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+ awaiter.setOpTime(time);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+
+ // reconfig to fewer nodes
+ Status status(ErrorCodes::InternalError, "Not Set");
+ stdx::thread reconfigThread(stdx::bind(doReplSetReconfigToFewer, getReplCoord(), &status));
+
+ NetworkInterfaceMock* net = getNet();
+ getNet()->enterNetwork();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ const RemoteCommandRequest& request = noi->getRequest();
+ repl::ReplSetHeartbeatArgs hbArgs;
+ ASSERT_OK(hbArgs.initialize(request.cmdObj));
+ repl::ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(2);
+ BSONObjBuilder respObj;
+ respObj << "ok" << 1;
+ hbResp.addToBSON(&respObj, false);
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
+ net->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+ reconfigThread.join();
+ ASSERT_OK(status);
+ std::cout << "asdf" << std::endl;
+
+ // writeconcern feasability should be reevaluated and an error should be returned
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_EQUALS(ErrorCodes::CannotSatisfyWriteConcern, statusAndDur.status);
+ awaiter.reset();
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationReconfigToSmallerMajority) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2)
+ << BSON("host"
+ << "node4:12345"
+ << "_id" << 3) << BSON("host"
+ << "node5:12345"
+ << "_id" << 4))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 1));
+ simulateSuccessfulElection();
+
+ OpTimeWithTermZero time(100, 2);
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time));
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time));
+
+
+ // majority nodes waiting for time
+ WriteConcernOptions writeConcern;
+ writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
+ writeConcern.wMode = WriteConcernOptions::kMajority;
+
+ ReplicationAwaiter awaiter(getReplCoord(), &txn);
+ awaiter.setOpTime(time);
+ awaiter.setWriteConcern(writeConcern);
+ awaiter.start(&txn);
+
+ // demonstrate that majority cannot currently be satisfied
+ WriteConcernOptions writeConcern2;
+ writeConcern2.wTimeout = WriteConcernOptions::kNoWaiting;
+ writeConcern2.wMode = WriteConcernOptions::kMajority;
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time, writeConcern2).status);
+
+ // reconfig to three nodes
+ Status status(ErrorCodes::InternalError, "Not Set");
+ stdx::thread reconfigThread(stdx::bind(doReplSetReconfig, getReplCoord(), &status));
+
+ NetworkInterfaceMock* net = getNet();
+ getNet()->enterNetwork();
+ const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest();
+ const RemoteCommandRequest& request = noi->getRequest();
+ repl::ReplSetHeartbeatArgs hbArgs;
+ ASSERT_OK(hbArgs.initialize(request.cmdObj));
+ repl::ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_SECONDARY);
+ hbResp.setConfigVersion(2);
+ BSONObjBuilder respObj;
+ respObj << "ok" << 1;
+ hbResp.addToBSON(&respObj, false);
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj()));
+ net->runReadyNetworkOperations();
+ getNet()->exitNetwork();
+ reconfigThread.join();
+ ASSERT_OK(status);
+
+ // writeconcern feasability should be reevaluated and be satisfied
+ ReplicationCoordinator::StatusAndDuration statusAndDur = awaiter.getResult();
+ ASSERT_OK(statusAndDur.status);
+ awaiter.reset();
+}
+
+TEST_F(ReplCoordTest, AwaitReplicationMajority) {
+ // Test that we can satisfy majority write concern can only be
+ // statisfied by voting data-bearing members.
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2)
+ << BSON("host"
+ << "node4:12345"
+ << "_id" << 3 << "votes" << 0)
+ << BSON("host"
+ << "node5:12345"
+ << "_id" << 4 << "arbiterOnly" << true))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ OpTimeWithTermZero time(100, 0);
+ getReplCoord()->setMyLastOptime(time);
+ simulateSuccessfulElection();
+
+ WriteConcernOptions majorityWriteConcern;
+ majorityWriteConcern.wTimeout = WriteConcernOptions::kNoWaiting;
+ majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
+
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time));
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+
+ // this member does not vote and as a result should not count towards write concern
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, time));
+ ASSERT_EQUALS(ErrorCodes::WriteConcernFailed,
+ getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time));
+ ASSERT_OK(getReplCoord()->awaitReplication(&txn, time, majorityWriteConcern).status);
+}
+
+TEST_F(ReplCoordTest, LastCommittedOpTime) {
+ // Test that the commit level advances properly.
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1) << BSON("host"
+ << "node3:12345"
+ << "_id" << 2)
+ << BSON("host"
+ << "node4:12345"
+ << "_id" << 3 << "votes" << 0)
+ << BSON("host"
+ << "node5:12345"
+ << "_id" << 4 << "arbiterOnly" << true))),
+ HostAndPort("node1", 12345));
+ ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
+ OpTimeWithTermZero zero(0, 0);
+ OpTimeWithTermZero time(100, 0);
+ getReplCoord()->setMyLastOptime(time);
+ simulateSuccessfulElection();
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, time));
+ ASSERT_EQUALS((OpTime)zero, getReplCoord()->getLastCommittedOpTime());
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, time));
+ ASSERT_EQUALS((OpTime)zero, getReplCoord()->getLastCommittedOpTime());
+
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, time));
+ ASSERT_EQUALS((OpTime)time, getReplCoord()->getLastCommittedOpTime());
+
+
+ // Set a new, later OpTime.
+ OpTimeWithTermZero newTime = OpTimeWithTermZero(100, 1);
+ getReplCoord()->setMyLastOptime(newTime);
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 3, newTime));
+ ASSERT_EQUALS((OpTime)time, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 2, newTime));
+ // Reached majority of voting nodes with newTime.
+ ASSERT_EQUALS((OpTime)newTime, getReplCoord()->getLastCommittedOpTime());
+ ASSERT_OK(getReplCoord()->setLastOptime_forTest(2, 1, newTime));
+ ASSERT_EQUALS((OpTime)newTime, getReplCoord()->getLastCommittedOpTime());
+}
+
+TEST_F(ReplCoordTest, CantUseReadAfterIfNotReplSet) {
+ init(ReplSettings());
+ OperationContextNoop txn;
+ auto result =
+ getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
+
+ ASSERT_FALSE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterWhileShutdown) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(10, 0));
+
+ shutdown();
+
+ auto result =
+ getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterInterrupted) {
+ OperationContextReplMock txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(10, 0));
+
+ txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
+
+ auto result =
+ getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterNoOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs());
+
+ ASSERT_FALSE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterGreaterOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(100, 0));
+ auto result =
+ getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(OpTimeWithTermZero(50, 0)));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterEqualOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+
+ OpTimeWithTermZero time(100, 0);
+ getReplCoord()->setMyLastOptime(time);
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(time));
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterDeferredGreaterOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(0, 0));
+
+ auto pseudoLogOp = std::async(std::launch::async,
+ [this]() {
+ // Not guaranteed to be scheduled after waitUnitl blocks...
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(200, 0));
+ });
+
+ auto result =
+ getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(OpTimeWithTermZero(100, 0)));
+ pseudoLogOp.get();
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+TEST_F(ReplCoordTest, ReadAfterDeferredEqualOpTime) {
+ OperationContextNoop txn;
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members" << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0))),
+ HostAndPort("node1", 12345));
+
+ getReplCoord()->setMyLastOptime(OpTimeWithTermZero(0, 0));
+
+ OpTimeWithTermZero opTimeToWait(100, 0);
+
+ auto pseudoLogOp = std::async(std::launch::async,
+ [this, &opTimeToWait]() {
+ // Not guaranteed to be scheduled after waitUnitl blocks...
+ getReplCoord()->setMyLastOptime(opTimeToWait);
+ });
+
+ auto result = getReplCoord()->waitUntilOpTime(&txn, ReadAfterOpTimeArgs(opTimeToWait));
+ pseudoLogOp.get();
+
+ ASSERT_TRUE(result.didWait());
+ ASSERT_OK(result.getStatus());
+}
+
+// TODO(schwerin): Unit test election id updating
} // namespace
} // namespace repl