/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/replication_coordinator_test_fixture.h" #include "mongo/db/operation_context_noop.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/operation_context_repl_mock.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/replication_coordinator_external_state_mock.h" #include "mongo/db/repl/replication_coordinator_impl.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/executor/network_interface_mock.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" #include "mongo/util/log.h" namespace mongo { namespace repl { namespace { bool stringContains(const std::string &haystack, const std::string& needle) { return haystack.find(needle) != std::string::npos; } } // namespace using executor::NetworkInterfaceMock; ReplicaSetConfig ReplCoordTest::assertMakeRSConfig(const BSONObj& configBson) { ReplicaSetConfig config; ASSERT_OK(config.initialize(configBson)); ASSERT_OK(config.validate()); return config; } ReplCoordTest::ReplCoordTest() : _callShutdown(false) {} ReplCoordTest::~ReplCoordTest() {} void ReplCoordTest::setUp() { _settings.replSet = "mySet/node1:12345,node2:54321"; } void ReplCoordTest::tearDown() { if (_externalState) { _externalState->setStoreLocalConfigDocumentToHang(false); } if (_callShutdown) { shutdown(); } } void ReplCoordTest::assertRunUntil(Date_t newTime) { this->_net->runUntil(newTime); ASSERT_EQUALS(newTime, getNet()->now()); } void ReplCoordTest::enterNetwork() { getNet()->enterNetwork(); } void ReplCoordTest::exitNetwork() { getNet()->exitNetwork(); } void ReplCoordTest::addSelf(const HostAndPort& selfHost) { getExternalState()->addSelf(selfHost); } void ReplCoordTest::init() { invariant(!_repl); invariant(!_callShutdown); // PRNG seed for tests. const int64_t seed = 0; _topo = new TopologyCoordinatorImpl(Seconds(0)); _net = new NetworkInterfaceMock; _storage = new StorageInterfaceMock; _externalState = new ReplicationCoordinatorExternalStateMock; _repl.reset(new ReplicationCoordinatorImpl(_settings, _externalState, _net, _storage, _topo, seed)); } void ReplCoordTest::init(const ReplSettings& settings) { _settings = settings; init(); } void ReplCoordTest::init(const std::string& replSet) { _settings.replSet = replSet; init(); } void ReplCoordTest::start() { invariant(!_callShutdown); // if we haven't initialized yet, do that first. if (!_repl) { init(); } OperationContextNoop txn; _repl->startReplication(&txn); _repl->waitForStartUpComplete(); _callShutdown = true; } void ReplCoordTest::start(const BSONObj& configDoc, const HostAndPort& selfHost) { if (!_repl) { init(); } _externalState->setLocalConfigDocument(StatusWith(configDoc)); _externalState->addSelf(selfHost); start(); } void ReplCoordTest::start(const HostAndPort& selfHost) { if (!_repl) { init(); } _externalState->addSelf(selfHost); start(); } void ReplCoordTest::assertStartSuccess( const BSONObj& configDoc, const HostAndPort& selfHost) { start(configDoc, selfHost); ASSERT_NE(MemberState::RS_STARTUP, getReplCoord()->getMemberState().s); } ResponseStatus ReplCoordTest::makeResponseStatus(const BSONObj& doc, Milliseconds millis) { log() << "Responding with " << doc; return ResponseStatus(RemoteCommandResponse(doc, millis)); } void ReplCoordTest::simulateSuccessfulV1Election() { OperationContextReplMock txn; ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); while (!replCoord->getMemberState().primary()) { log() << "Waiting on network in state " << replCoord->getMemberState(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgsV1 hbArgs; Status status = hbArgs.initialize(request.cmdObj); if (hbArgs.initialize(request.cmdObj).isOK()) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true))); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetRequestVotes") { net->scheduleResponse(noi, net->now(), makeResponseStatus( BSON("ok" << 1 << "reason" << "" << "term" << request.cmdObj["term"].Long() << "voteGranted" << true))); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetDeclareElectionWinner") { net->scheduleResponse(noi, net->now(), makeResponseStatus( BSON("ok" << 1 << "term" << request.cmdObj["term"].Long()))); } else { error() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; net->blackHole(noi); } net->runReadyNetworkOperations(); getNet()->exitNetwork(); } ASSERT(replCoord->isWaitingForApplierToDrain()); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); IsMasterResponse imResponse; replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); replCoord->signalDrainComplete(&txn); replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); } void ReplCoordTest::simulateSuccessfulElection() { OperationContextReplMock txn; ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); ASSERT(replCoord->getMemberState().secondary()) << replCoord->getMemberState().toString(); while (!replCoord->getMemberState().primary()) { log() << "Waiting on network in state " << replCoord->getMemberState(); getNet()->enterNetwork(); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { ReplSetHeartbeatResponse hbResp; hbResp.setSetName(rsConfig.getReplSetName()); hbResp.setState(MemberState::RS_SECONDARY); hbResp.setConfigVersion(rsConfig.getConfigVersion()); BSONObjBuilder respObj; respObj << "ok" << 1; hbResp.addToBSON(&respObj, false); net->scheduleResponse(noi, net->now(), makeResponseStatus(respObj.obj())); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetFresh") { net->scheduleResponse(noi, net->now(), makeResponseStatus( BSON("ok" << 1 << "fresher" << false << "opTime" << Date_t() << "veto" << false))); } else if (request.cmdObj.firstElement().fieldNameStringData() == "replSetElect") { net->scheduleResponse(noi, net->now(), makeResponseStatus( BSON("ok" << 1 << "vote" << 1 << "round" << request.cmdObj["round"].OID()))); } else { error() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; net->blackHole(noi); } net->runReadyNetworkOperations(); getNet()->exitNetwork(); } ASSERT(replCoord->isWaitingForApplierToDrain()); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); IsMasterResponse imResponse; replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_FALSE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_TRUE(imResponse.isSecondary()) << imResponse.toBSON().toString(); replCoord->signalDrainComplete(&txn); replCoord->fillIsMasterForReplSet(&imResponse); ASSERT_TRUE(imResponse.isMaster()) << imResponse.toBSON().toString(); ASSERT_FALSE(imResponse.isSecondary()) << imResponse.toBSON().toString(); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); } void ReplCoordTest::simulateStepDownOnIsolation() { ReplicationCoordinatorImpl* replCoord = getReplCoord(); NetworkInterfaceMock* net = getNet(); ReplicaSetConfig rsConfig = replCoord->getReplicaSetConfig_forTest(); ASSERT(replCoord->getMemberState().primary()) << replCoord->getMemberState().toString(); while (replCoord->getMemberState().primary()) { log() << "Waiting on network in state " << replCoord->getMemberState(); getNet()->enterNetwork(); net->runUntil(net->now() + Seconds(10)); const NetworkInterfaceMock::NetworkOperationIterator noi = net->getNextReadyRequest(); const RemoteCommandRequest& request = noi->getRequest(); log() << request.target.toString() << " processing " << request.cmdObj; ReplSetHeartbeatArgs hbArgs; if (hbArgs.initialize(request.cmdObj).isOK()) { net->scheduleResponse(noi, net->now(), ResponseStatus(ErrorCodes::NetworkTimeout, "Nobody's home")); } else { error() << "Black holing unexpected request to " << request.target << ": " << request.cmdObj; net->blackHole(noi); } net->runReadyNetworkOperations(); getNet()->exitNetwork(); } } void ReplCoordTest::shutdown() { invariant(_callShutdown); _net->exitNetwork(); _repl->shutdown(); _callShutdown = false; } int64_t ReplCoordTest::countLogLinesContaining(const std::string& needle) { return std::count_if(getCapturedLogMessages().begin(), getCapturedLogMessages().end(), stdx::bind(stringContains, stdx::placeholders::_1, needle)); } } // namespace repl } // namespace mongo