diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/bson/oid_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_fetcher_test.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_set_config_test.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_consistency_markers_impl_test.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp | 651 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp | 275 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_test_fixture.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_test_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/shell/utils.js | 48 |
11 files changed, 1176 insertions, 105 deletions
diff --git a/src/mongo/bson/oid_test.cpp b/src/mongo/bson/oid_test.cpp index 8be156ae41d..7a0e6d6773d 100644 --- a/src/mongo/bson/oid_test.cpp +++ b/src/mongo/bson/oid_test.cpp @@ -149,16 +149,4 @@ TEST(Basic, FromStringToString) { std::string fromStr("541b1a00e8a23afa832b218e"); ASSERT_EQUALS(OID(fromStr).toString(), fromStr); } - -TEST(Basic, FromTerm) { - auto term = 7; - auto oid = OID::fromTerm(term); - - auto oidStr = oid.toString(); - auto oidHead = oidStr.substr(0, 8); - auto oidTail = oidStr.substr(oidStr.length() - 1); - - ASSERT_EQUALS("7fffffff", oidHead); - ASSERT_EQUALS(term, std::stoi(oidTail)); -} } diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 34e2b6c7c9a..ab9c0dc2227 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -917,6 +917,12 @@ env.CppUnitTest( ], ) +env.CppUnitTest('replication_coordinator_impl_elect_test', + 'replication_coordinator_impl_elect_test.cpp', + LIBDEPS=[ + 'repl_coordinator_test_fixture', + ]) + env.CppUnitTest('replication_coordinator_impl_elect_v1_test', 'replication_coordinator_impl_elect_v1_test.cpp', LIBDEPS=['repl_coordinator_test_fixture']) @@ -925,6 +931,10 @@ env.CppUnitTest('replication_coordinator_impl_heartbeat_v1_test', 'replication_coordinator_impl_heartbeat_v1_test.cpp', LIBDEPS=['repl_coordinator_test_fixture']) +env.CppUnitTest('replication_coordinator_impl_heartbeat_test', + 'replication_coordinator_impl_heartbeat_test.cpp', + LIBDEPS=['repl_coordinator_test_fixture']) + env.CppUnitTest('replication_coordinator_impl_reconfig_test', 'replication_coordinator_impl_reconfig_test.cpp', LIBDEPS=['repl_coordinator_test_fixture']) diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index 354377f2912..352298b195a 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -88,7 +88,7 @@ protected: * Tests handling of two batches of operations returned from query. 
* Returns getMore request. */ - RemoteCommandRequest testTwoBatchHandling(); + RemoteCommandRequest testTwoBatchHandling(bool isV1ElectionProtocol); OpTime remoteNewerOpTime; OpTime staleOpTime; @@ -137,11 +137,13 @@ BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime, HostAndPort source("localhost:12345"); NamespaceString nss("local.oplog.rs"); -ReplSetConfig _createConfig() { +ReplSetConfig _createConfig(bool isV1ElectionProtocol) { BSONObjBuilder bob; bob.append("_id", "myset"); bob.append("version", 1); - bob.append("protocolVersion", 1); + if (isV1ElectionProtocol) { + bob.append("protocolVersion", 1); + } { BSONArrayBuilder membersBob(bob.subarrayStart("members")); BSONObjBuilder(membersBob.subobjStart()) @@ -166,7 +168,7 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteComman lastFetched, source, nss, - _createConfig(), + _createConfig(true), 0, rbid, requireFresherSyncSource, @@ -222,7 +224,7 @@ std::unique_ptr<OplogFetcher> OplogFetcherTest::makeOplogFetcher(ReplSetConfig c TEST_F( OplogFetcherTest, FindQueryContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { - auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); + auto cmdObj = makeOplogFetcher(_createConfig(true))->getFindQuery_forTest(); ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())), cmdObj["filter"].Obj()); @@ -233,7 +235,7 @@ TEST_F( TEST_F(OplogFetcherTest, FindQueryDoesNotContainTermIfGetCurrentTermAndLastCommittedOpTimeReturnsUninitializedTerm) { dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; - auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); + auto cmdObj = makeOplogFetcher(_createConfig(true))->getFindQuery_forTest(); ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << 
lastFetched.opTime.getTimestamp())), cmdObj["filter"].Obj()); @@ -242,29 +244,39 @@ TEST_F(OplogFetcherTest, } TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersion1) { - auto metadataObj = makeOplogFetcher(_createConfig())->getMetadataObject_forTest(); + auto metadataObj = makeOplogFetcher(_createConfig(true))->getMetadataObject_forTest(); ASSERT_EQUALS(3, metadataObj.nFields()); ASSERT_EQUALS(1, metadataObj[rpc::kReplSetMetadataFieldName].numberInt()); ASSERT_EQUALS(1, metadataObj[rpc::kOplogQueryMetadataFieldName].numberInt()); } +TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) { + auto metadataObj = makeOplogFetcher(_createConfig(false))->getMetadataObject_forTest(); + ASSERT_BSONOBJ_EQ(ReadPreferenceSetting::secondaryPreferredMetadata(), metadataObj); +} + TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProtocolVersion1) { - auto config = _createConfig(); + auto config = _createConfig(true); auto timeout = makeOplogFetcher(config)->getAwaitDataTimeout_forTest(); ASSERT_EQUALS(config.getElectionTimeoutPeriod() / 2, timeout); } +TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0) { + auto timeout = makeOplogFetcher(_createConfig(false))->getAwaitDataTimeout_forTest(); + ASSERT_EQUALS(OplogFetcher::kDefaultProtocolZeroAwaitDataTimeout, timeout); +} + TEST_F(OplogFetcherTest, FindQueryHasNoReadconcernIfTermNotLastFetched) { auto uninitializedTerm = OpTime::kUninitializedTerm; ASSERT_NOT_EQUALS(dataReplicatorExternalState->currentTerm, uninitializedTerm); dataReplicatorExternalState->currentTerm++; - auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); + auto cmdObj = makeOplogFetcher(_createConfig(true))->getFindQuery_forTest(); ASSERT_FALSE(cmdObj.hasField("readConcern")); } TEST_F(OplogFetcherTest, FindQueryHasNoReadconcernIfTermUninitialized) { dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; - auto cmdObj = 
makeOplogFetcher(_createConfig())->getFindQuery_forTest(); + auto cmdObj = makeOplogFetcher(_createConfig(true))->getFindQuery_forTest(); ASSERT_FALSE(cmdObj.hasField("readConcern")); } @@ -684,14 +696,18 @@ TEST_F(OplogFetcherTest, ASSERT_FALSE(dataReplicatorExternalState->syncSourceHasSyncSource); } -RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { +RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionProtocol) { ShutdownState shutdownState; + if (!isV1ElectionProtocol) { + dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; + } + OplogFetcher oplogFetcher(&getExecutor(), lastFetched, source, nss, - _createConfig(), + _createConfig(isV1ElectionProtocol), 0, rbid, true, @@ -741,13 +757,20 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { TEST_F( OplogFetcherTest, NoDataAvailableAfterFirstTwoBatchesShouldCauseTheOplogFetcherToShutDownWithSuccessfulStatus) { - auto request = testTwoBatchHandling(); + auto request = testTwoBatchHandling(true); ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, request.cmdObj["term"].numberLong()); ASSERT_EQUALS(dataReplicatorExternalState->lastCommittedOpTime, unittest::assertGet(OpTime::parseFromOplogEntry( request.cmdObj["lastKnownCommittedOpTime"].Obj()))); } +TEST_F(OplogFetcherTest, + GetMoreRequestUnderProtocolVersionZeroDoesNotIncludeTermOrLastKnownCommittedOpTime) { + auto request = testTwoBatchHandling(false); + ASSERT_FALSE(request.cmdObj.hasField("term")); + ASSERT_FALSE(request.cmdObj.hasField("lastKnownCommittedOpTime")); +} + TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); auto secondEntry = BSON("o" << BSON("msg" diff --git a/src/mongo/db/repl/repl_set_config_test.cpp b/src/mongo/db/repl/repl_set_config_test.cpp index 07f303c7016..9a9731d89f8 100644 --- a/src/mongo/db/repl/repl_set_config_test.cpp +++ 
b/src/mongo/db/repl/repl_set_config_test.cpp @@ -1685,7 +1685,32 @@ TEST(ReplSetConfig, GetCatchUpTakeoverDelayDefault) { } TEST(ReplSetConfig, ConfirmDefaultValuesOfAndAbilityToSetWriteConcernMajorityJournalDefault) { + // PV0, should default to false. ReplSetConfig config; + ASSERT_OK(config.initialize(BSON("_id" + << "rs0" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345"))))); + ASSERT_OK(config.validate()); + ASSERT_FALSE(config.getWriteConcernMajorityShouldJournal()); + ASSERT_FALSE(config.toBSON().hasField("writeConcernMajorityJournalDefault")); + + // Should be able to set it true in PV0. + ASSERT_OK(config.initialize(BSON("_id" + << "rs0" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 0 << "host" + << "localhost:12345")) + << "writeConcernMajorityJournalDefault" + << true))); + ASSERT_OK(config.validate()); + ASSERT_TRUE(config.getWriteConcernMajorityShouldJournal()); + ASSERT_TRUE(config.toBSON().hasField("writeConcernMajorityJournalDefault")); // PV1, should default to true. 
ASSERT_OK(config.initialize(BSON("_id" diff --git a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp index 1e586e28f65..c45a4114ce7 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl_test.cpp @@ -291,4 +291,53 @@ TEST_F(ReplicationConsistencyMarkersTest, ReplicationConsistencyMarkers) { ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled); } +TEST_F(ReplicationConsistencyMarkersTest, SetMinValidOnPVChange) { + auto minValidNss = makeNamespace(_agent, "minValid"); + auto oplogTruncateAfterPointNss = makeNamespace(_agent, "oplogTruncateAfterPoint"); + auto checkpointTimestampNss = makeNamespace(_agent, "checkpointTimestamp"); + + ReplicationConsistencyMarkersImpl consistencyMarkers( + getStorageInterface(), minValidNss, oplogTruncateAfterPointNss, checkpointTimestampNss); + auto opCtx = getOperationContext(); + ASSERT(consistencyMarkers.createInternalCollections(opCtx).isOK()); + consistencyMarkers.initializeMinValidDocument(opCtx); + + auto advanceAndCheckMinValidOpTime = [&](OpTime advanceTo, OpTime expected) { + consistencyMarkers.setMinValidToAtLeast(opCtx, advanceTo); + ASSERT_EQUALS(expected, consistencyMarkers.getMinValid(opCtx)); + }; + + // Set minValid in PV 1. + OpTime startOpTime({Seconds(20), 0}, 1LL); + advanceAndCheckMinValidOpTime(startOpTime, startOpTime); + + // In rollback, minValid is when the data becomes consistent and never goes back. + OpTime rollbackOpTime({Seconds(10), 0}, 1LL); + advanceAndCheckMinValidOpTime(rollbackOpTime, startOpTime); + + // Writes arrive, so minValid advances. + OpTime opTime1({Seconds(30), 0}, 1LL); + advanceAndCheckMinValidOpTime(opTime1, opTime1); + + // A new term starts and oplog diverges, so the timestamp is lower. 
+ OpTime newTermOpTime({Seconds(20), 0}, 2LL); + advanceAndCheckMinValidOpTime(newTermOpTime, newTermOpTime); + + // We should never advance minValid to a lower term, but verify it never goes back even if the + // timestamp is higher. + OpTime invalidOpTime({Seconds(80), 0}, 1LL); + advanceAndCheckMinValidOpTime(invalidOpTime, newTermOpTime); + + // PV downgrade to PV0 + OpTime downgradeOpTime({Seconds(50), 0}, -1LL); + advanceAndCheckMinValidOpTime(downgradeOpTime, downgradeOpTime); + + // Writes arrive in PV0. + OpTime opTime2({Seconds(60), 0}, -1LL); + advanceAndCheckMinValidOpTime(opTime2, opTime2); + + // PV upgrade again. + OpTime upgradeOpTime({Seconds(70), 0}, 0LL); + advanceAndCheckMinValidOpTime(upgradeOpTime, upgradeOpTime); +} } // namespace diff --git a/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp new file mode 100644 index 00000000000..b7b871cd89f --- /dev/null +++ b/src/mongo/db/repl/replication_coordinator_impl_elect_test.cpp @@ -0,0 +1,651 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/jsobj.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/is_master_response.h" +#include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" +#include "mongo/db/repl/repl_set_heartbeat_response.h" +#include "mongo/db/repl/replication_coordinator_external_state_mock.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/db/repl/topology_coordinator.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; + +class ReplCoordElectTest : public ReplCoordTest { +protected: + void assertStartSuccess(const BSONObj& configDoc, const HostAndPort& selfHost); + void simulateFreshEnoughForElectability(); +}; + +void 
ReplCoordElectTest::assertStartSuccess(const BSONObj& configDoc, const HostAndPort& selfHost) { + ReplCoordTest::assertStartSuccess(addProtocolVersion(configDoc, 0), selfHost); +} + +void ReplCoordElectTest::simulateFreshEnoughForElectability() { + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + ReplSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + for (int i = 0; i < rsConfig.getNumMembers() - 1; ++i) { + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + if (request.cmdObj.firstElement().fieldNameStringData() == "replSetFresh") { + net->scheduleResponse( + noi, + net->now(), + makeResponseStatus(BSON("ok" << 1 << "fresher" << false << "opTime" + << Date_t::fromMillisSinceEpoch(Timestamp(0, 0).asLL()) + << "veto" + << false))); + } else { + error() << "Black holing unexpected request to " << request.target << ": " + << request.cmdObj; + net->blackHole(noi); + } + net->runReadyNetworkOperations(); + } + net->exitNetwork(); +} + +TEST_F(ReplCoordElectTest, StartElectionDoesNotStartAnElectionWhenNodeHasNoOplogEntries) { + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + // Election never starts because we haven't set a lastOpTimeApplied value yet, via a + // heartbeat. 
+ startCapturingLogMessages(); + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + simulateEnoughHeartbeatsForAllNodesUp(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("node has no applied oplog entries")); +} + +/** + * This test checks that an election can happen when only one node is up, and it has the + * vote(s) to win. + */ +TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyElectableNode) { + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345" + << "votes" + << 0 + << "hidden" + << true + << "priority" + << 0))), + HostAndPort("node1", 12345)); + + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + // Fake OpTime from initiate, or a write op. 
+ getExternalState()->setLastOpTime(OpTime{{0, 0}, -1}); + + ASSERT(getReplCoord()->getMemberState().secondary()) + << getReplCoord()->getMemberState().toString(); + + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(10, 0), -1)); + + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + // blackhole heartbeat + net->scheduleResponse( + noi, net->now(), executor::RemoteCommandResponse(ErrorCodes::OperationFailed, "timeout")); + net->runReadyNetworkOperations(); + // blackhole freshness + const NetworkInterfaceMock::NetworkOperationIterator noi2 = net->getNextReadyRequest(); + net->scheduleResponse( + noi2, net->now(), executor::RemoteCommandResponse(ErrorCodes::OperationFailed, "timeout")); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT(getReplCoord()->getMemberState().primary()) + << getReplCoord()->getMemberState().toString(); + ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + + const auto opCtxPtr = makeOperationContext(); + auto& opCtx = *opCtxPtr; + + // Since we're still in drain mode, expect that we report ismaster: false, issecondary:true. 
+ IsMasterResponse imResponse; + getReplCoord()->fillIsMasterForReplSet(&imResponse); + ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); + ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); + getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm()); + getReplCoord()->fillIsMasterForReplSet(&imResponse); + ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); + ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); +} + +TEST_F(ReplCoordElectTest, ElectionSucceedsWhenNodeIsTheOnlyNode) { + startCapturingLogMessages(); + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345"))), + HostAndPort("node1", 12345)); + + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + // Fake OpTime from initiate, or a write op. + getExternalState()->setLastOpTime(OpTime{{0, 0}, -1}); + + ASSERT(getReplCoord()->getMemberState().primary()) + << getReplCoord()->getMemberState().toString(); + ASSERT(getReplCoord()->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + + const auto opCtxPtr = makeOperationContext(); + auto& opCtx = *opCtxPtr; + + // Since we're still in drain mode, expect that we report ismaster: false, issecondary:true. 
+ IsMasterResponse imResponse; + getReplCoord()->fillIsMasterForReplSet(&imResponse); + ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); + ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); + getReplCoord()->signalDrainComplete(&opCtx, getReplCoord()->getTerm()); + getReplCoord()->fillIsMasterForReplSet(&imResponse); + ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); + ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); +} + +TEST_F(ReplCoordElectTest, ElectionSucceedsWhenAllNodesVoteYea) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + OperationContextNoop opCtx; + getReplCoord()->setMyLastAppliedOpTime(OpTime{{100, 1}, -1}); + getExternalState()->setLastOpTime(OpTime{{100, 1}, -1}); + + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + startCapturingLogMessages(); + simulateSuccessfulElection(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("election succeeded")); +} + +TEST_F(ReplCoordElectTest, ElectionFailsWhenOneNodeVotesNay) { + // one responds with -10000 votes, and one doesn't respond, and we are not elected + startCapturingLogMessages(); + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + OperationContextNoop opCtx; + OpTime time1(Timestamp(100, 1), 0); + getReplCoord()->setMyLastAppliedOpTime(time1); + 
ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + simulateEnoughHeartbeatsForAllNodesUp(); + simulateFreshEnoughForElectability(); + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + while (net->hasReadyRequests()) { + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + if (request.target != HostAndPort("node2", 12345)) { + net->blackHole(noi); + } else if (request.cmdObj.firstElement().fieldNameStringData() != "replSetElect") { + net->blackHole(noi); + } else { + net->scheduleResponse( + noi, + net->now(), + makeResponseStatus(BSON("ok" << 1 << "vote" << -10000 << "round" << OID()))); + } + net->runReadyNetworkOperations(); + } + net->exitNetwork(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("couldn't elect self, only received -9999 votes")); +} + +TEST_F(ReplCoordElectTest, VotesWithStringValuesAreNotCountedAsYeas) { + // one responds with a bad 'vote' field, and one doesn't respond, and we are not elected + startCapturingLogMessages(); + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + OperationContextNoop opCtx; + OpTime time1(Timestamp(100, 1), 0); + getReplCoord()->setMyLastAppliedOpTime(time1); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + simulateEnoughHeartbeatsForAllNodesUp(); + simulateFreshEnoughForElectability(); + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + while (net->hasReadyRequests()) { + const NetworkInterfaceMock::NetworkOperationIterator noi = 
net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + if (request.target != HostAndPort("node2", 12345)) { + net->blackHole(noi); + } else if (request.cmdObj.firstElement().fieldNameStringData() != "replSetElect") { + net->blackHole(noi); + } else { + net->scheduleResponse(noi, + net->now(), + makeResponseStatus(BSON("ok" << 1 << "vote" + << "yea" + << "round" + << OID()))); + } + net->runReadyNetworkOperations(); + } + net->exitNetwork(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, + countLogLinesContaining("wrong type for vote argument in replSetElect command")); +} + +TEST_F(ReplCoordElectTest, TransitionToRollbackFailsWhenElectionInProgress) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + OperationContextNoop opCtx; + OpTime time1(Timestamp(100, 1), 0); + getReplCoord()->setMyLastAppliedOpTime(time1); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + simulateEnoughHeartbeatsForAllNodesUp(); + simulateFreshEnoughForElectability(); + + ASSERT_EQUALS(ErrorCodes::ElectionInProgress, + getReplCoord()->setFollowerMode(MemberState::RS_ROLLBACK)); + + ASSERT_FALSE(getReplCoord()->getMemberState().rollback()); + + // We do not need to respond to any pending network operations because setFollowerMode() will + // cancel the freshness checker and election command runner. +} + +TEST_F(ReplCoordElectTest, NodeWillNotStandForElectionDuringHeartbeatReconfig) { + // start up, receive reconfig via heartbeat while at the same time, become candidate. + // candidate state should be cleared. 
+ OperationContextNoop opCtx; + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345") + << BSON("_id" << 4 << "host" + << "node4:12345") + << BSON("_id" << 5 << "host" + << "node5:12345"))), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 0)); + + getGlobalFailPointRegistry() + ->getFailPoint("blockHeartbeatReconfigFinish") + ->setMode(FailPoint::alwaysOn); + + // hb reconfig + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + ReplSetHeartbeatResponse hbResp2; + ReplSetConfig config; + config + .initialize(BSON("_id" + << "mySet" + << "version" + << 3 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345")))) + .transitional_ignore(); + hbResp2.setConfig(config); + hbResp2.setConfigVersion(3); + hbResp2.setSetName("mySet"); + hbResp2.setState(MemberState::RS_SECONDARY); + BSONObjBuilder respObj2; + respObj2 << "ok" << 1; + hbResp2.addToBSON(&respObj2, false); + net->runUntil(net->now() + Seconds(10)); // run until we've sent a heartbeat request + const NetworkInterfaceMock::NetworkOperationIterator noi2 = net->getNextReadyRequest(); + net->scheduleResponse(noi2, net->now(), makeResponseStatus(respObj2.obj())); + net->runReadyNetworkOperations(); + getNet()->exitNetwork(); + + // prepare candidacy + BSONObjBuilder result; + ReplicationCoordinator::ReplSetReconfigArgs args; + args.force = false; + args.newConfigObj = config.toBSON(); + ASSERT_EQUALS(ErrorCodes::ConfigurationInProgress, + getReplCoord()->processReplSetReconfig(&opCtx, args, &result)); + + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(2)); + startCapturingLogMessages(); + + // 
receive sufficient heartbeats to trigger an election + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + ReplSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); + net->enterNetwork(); + for (int i = 0; i < 2; ++i) { + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + ReplSetHeartbeatArgs hbArgs; + if (hbArgs.initialize(request.cmdObj).isOK()) { + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(rsConfig.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + BSONObjBuilder respObj; + respObj << "ok" << 1; + hbResp.addToBSON(&respObj, false); + net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj())); + } else { + error() << "Black holing unexpected request to " << request.target << ": " + << request.cmdObj; + net->blackHole(noi); + } + net->runReadyNetworkOperations(); + } + + stopCapturingLogMessages(); + // ensure node does not stand for election + ASSERT_EQUALS(1, + countLogLinesContaining("Not standing for election; processing " + "a configuration change")); + getGlobalFailPointRegistry() + ->getFailPoint("blockHeartbeatReconfigFinish") + ->setMode(FailPoint::off); +} + +TEST_F(ReplCoordElectTest, StepsDownRemoteIfNodeHasHigherPriorityThanCurrentPrimary) { + BSONObj configObj = BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345" + << "priority" + << 2) + << BSON("_id" << 2 << "host" + << "node2:12345") + << BSON("_id" << 3 << "host" + << "node3:12345"))); + assertStartSuccess(configObj, HostAndPort("node1", 12345)); + ReplSetConfig config = assertMakeRSConfig(configObj); + + auto replCoord = getReplCoord(); + + OperationContextNoop opCtx; + OpTime time1(Timestamp(100, 1), 0); + 
getReplCoord()->setMyLastAppliedOpTime(time1); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + auto net = getNet(); + net->enterNetwork(); + while (net->hasReadyRequests()) { + auto noi = net->getNextReadyRequest(); + auto&& request = noi->getRequest(); + log() << request.target << " processing " << request.cmdObj; + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + ReplSetHeartbeatArgs hbArgs; + if (hbArgs.initialize(request.cmdObj).isOK()) { + ReplSetHeartbeatResponse hbResp; + Date_t responseDate = net->now(); + hbResp.setSetName(config.getReplSetName()); + if (request.target == HostAndPort("node2", 12345)) { + hbResp.setState(MemberState::RS_PRIMARY); + } else { + hbResp.setState(MemberState::RS_SECONDARY); + responseDate += Milliseconds{1}; + } + hbResp.setConfigVersion(config.getConfigVersion()); + auto response = makeResponseStatus(hbResp.toBSON(replCoord->isV1ElectionProtocol())); + net->scheduleResponse(noi, responseDate, response); + } else { + error() << "Black holing unexpected request to " << request.target << ": " + << request.cmdObj; + net->blackHole(noi); + } + } + const auto afterHeartbeatsProcessed = net->now() + Milliseconds{1}; + net->runUntil(afterHeartbeatsProcessed); + ASSERT_EQ(afterHeartbeatsProcessed, net->now()); + + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto&& request = noi->getRequest(); + log() << request.target << " processing " << request.cmdObj; + ASSERT_EQUALS("replSetStepDown", request.cmdObj.firstElement().fieldNameStringData()); + ASSERT_EQUALS(1LL, request.cmdObj["secondaryCatchUpPeriodSecs"].safeNumberLong()); + auto target = request.target; + ASSERT_EQUALS(HostAndPort("node2", 12345), target); + auto response = makeResponseStatus(BSON("ok" << 1)); + net->scheduleResponse(noi, net->now(), response); + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(1)); + startCapturingLogMessages(); + 
net->runReadyNetworkOperations(); + stopCapturingLogMessages(); + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Log()); + net->exitNetwork(); + ASSERT_EQUALS(1, + countLogLinesContaining(str::stream() << "stepdown of primary(" + << target.toString() + << ") succeeded")); +} + +TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringFreshnessCheckingPhase) { + // Start up and become electable. + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 3 << "host" + << "node3:12345") + << BSON("_id" << 2 << "host" + << "node2:12345")) + << "settings" + << BSON("heartbeatIntervalMillis" << 100)), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 0)); + getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 0)); + simulateEnoughHeartbeatsForAllNodesUp(); + simulateFreshEnoughForElectability(); + ASSERT(TopologyCoordinator::Role::kCandidate == getTopoCoord().getRole()); + + // Advance to freshness checker request phase. + NetworkInterfaceMock* net = getNet(); + net->enterNetwork(); + while (TopologyCoordinator::Role::kCandidate != getTopoCoord().getRole()) { + net->runUntil(net->now() + Seconds(1)); + if (!net->hasReadyRequests()) { + continue; + } + net->blackHole(net->getNextReadyRequest()); + } + net->exitNetwork(); + ASSERT(TopologyCoordinator::Role::kCandidate == getTopoCoord().getRole()); + + // Submit a reconfig and confirm it cancels the election. 
+ ReplicationCoordinatorImpl::ReplSetReconfigArgs config = { + BSON("_id" + << "mySet" + << "version" + << 4 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))), + true}; + + BSONObjBuilder result; + const auto opCtx = makeOperationContext(); + ASSERT_OK(getReplCoord()->processReplSetReconfig(opCtx.get(), config, &result)); + // Wait until election cancels. + net->enterNetwork(); + net->runReadyNetworkOperations(); + net->exitNetwork(); + ASSERT(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); +} + +TEST_F(ReplCoordElectTest, NodeCancelsElectionUponReceivingANewConfigDuringElectionPhase) { + // Start up and become electable. + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 3 << "host" + << "node3:12345") + << BSON("_id" << 2 << "host" + << "node2:12345")) + << "settings" + << BSON("heartbeatIntervalMillis" << 100)), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 0), 0)); + getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 0), 0)); + simulateEnoughHeartbeatsForAllNodesUp(); + simulateFreshEnoughForElectability(); + ASSERT(TopologyCoordinator::Role::kCandidate == getTopoCoord().getRole()); + + // Submit a reconfig and confirm it cancels the election. + ReplicationCoordinatorImpl::ReplSetReconfigArgs config = { + BSON("_id" + << "mySet" + << "version" + << 4 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))), + true}; + + BSONObjBuilder result; + const auto opCtx = makeOperationContext(); + ASSERT_OK(getReplCoord()->processReplSetReconfig(opCtx.get(), config, &result)); + // Wait until election cancels. 
+ getNet()->enterNetwork(); + getNet()->runReadyNetworkOperations(); + getNet()->exitNetwork(); + ASSERT(TopologyCoordinator::Role::kFollower == getTopoCoord().getRole()); +} + +} // namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp new file mode 100644 index 00000000000..55140bb9a23 --- /dev/null +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_test.cpp @@ -0,0 +1,275 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/jsobj.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" +#include "mongo/db/repl/repl_set_heartbeat_args_v1.h" +#include "mongo/db/repl/repl_set_heartbeat_response.h" +#include "mongo/db/repl/replication_coordinator_external_state_mock.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/db/repl/topology_coordinator.h" +#include "mongo/executor/network_interface_mock.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { +namespace { + +using executor::NetworkInterfaceMock; +using executor::RemoteCommandRequest; +using executor::RemoteCommandResponse; + +class ReplCoordHBTest : public ReplCoordTest { +protected: + void assertStartSuccess(const BSONObj& configDoc, const HostAndPort& selfHost); + void assertMemberState(MemberState expected, std::string msg = ""); + ReplSetHeartbeatResponse receiveHeartbeatFrom(const ReplSetConfig& rsConfig, + int sourceId, + const HostAndPort& source); +}; + +void ReplCoordHBTest::assertStartSuccess(const BSONObj& configDoc, const HostAndPort& selfHost) { + ReplCoordTest::assertStartSuccess(addProtocolVersion(configDoc, 0), selfHost); +} + +void ReplCoordHBTest::assertMemberState(const MemberState expected, std::string msg) { + const MemberState actual = getReplCoord()->getMemberState(); + ASSERT(expected == actual) << "Expected coordinator to report state " << expected.toString() + << " but found " << actual.toString() << " - " << msg; +} + +ReplSetHeartbeatResponse ReplCoordHBTest::receiveHeartbeatFrom(const ReplSetConfig& rsConfig, + int sourceId, + const HostAndPort& source) { + ReplSetHeartbeatArgs hbArgs; + hbArgs.setProtocolVersion(1); + 
hbArgs.setConfigVersion(rsConfig.getConfigVersion()); + hbArgs.setSetName(rsConfig.getReplSetName()); + hbArgs.setSenderHost(source); + hbArgs.setSenderId(sourceId); + ASSERT(hbArgs.isInitialized()); + + ReplSetHeartbeatResponse response; + ASSERT_OK(getReplCoord()->processHeartbeat(hbArgs, &response)); + return response; +} + +TEST_F(ReplCoordHBTest, NodeJoinsExistingReplSetWhenReceivingAConfigContainingTheNodeViaHeartbeat) { + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + ReplSetConfig rsConfig = assertMakeRSConfigV0(BSON("_id" + << "mySet" + << "version" + << 3 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1") + << BSON("_id" << 3 << "host" + << "h3:1")))); + init("mySet"); + addSelf(HostAndPort("h2", 1)); + const Date_t startDate = getNet()->now(); + start(); + enterNetwork(); + assertMemberState(MemberState::RS_STARTUP); + NetworkInterfaceMock* net = getNet(); + ASSERT_FALSE(net->hasReadyRequests()); + exitNetwork(); + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("h1", 1), request.target); + ReplSetHeartbeatArgsV1 hbArgs; + ASSERT_OK(hbArgs.initialize(request.cmdObj)); + ASSERT_EQUALS("mySet", hbArgs.getSetName()); + ASSERT_EQUALS(-2, hbArgs.getConfigVersion()); + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.noteReplSet(); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + hbResp.setConfig(rsConfig); + BSONObjBuilder responseBuilder; + responseBuilder << "ok" << 1; + hbResp.addToBSON(&responseBuilder, false); + net->scheduleResponse( + noi, startDate + Milliseconds(200), makeResponseStatus(responseBuilder.obj())); + assertRunUntil(startDate + Milliseconds(200)); + + // Because the 
new config is stored using an out-of-band thread, we need to perform some + // extra synchronization to let the executor finish the heartbeat reconfig. We know that + // after the out-of-band thread completes, it schedules new heartbeats. We assume that no + // other network operations get scheduled during or before the reconfig, though this may + // cease to be true in the future. + noi = net->getNextReadyRequest(); + + assertMemberState(MemberState::RS_STARTUP2); + OperationContextNoop opCtx; + ReplSetConfig storedConfig; + ASSERT_OK(storedConfig.initialize( + unittest::assertGet(getExternalState()->loadLocalConfigDocument(&opCtx)))); + ASSERT_OK(storedConfig.validate()); + ASSERT_EQUALS(3, storedConfig.getConfigVersion()); + ASSERT_EQUALS(3, storedConfig.getNumMembers()); + exitNetwork(); +} + +TEST_F(ReplCoordHBTest, + NodeDoesNotJoinExistingReplSetWhenReceivingAConfigNotContainingTheNodeViaHeartbeat) { + // Tests that a node in RS_STARTUP will not transition to RS_REMOVED if it receives a + // configuration that does not contain it. 
+ logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + ReplSetConfig rsConfig = assertMakeRSConfigV0(BSON("_id" + << "mySet" + << "version" + << 3 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "h1:1") + << BSON("_id" << 2 << "host" + << "h2:1") + << BSON("_id" << 3 << "host" + << "h3:1")))); + init("mySet"); + addSelf(HostAndPort("h4", 1)); + const Date_t startDate = getNet()->now(); + start(); + enterNetwork(); + assertMemberState(MemberState::RS_STARTUP, "1"); + NetworkInterfaceMock* net = getNet(); + ASSERT_FALSE(net->hasReadyRequests()); + exitNetwork(); + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("h1", 1), request.target); + ReplSetHeartbeatArgsV1 hbArgs; + ASSERT_OK(hbArgs.initialize(request.cmdObj)); + ASSERT_EQUALS("mySet", hbArgs.getSetName()); + ASSERT_EQUALS(-2, hbArgs.getConfigVersion()); + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.noteReplSet(); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + hbResp.setConfig(rsConfig); + BSONObjBuilder responseBuilder; + responseBuilder << "ok" << 1; + hbResp.addToBSON(&responseBuilder, false); + net->scheduleResponse( + noi, startDate + Milliseconds(200), makeResponseStatus(responseBuilder.obj())); + assertRunUntil(startDate + Milliseconds(2200)); + + // Because the new config is stored using an out-of-band thread, we need to perform some + // extra synchronization to let the executor finish the heartbeat reconfig. We know that + // after the out-of-band thread completes, it schedules new heartbeats. We assume that no + // other network operations get scheduled during or before the reconfig, though this may + // cease to be true in the future. 
+ noi = net->getNextReadyRequest(); + + assertMemberState(MemberState::RS_STARTUP, "2"); + OperationContextNoop opCtx; + + StatusWith<BSONObj> loadedConfig(getExternalState()->loadLocalConfigDocument(&opCtx)); + ASSERT_NOT_OK(loadedConfig.getStatus()) << loadedConfig.getValue(); + exitNetwork(); +} + +TEST_F(ReplCoordHBTest, NodeReturnsNotYetInitializedInResponseToAHeartbeatReceivedPriorToAConfig) { + // ensure that if we've yet to receive an initial config, we return NotYetInitialized + init("mySet"); + ReplSetHeartbeatArgs hbArgs; + hbArgs.setProtocolVersion(1); + hbArgs.setConfigVersion(3); + hbArgs.setSetName("mySet"); + hbArgs.setSenderHost(HostAndPort("h1:1")); + hbArgs.setSenderId(1); + ASSERT(hbArgs.isInitialized()); + + ReplSetHeartbeatResponse response; + Status status = getReplCoord()->processHeartbeat(hbArgs, &response); + ASSERT_EQUALS(ErrorCodes::NotYetInitialized, status.code()); +} + +TEST_F(ReplCoordHBTest, + NodeChangesToRecoveringStateWhenAllNodesRespondToHeartbeatsWithUnauthorized) { + // Tests that a node that only has auth error heartbeats is recovering + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + assertStartSuccess(BSON("_id" + << "mySet" + << "version" + << 1 + << "members" + << BSON_ARRAY(BSON("_id" << 1 << "host" + << "node1:12345") + << BSON("_id" << 2 << "host" + << "node2:12345"))), + HostAndPort("node1", 12345)); + ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + // process heartbeat + enterNetwork(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + getNet()->scheduleResponse(noi, + getNet()->now(), + makeResponseStatus(BSON("ok" << 0.0 << "errmsg" + << "unauth'd" + << "code" + << ErrorCodes::Unauthorized))); + + if (request.target != HostAndPort("node2", 12345) && + 
request.cmdObj.firstElement().fieldNameStringData() != "replSetHeartbeat") { + error() << "Black holing unexpected request to " << request.target << ": " + << request.cmdObj; + getNet()->blackHole(noi); + } + getNet()->runReadyNetworkOperations(); + exitNetwork(); + + ASSERT_TRUE(getTopoCoord().getMemberState().recovering()); + assertMemberState(MemberState::RS_RECOVERING, "0"); +} + +} // namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index c981c0af10f..f374290ec8f 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -1409,71 +1409,6 @@ TEST_F(ReplCoordTest, NodeReturnsBadValueWhenUpdateTermIsRunAgainstANonReplNode) ASSERT_EQUALS(ErrorCodes::BadValue, getReplCoord()->updateTerm(opCtx.get(), 0).code()); } -TEST_F(ReplCoordTest, ElectionIdTracksTermInPV1) { - init("mySet/test1:1234,test2:1234,test3:1234"); - - assertStartSuccess(BSON("_id" - << "mySet" - << "version" - << 1 - << "members" - << BSON_ARRAY(BSON("_id" << 0 << "host" - << "test1:1234") - << BSON("_id" << 1 << "host" - << "test2:1234") - << BSON("_id" << 2 << "host" - << "test3:1234")) - << "protocolVersion" - << 1), - HostAndPort("test1", 1234)); - getReplCoord()->setMyLastAppliedOpTime(OpTime(Timestamp(100, 1), 0)); - getReplCoord()->setMyLastDurableOpTime(OpTime(Timestamp(100, 1), 0)); - ASSERT_OK(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - - // No election has taken place yet. - { - auto term = getTopoCoord().getTerm(); - auto electionId = getTopoCoord().getElectionId(); - - ASSERT_EQUALS(0, term); - ASSERT_FALSE(electionId.isSet()); - } - - simulateSuccessfulV1Election(); - - ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - - // Check that the electionId is set properly after the election. 
- { - auto term = getTopoCoord().getTerm(); - auto electionId = getTopoCoord().getElectionId(); - - ASSERT_EQUALS(1, term); - ASSERT_EQUALS(OID::fromTerm(term), electionId); - } - - const auto opCtx = makeOperationContext(); - auto status = getReplCoord()->stepDown(opCtx.get(), true, Milliseconds(0), Milliseconds(1000)); - - ASSERT_OK(status); - ASSERT_TRUE(getReplCoord()->getMemberState().secondary()); - - simulateSuccessfulV1ElectionWithoutExitingDrainMode( - getReplCoord()->getElectionTimeout_forTest()); - - ASSERT_TRUE(getReplCoord()->getMemberState().primary()); - - // Check that the electionId is again properly set after the new election. - { - auto term = getTopoCoord().getTerm(); - auto electionId = getTopoCoord().getElectionId(); - - ASSERT_EQUALS(2, term); - ASSERT_EQUALS(OID::fromTerm(term), electionId); - } -} - TEST_F(ReplCoordTest, NodeChangesTermAndStepsDownWhenAndOnlyWhenUpdateTermSuppliesAHigherTerm) { init("mySet/test1:1234,test2:1234,test3:1234"); @@ -4475,6 +4410,32 @@ TEST_F(ReplCoordTest, replCoord->getElectionTimeout_forTest()); } +TEST_F(ReplCoordTest, DoNotScheduleElectionWhenCancelAndRescheduleElectionTimeoutIsRunInPV0) { + assertStartSuccess(BSON("_id" + << "mySet" + << "protocolVersion" + << 0 + << "version" + << 2 + << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" + << 0) + << BSON("host" + << "node2:12345" + << "_id" + << 1))), + HostAndPort("node1", 12345)); + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + ASSERT_OK(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + + getReplCoord()->cancelAndRescheduleElectionTimeout(); + + auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest(); + ASSERT_EQUALS(Date_t(), electionTimeoutWhen); +} + TEST_F(ReplCoordTest, DoNotScheduleElectionWhenCancelAndRescheduleElectionTimeoutIsRunInRollback) { assertStartSuccess(BSON("_id" << "mySet" diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp 
b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp index a7ce2561f57..c2278a72e9a 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp @@ -72,6 +72,10 @@ ReplSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) { return config; } +ReplSetConfig ReplCoordTest::assertMakeRSConfigV0(const BSONObj& configBson) { + return assertMakeRSConfig(addProtocolVersion(configBson, 0)); +} + BSONObj ReplCoordTest::addProtocolVersion(const BSONObj& configDoc, int protocolVersion) { BSONObjBuilder builder; builder << "protocolVersion" << protocolVersion; @@ -234,7 +238,8 @@ void ReplCoordTest::simulateEnoughHeartbeatsForAllNodesUp() { log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgsV1 hbArgs; ReplSetHeartbeatArgs hbArgsPV0; - if (hbArgs.initialize(request.cmdObj).isOK()) { + if (hbArgs.initialize(request.cmdObj).isOK() || + hbArgsPV0.initialize(request.cmdObj).isOK()) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); @@ -388,6 +393,68 @@ void ReplCoordTest::simulateSuccessfulV1ElectionAt(Date_t electionTime) { ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); } +void ReplCoordTest::simulateSuccessfulElection() { + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + NetworkInterfaceMock* net = getNet(); + ReplSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); + ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); + bool hasReadyRequests = true; + // Process requests until we're primary and consume the heartbeats for the notification + // of election win. 
+ while (!replCoord->getMemberState().primary() || hasReadyRequests) { + log() << "Waiting on network in state " << replCoord->getMemberState(); + getNet()->enterNetwork(); + const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + ReplSetHeartbeatArgs hbArgs; + if (hbArgs.initialize(request.cmdObj).isOK()) { + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName(rsConfig.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + BSONObjBuilder respObj; + respObj << "ok" << 1; + hbResp.addToBSON(&respObj, false); + net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj())); + } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetFresh") { + net->scheduleResponse( + noi, + net->now(), + makeResponseStatus(BSON( + "ok" << 1 << "fresher" << false << "opTime" << Date_t() << "veto" << false))); + } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetElect") { + net->scheduleResponse(noi, + net->now(), + makeResponseStatus(BSON("ok" << 1 << "vote" << 1 << "round" + << request.cmdObj["round"].OID()))); + } else { + error() << "Black holing unexpected request to " << request.target << ": " + << request.cmdObj; + net->blackHole(noi); + } + net->runReadyNetworkOperations(); + hasReadyRequests = net->hasReadyRequests(); + getNet()->exitNetwork(); + } + ASSERT(replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining); + ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); + + IsMasterResponse imResponse; + replCoord->fillIsMasterForReplSet(&imResponse); + ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); + ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); + { + auto opCtx = makeOperationContext(); 
+ replCoord->signalDrainComplete(opCtx.get(), replCoord->getTerm()); + } + replCoord->fillIsMasterForReplSet(&imResponse); + ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); + ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); + + ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); +} + void ReplCoordTest::signalDrainComplete(OperationContext* opCtx) { getExternalState()->setFirstOpTimeOfMyTerm(OpTime(Timestamp(1, 1), getReplCoord()->getTerm())); getReplCoord()->signalDrainComplete(opCtx, getReplCoord()->getTerm()); diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.h b/src/mongo/db/repl/replication_coordinator_test_fixture.h index 67f30ceeff9..15b0027b32f 100644 --- a/src/mongo/db/repl/replication_coordinator_test_fixture.h +++ b/src/mongo/db/repl/replication_coordinator_test_fixture.h @@ -74,6 +74,7 @@ public: * Constructs a ReplSetConfig from the given BSON, or raises a test failure exception. */ static ReplSetConfig assertMakeRSConfig(const BSONObj& configBSON); + static ReplSetConfig assertMakeRSConfigV0(const BSONObj& configBson); /** * Adds { protocolVersion: 0 or 1 } to the config. @@ -224,6 +225,7 @@ protected: * * Behavior is unspecified if node does not have a clean config, is not in SECONDARY, etc. */ + void simulateSuccessfulElection(); void simulateSuccessfulV1Election(); /** diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index 6d130491b29..c07aebac8d7 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -1405,29 +1405,49 @@ rs.debug.getLastOpWritten = function(server) { }; /** - * Compares OpTimes in the format {ts:Timestamp, t:NumberLong}. - * Returns -1 if ot1 is 'earlier' than ot2, 1 if 'later' and 0 if equal. + * Compares OpTimes. Returns -1 if ot1 is 'earlier' than ot2, 1 if 'later' and 0 if equal. 
+ * + * Note: Since Protocol Version 1 was introduced for replication, 'OpTimes' + * can come in two different formats. This function will throw an error when the OpTime + * passed do not have the same protocol version. + * + * OpTime Formats: + * PV0: Timestamp + * PV1: {ts:Timestamp, t:NumberLong} */ rs.compareOpTimes = function(ot1, ot2) { + function _isOpTimeV1(opTime) { + return (opTime.hasOwnProperty("ts") && opTime.hasOwnProperty("t")); + } + function _isEmptyOpTime(opTime) { + return (opTime.ts.getTime() == 0 && opTime.ts.getInc() == 0 && opTime.t == -1); + } - function _isValidOptime(opTime) { - let timestampIsValid = (opTime.hasOwnProperty("ts") && (opTime.ts !== Timestamp(0, 0))); - let termIsValid = (opTime.hasOwnProperty("t") && (opTime.t != -1)); + // Make sure both OpTimes have a timestamp and a term. + var ot1 = _isOpTimeV1(ot1) ? ot1 : {ts: ot1, t: NumberLong(-1)}; + var ot2 = _isOpTimeV1(ot2) ? ot2 : {ts: ot2, t: NumberLong(-1)}; - return timestampIsValid && termIsValid; + if (_isEmptyOpTime(ot1) || _isEmptyOpTime(ot2)) { + throw Error("cannot do comparison with empty OpTime, received: " + tojson(ot1) + " and " + + tojson(ot2)); } - if (!_isValidOptime(ot1) || !_isValidOptime(ot2)) { - throw Error("invalid optimes, received: " + tojson(ot1) + " and " + tojson(ot2)); + if ((ot1.t == -1 && ot2.t != -1) || (ot1.t != -1 && ot2.t == -1)) { + throw Error("cannot compare OpTimes between different protocol versions, received: " + + tojson(ot1) + " and " + tojson(ot2)); } - if (ot1.t > ot2.t) { - return 1; - } else if (ot1.t < ot2.t) { - return -1; - } else { - return timestampCmp(ot1.ts, ot2.ts); + if (!friendlyEqual(ot1.t, ot2.t)) { + if (ot1.t < ot2.t) { + return -1; + } else { + return 1; + } } + // else equal terms, so proceed to compare timestamp component. + + // Otherwise, choose the OpTime with the lower timestamp. + return timestampCmp(ot1.ts, ot2.ts); }; help = shellHelper.help = function(x) { |