diff options
author | matt dannenberg <matt.dannenberg@10gen.com> | 2015-05-13 11:44:27 -0400 |
---|---|---|
committer | matt dannenberg <matt.dannenberg@10gen.com> | 2015-05-19 17:57:13 -0400 |
commit | a24b7e7653abd981cc468a790b42c0fa796b14c9 (patch) | |
tree | 9aea56cebafa2dd3c3f258d774197a2c2c794910 /src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | |
parent | fde1c4ecfd436484ddd20dbf446222496e416128 (diff) | |
download | mongo-a24b7e7653abd981cc468a790b42c0fa796b14c9.tar.gz |
SERVER-18152 SERVER-18254 new election code path and heartbeat protocol
Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp new file mode 100644 index 00000000000..7a64aad68d8 --- /dev/null +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -0,0 +1,251 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/jsobj.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/network_interface_mock.h" +#include "mongo/db/repl/repl_set_heartbeat_args.h" +#include "mongo/db/repl/repl_set_heartbeat_args_v1.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/replication_coordinator_external_state_mock.h" +#include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/replication_coordinator_test_fixture.h" +#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace repl { +namespace { + + class ReplCoordHBV1Test : public ReplCoordTest { + protected: + void assertMemberState(MemberState expected, std::string msg = ""); + ReplSetHeartbeatResponse receiveHeartbeatFrom( + const ReplicaSetConfig& rsConfig, + int sourceId, + const HostAndPort& source); + }; + + void ReplCoordHBV1Test::assertMemberState(const MemberState expected, std::string msg) { + const MemberState actual = getReplCoord()->getMemberState(); + ASSERT(expected == actual) << "Expected coordinator to report state " << + expected.toString() << " but found " << actual.toString() << " - " << msg; + } + + ReplSetHeartbeatResponse ReplCoordHBV1Test::receiveHeartbeatFrom( + const ReplicaSetConfig& rsConfig, + int sourceId, + const HostAndPort& source) { + ReplSetHeartbeatArgsV1 hbArgs; + hbArgs.setConfigVersion(rsConfig.getConfigVersion()); + hbArgs.setSetName(rsConfig.getReplSetName()); + hbArgs.setSenderHost(source); + hbArgs.setSenderId(sourceId); + hbArgs.setTerm(1); + ASSERT(hbArgs.isInitialized()); + + ReplSetHeartbeatResponse response; + ASSERT_OK(getReplCoord()->processHeartbeatV1(hbArgs, &response)); + return response; + } + + TEST_F(ReplCoordHBV1Test, JoinExistingReplSet) { + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "mySet" << + "version" << 3 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1")) << + "protocolVersion" << 1)); + init("mySet"); + addSelf(HostAndPort("h2", 1)); + const Date_t startDate = getNet()->now(); + start(); + enterNetwork(); + assertMemberState(MemberState::RS_STARTUP); + NetworkInterfaceMock* net = getNet(); + ASSERT_FALSE(net->hasReadyRequests()); + exitNetwork(); + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("h1", 1), request.target); + ReplSetHeartbeatArgs hbArgs; + ASSERT_OK(hbArgs.initialize(request.cmdObj)); + ASSERT_EQUALS("mySet", hbArgs.getSetName()); + ASSERT_EQUALS(-2, hbArgs.getConfigVersion()); + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + hbResp.setConfig(rsConfig); + BSONObjBuilder responseBuilder; + responseBuilder << "ok" << 1; + hbResp.addToBSON(&responseBuilder); + net->scheduleResponse(noi, + startDate + Milliseconds(200), + makeResponseStatus(responseBuilder.obj())); + assertRunUntil(startDate + Milliseconds(200)); + + // Because the new config is stored using an out-of-band thread, we need to perform some + // extra synchronization to let the executor finish the heartbeat reconfig. We know that + // after the out-of-band thread completes, it schedules new heartbeats. We assume that no + // other network operations get scheduled during or before the reconfig, though this may + // cease to be true in the future. + noi = net->getNextReadyRequest(); + + assertMemberState(MemberState::RS_STARTUP2); + OperationContextNoop txn; + ReplicaSetConfig storedConfig; + ASSERT_OK(storedConfig.initialize( + unittest::assertGet(getExternalState()->loadLocalConfigDocument(&txn)))); + ASSERT_OK(storedConfig.validate()); + ASSERT_EQUALS(3, storedConfig.getConfigVersion()); + ASSERT_EQUALS(3, storedConfig.getNumMembers()); + exitNetwork(); + } + + TEST_F(ReplCoordHBV1Test, DoNotJoinReplSetIfNotAMember) { + // Tests that a node in RS_STARTUP will not transition to RS_REMOVED if it receives a + // configuration that does not contain it. + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + ReplicaSetConfig rsConfig = assertMakeRSConfig( + BSON("_id" << "mySet" << + "version" << 3 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "h1:1") << + BSON("_id" << 2 << "host" << "h2:1") << + BSON("_id" << 3 << "host" << "h3:1")) << + "protocolVersion" << 1)); + init("mySet"); + addSelf(HostAndPort("h4", 1)); + const Date_t startDate = getNet()->now(); + start(); + enterNetwork(); + assertMemberState(MemberState::RS_STARTUP, "1"); + NetworkInterfaceMock* net = getNet(); + ASSERT_FALSE(net->hasReadyRequests()); + exitNetwork(); + receiveHeartbeatFrom(rsConfig, 1, HostAndPort("h1", 1)); + + enterNetwork(); + NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("h1", 1), request.target); + ReplSetHeartbeatArgs hbArgs; + ASSERT_OK(hbArgs.initialize(request.cmdObj)); + ASSERT_EQUALS("mySet", hbArgs.getSetName()); + ASSERT_EQUALS(-2, hbArgs.getConfigVersion()); + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setConfigVersion(rsConfig.getConfigVersion()); + hbResp.setConfig(rsConfig); + BSONObjBuilder responseBuilder; + responseBuilder << "ok" << 1; + hbResp.addToBSON(&responseBuilder); + net->scheduleResponse(noi, + startDate + Milliseconds(200), + makeResponseStatus(responseBuilder.obj())); + assertRunUntil(startDate + Milliseconds(2200)); + + // Because the new config is stored using an out-of-band thread, we need to perform some + // extra synchronization to let the executor finish the heartbeat reconfig. We know that + // after the out-of-band thread completes, it schedules new heartbeats. We assume that no + // other network operations get scheduled during or before the reconfig, though this may + // cease to be true in the future. + noi = net->getNextReadyRequest(); + + assertMemberState(MemberState::RS_STARTUP, "2"); + OperationContextNoop txn; + + StatusWith<BSONObj> loadedConfig(getExternalState()->loadLocalConfigDocument(&txn)); + ASSERT_NOT_OK(loadedConfig.getStatus()) << loadedConfig.getValue(); + exitNetwork(); + } + + TEST_F(ReplCoordHBV1Test, NotYetInitializedConfigStateEarlyReturn) { + // ensure that if we've yet to receive an initial config, we return NotYetInitialized + init("mySet"); + ReplSetHeartbeatArgsV1 hbArgs; + hbArgs.setConfigVersion(3); + hbArgs.setSetName("mySet"); + hbArgs.setSenderHost(HostAndPort("h1:1")); + hbArgs.setSenderId(1); + hbArgs.setTerm(1); + ASSERT(hbArgs.isInitialized()); + + ReplSetHeartbeatResponse response; + Status status = getReplCoord()->processHeartbeatV1(hbArgs, &response); + ASSERT_EQUALS(ErrorCodes::NotYetInitialized, status.code()); + } + + TEST_F(ReplCoordHBV1Test, OnlyUnauthorizedUpCausesRecovering) { + // Tests that a node that only has auth error heartbeats is recovering + logger::globalLogDomain()->setMinimumLoggedSeverity(logger::LogSeverity::Debug(3)); + assertStartSuccess( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345") << + BSON("_id" << 2 << "host" << "node2:12345"))), + HostAndPort("node1", 12345)); + ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); + + // process heartbeat + enterNetwork(); + const NetworkInterfaceMock::NetworkOperationIterator noi = getNet()->getNextReadyRequest(); + const RemoteCommandRequest& request = noi->getRequest(); + log() << request.target.toString() << " processing " << request.cmdObj; + getNet()->scheduleResponse(noi, getNet()->now(), makeResponseStatus( + BSON("ok" << 0.0 << + "errmsg" << "unauth'd" << + "code" << ErrorCodes::Unauthorized))); + + if (request.target != HostAndPort("node2", 12345) + && request.cmdObj.firstElement().fieldNameStringData() != "replSetHeartbeat") { + error() << "Black holing unexpected request to " + << request.target << ": " << request.cmdObj; + getNet()->blackHole(noi); + } + getNet()->runReadyNetworkOperations(); + exitNetwork(); + + ASSERT_TRUE(getTopoCoord().getMemberState().recovering()); + assertMemberState(MemberState::RS_RECOVERING, "0"); + } + +} // namespace +} // namespace repl +} // namespace mongo |