/**
 *    Copyright (C) 2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/platform/basic.h"

#include <algorithm>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/thread.hpp>
#include <string>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/member_heartbeat_data.h"
#include "mongo/db/repl/network_interface_mock.h"
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/functional.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/mongoutils/str.h"

namespace mongo {
namespace repl {
namespace {

    using unittest::assertGet;

    typedef ReplicationExecutor::RemoteCommandRequest RemoteCommandRequest;

    /**
     * Returns true if "needle" occurs anywhere within "haystack".
     */
    bool stringContains(const std::string& haystack, const std::string& needle) {
        return haystack.find(needle) != std::string::npos;
    }

    /**
     * Fixture that runs a FreshnessChecker on a ReplicationExecutor whose network is a
     * NetworkInterfaceMock, so each test can inspect the outgoing replSetFresh requests and
     * script the responses and the mock clock.
     */
    class FreshnessCheckerTest : public mongo::unittest::Test {
    protected:
        /**
         * Starts the checker (from inside the executor) for the member at "selfIndex" of
         * "currentConfig", asking every host in "hosts". Returns once the start callback has
         * run, i.e. once _checkerDoneEvent has been set.
         */
        void startTest(const OpTime& lastOpTimeApplied,
                       const ReplicaSetConfig& currentConfig,
                       int selfIndex,
                       const std::vector<HostAndPort>& hosts);

        /** Blocks until the checker signals its completion event. */
        void waitOnChecker();

        /** Returns the checker's verdict; only meaningful after waitOnChecker(). */
        FreshnessChecker::ElectionAbortReason shouldAbortElection() const;

        /**
         * Counts captured log lines that contain "needle". Call between
         * startCapturingLogMessages() and/or after stopCapturingLogMessages().
         */
        int64_t countLogLinesContaining(const std::string& needle) {
            // Bind the captured messages once so that begin() and end() are guaranteed to
            // refer to the same container, regardless of how getCapturedLogMessages() returns.
            const std::vector<std::string>& messages = getCapturedLogMessages();
            return std::count_if(messages.begin(),
                                 messages.end(),
                                 stdx::bind(stringContains,
                                            stdx::placeholders::_1,
                                            needle));
        }

        // Non-owning handle to the mock network handed to _executor in setUp().
        NetworkInterfaceMock* _net;
        boost::scoped_ptr<ReplicationExecutor> _executor;
        boost::scoped_ptr<boost::thread> _executorThread;

    private:
        void freshnessCheckerRunner(const ReplicationExecutor::CallbackData& data,
                                    const OpTime& lastOpTimeApplied,
                                    const ReplicaSetConfig& currentConfig,
                                    int selfIndex,
                                    const std::vector<HostAndPort>& hosts);

        void setUp();
        void tearDown();

        boost::scoped_ptr<FreshnessChecker> _checker;
        ReplicationExecutor::EventHandle _checkerDoneEvent;
    };

    void FreshnessCheckerTest::setUp() {
        // _net is never deleted by this fixture; presumably ReplicationExecutor takes
        // ownership of the network interface it is given -- confirm in replication_executor.h.
        _net = new NetworkInterfaceMock;
        _executor.reset(new ReplicationExecutor(_net, 1 /* prng seed */));
        _executorThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
                                                           _executor.get())));
        _checker.reset(new FreshnessChecker);
    }

    void FreshnessCheckerTest::tearDown() {
        _executor->shutdown();
        _executorThread->join();
    }

    void FreshnessCheckerTest::waitOnChecker() {
        _executor->waitForEvent(_checkerDoneEvent);
    }

    FreshnessChecker::ElectionAbortReason FreshnessCheckerTest::shouldAbortElection() const {
        return _checker->shouldAbortElection();
    }

    /**
     * Parses "configBson" into a ReplicaSetConfig, failing the current test if it does not
     * initialize or validate.
     */
    ReplicaSetConfig assertMakeRSConfig(const BSONObj& configBson) {
        ReplicaSetConfig config;
        ASSERT_OK(config.initialize(configBson));
        ASSERT_OK(config.validate());
        return config;
    }

    /**
     * Builds the replSetFresh command that the checker is expected to send on behalf of the
     * member at "selfIndex" of "rsConfig".
     */
    const BSONObj makeFreshRequest(const ReplicaSetConfig& rsConfig,
                                   OpTime lastOpTimeApplied,
                                   int selfIndex) {
        const MemberConfig& myConfig = rsConfig.getMemberAt(selfIndex);
        return BSON("replSetFresh" << 1 <<
                    "set" << rsConfig.getReplSetName() <<
                    "opTime" << Date_t(lastOpTimeApplied.asDate()) <<
                    "who" << myConfig.getHostAndPort().toString() <<
                    "cfgver" << rsConfig.getConfigVersion() <<
                    "id" << myConfig.getId());
    }

    // This is necessary because the run method must be scheduled in the Replication Executor
    // for correct concurrency operation.
    void FreshnessCheckerTest::freshnessCheckerRunner(
            const ReplicationExecutor::CallbackData& data,
            const OpTime& lastOpTimeApplied,
            const ReplicaSetConfig& currentConfig,
            int selfIndex,
            const std::vector<HostAndPort>& hosts) {

        invariant(data.status.isOK());
        StatusWith<ReplicationExecutor::EventHandle> evh = _checker->start(data.executor,
                                                                           lastOpTimeApplied,
                                                                           currentConfig,
                                                                           selfIndex,
                                                                           hosts);
        _checkerDoneEvent = assertGet(evh);
    }

    void FreshnessCheckerTest::startTest(const OpTime& lastOpTimeApplied,
                                         const ReplicaSetConfig& currentConfig,
                                         int selfIndex,
                                         const std::vector<HostAndPort>& hosts) {
        _executor->wait(
                assertGet(
                        _executor->scheduleWork(
                                stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
                                           this,
                                           stdx::placeholders::_1,
                                           lastOpTimeApplied,
                                           currentConfig,
                                           selfIndex,
                                           hosts))));
    }

    TEST_F(FreshnessCheckerTest, TwoNodes) {
        // Two nodes, we are node h0. We are freshest, but we tie with h1.
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1"))));

        std::vector<HostAndPort> hosts;
        hosts.push_back(config.getMemberAt(1).getHostAndPort());
        const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0);

        startTest(OpTime(0, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            BSON("ok" << 1 <<
                                 "id" << 2 <<
                                 "set" << "rs0" <<
                                 "who" << "h1" <<
                                 "cfgver" << 1 <<
                                 "opTime" << Date_t(OpTime(0,0).asDate())),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FreshnessTie);
    }

    TEST_F(FreshnessCheckerTest, ShuttingDown) {
        // Two nodes, we are node h0. Shutdown happens while we're scheduling remote commands.
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1"))));

        std::vector<HostAndPort> hosts;
        hosts.push_back(config.getMemberAt(1).getHostAndPort());

        startTest(OpTime(0, 0), config, 0, hosts);
        _executor->shutdown();
        waitOnChecker();

        // This seems less than ideal, but if we are shutting down, the next phase of election
        // cannot proceed anyway.
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::None);
    }

    TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshest) {
        // other responds as fresher than us
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1"))));

        std::vector<HostAndPort> hosts;
        hosts.push_back(config.getMemberAt(1).getHostAndPort());

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);

        startTest(OpTime(10, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            BSON("ok" << 1 <<
                                 "id" << 2 <<
                                 "set" << "rs0" <<
                                 "who" << "h1" <<
                                 "cfgver" << 1 <<
                                 "fresher" << true <<
                                 "opTime" << Date_t(OpTime(0,0).asDate())),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();

        stopCapturingLogMessages();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("not electing self, we are not freshest"));
    }

    TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTime) {
        // other responds with a later optime than ours
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1"))));

        std::vector<HostAndPort> hosts;
        hosts.push_back(config.getMemberAt(1).getHostAndPort());

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0);

        startTest(OpTime(0, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            BSON("ok" << 1 <<
                                 "id" << 2 <<
                                 "set" << "rs0" <<
                                 "who" << "h1" <<
                                 "cfgver" << 1 <<
                                 "fresher" << true <<
                                 "opTime" << Date_t(OpTime(10,0).asDate())),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();

        stopCapturingLogMessages();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("not electing self, we are not freshest"));
    }

    TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponse) {
        // other responds with "opTime" field of non-Date value, causing not freshest
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1"))));

        std::vector<HostAndPort> hosts;
        hosts.push_back(config.getMemberAt(1).getHostAndPort());

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);

        startTest(OpTime(10, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            BSON("ok" << 1 <<
                                 "id" << 2 <<
                                 "set" << "rs0" <<
                                 "who" << "h1" <<
                                 "cfgver" << 1 <<
                                 "opTime" << 3),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();

        stopCapturingLogMessages();

        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("wrong type for opTime argument in replSetFresh "
                                                 "response: NumberInt32"));
    }

    TEST_F(FreshnessCheckerTest, ElectVetoed) {
        // other responds with veto
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1"))));

        std::vector<HostAndPort> hosts;
        hosts.push_back(config.getMemberAt(1).getHostAndPort());

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);

        startTest(OpTime(10, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT_EQUALS(HostAndPort("h1"), noi->getRequest().target);
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            BSON("ok" << 1 <<
                                 "id" << 2 <<
                                 "set" << "rs0" <<
                                 "who" << "h1" <<
                                 "cfgver" << 1 <<
                                 "veto" << true <<
                                 "errmsg" << "I'd rather you didn't" <<
                                 "opTime" << Date_t(OpTime(0,0).asDate())),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();

        stopCapturingLogMessages();

        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("not electing self, h1:27017 would veto with '"
                                                 "errmsg: \"I'd rather you didn't\"'"));
    }

    /**
     * Returns the member "_id" of the config entry whose address is "host", failing the
     * current test if no such member exists.
     */
    int findIdForMember(const ReplicaSetConfig& rsConfig, const HostAndPort& host) {
        const MemberConfig* member = rsConfig.findMemberByHostAndPort(host);
        ASSERT_TRUE(member != NULL) << "No host named " << host.toString() << " in config";
        return member->getId();
    }

    TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestManyNodes) {
        // one other responds as fresher than us
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1") <<
                         BSON("_id" << 3 << "host" << "h2") <<
                         BSON("_id" << 4 << "host" << "h3") <<
                         BSON("_id" << 5 << "host" << "h4"))));

        // Ask every member except ourselves (index 0).
        std::vector<HostAndPort> hosts;
        for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
                mem != config.membersEnd();
                ++mem) {
            hosts.push_back(mem->getHostAndPort());
        }

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);

        startTest(OpTime(10, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        unordered_set<HostAndPort> seen;
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            const HostAndPort target = noi->getRequest().target;
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT(seen.insert(target).second) << "Already saw " << target;
            BSONObjBuilder responseBuilder;
            responseBuilder <<
                "ok" << 1 <<
                "id" << findIdForMember(config, target) <<
                "set" << "rs0" <<
                "who" << target.toString() <<
                "cfgver" << 1 <<
                "opTime" << Date_t(OpTime(0,0).asDate());
            if (target.host() == "h1") {
                responseBuilder << "fresher" << true;
            }
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            responseBuilder.obj(),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();
        stopCapturingLogMessages();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("not electing self, we are not freshest"));
    }

    // TODO(dannenberg) re-enable this test once we can control message order
    // NOTE(review): this disabled test also predates the current fixture API --
    // freshnessCheckerRunner no longer takes a checker or event, and neither _lastStatus nor
    // FreshnessChecker::getResults is used by the live tests above -- so it must be ported
    // (see ElectNotElectingSelfWeAreNotFreshestManyNodes) before it can be re-enabled.
    // TEST_F(FreshnessCheckerTest, ElectNotElectingSelfWeAreNotFreshestOpTimeManyNodes) {
    //     // one other responds with a later optime than ours
    //     startCapturingLogMessages();
    //     ReplicaSetConfig config = assertMakeRSConfig(
    //             BSON("_id" << "rs0" <<
    //                  "version" << 1 <<
    //                  "members" << BSON_ARRAY(
    //                      BSON("_id" << 1 << "host" << "h0") <<
    //                      BSON("_id" << 2 << "host" << "h1") <<
    //                      BSON("_id" << 3 << "host" << "h2") <<
    //                      BSON("_id" << 4 << "host" << "h3") <<
    //                      BSON("_id" << 5 << "host" << "h4"))));
    //     StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
    //     ASSERT_OK(evh.getStatus());
    //     Date_t now(0);
    //     std::vector<HostAndPort> hosts;
    //     for (ReplicaSetConfig::MemberIterator mem = config.membersBegin();
    //          mem != config.membersEnd();
    //          ++mem) {
    //         hosts.push_back(mem->getHostAndPort());
    //     }
    //     const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0);
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h1"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 2 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h1" <<
    //                                                "cfgver" << 1 <<
    //                                                "fresher" << true <<
    //                                                "opTime" << Date_t(OpTime(10,0).asDate()))));
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h2"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 3 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h2" <<
    //                                                "cfgver" << 1 <<
    //                                                "opTime" << Date_t(OpTime(0,0).asDate()))));
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h3"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 4 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h3" <<
    //                                                "cfgver" << 1 <<
    //                                                "opTime" << Date_t(OpTime(0,0).asDate()))));
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h4"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 5 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h4" <<
    //                                                "cfgver" << 1 <<
    //                                                "opTime" << Date_t(OpTime(0,0).asDate()))));
    //     FreshnessChecker checker;
    //     StatusWith<ReplicationExecutor::CallbackHandle> cbh =
    //         _executor->scheduleWork(
    //             stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
    //                        this,
    //                        stdx::placeholders::_1,
    //                        &checker,
    //                        evh.getValue(),
    //                        OpTime(0,0),
    //                        config,
    //                        0,
    //                        hosts));
    //     ASSERT_OK(cbh.getStatus());
    //     _executor->wait(cbh.getValue());
    //     ASSERT_OK(_lastStatus);
    //     _executor->waitForEvent(evh.getValue());
    //     stopCapturingLogMessages();
    //     bool weAreFreshest(true);
    //     bool tied(true);
    //     checker.getResults(&weAreFreshest, &tied);
    //     ASSERT_FALSE(weAreFreshest);
    //     ASSERT_FALSE(tied);
    //     ASSERT_EQUALS(1, countLogLinesContaining("not electing self, we are not freshest"));
    // }

    TEST_F(FreshnessCheckerTest, ElectWrongTypeInFreshnessResponseManyNodes) {
        // one other responds with "opTime" field of non-Date value, causing not freshest
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1") <<
                         BSON("_id" << 3 << "host" << "h2") <<
                         BSON("_id" << 4 << "host" << "h3") <<
                         BSON("_id" << 5 << "host" << "h4"))));

        // Ask every member except ourselves (index 0).
        std::vector<HostAndPort> hosts;
        for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
                mem != config.membersEnd();
                ++mem) {
            hosts.push_back(mem->getHostAndPort());
        }

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);

        startTest(OpTime(10, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        unordered_set<HostAndPort> seen;
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            const HostAndPort target = noi->getRequest().target;
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT(seen.insert(target).second) << "Already saw " << target;
            BSONObjBuilder responseBuilder;
            responseBuilder <<
                "ok" << 1 <<
                "id" << findIdForMember(config, target) <<
                "set" << "rs0" <<
                "who" << target.toString() <<
                "cfgver" << 1;
            if (target.host() == "h1") {
                responseBuilder << "opTime" << 3;
            }
            else {
                responseBuilder << "opTime" << Date_t(OpTime(0,0).asDate());
            }
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            responseBuilder.obj(),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();
        stopCapturingLogMessages();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("wrong type for opTime argument in replSetFresh "
                                                 "response: NumberInt32"));
    }

    TEST_F(FreshnessCheckerTest, ElectVetoedManyNodes) {
        // one other responds with veto
        startCapturingLogMessages();
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1") <<
                         BSON("_id" << 3 << "host" << "h2") <<
                         BSON("_id" << 4 << "host" << "h3") <<
                         BSON("_id" << 5 << "host" << "h4"))));

        // Ask every member except ourselves (index 0).
        std::vector<HostAndPort> hosts;
        for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
                mem != config.membersEnd();
                ++mem) {
            hosts.push_back(mem->getHostAndPort());
        }

        const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);

        startTest(OpTime(10, 0), config, 0, hosts);
        const Date_t startDate = _net->now();
        unordered_set<HostAndPort> seen;
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            const HostAndPort target = noi->getRequest().target;
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT(seen.insert(target).second) << "Already saw " << target;
            BSONObjBuilder responseBuilder;
            responseBuilder <<
                "ok" << 1 <<
                "id" << findIdForMember(config, target) <<
                "set" << "rs0" <<
                "who" << target.toString() <<
                "cfgver" << 1 <<
                "opTime" << Date_t(OpTime(0,0).asDate());
            if (target.host() == "h1") {
                responseBuilder << "veto" << true << "errmsg" << "I'd rather you didn't";
            }
            _net->scheduleResponse(
                    noi,
                    startDate + 10,
                    ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                            responseBuilder.obj(),
                            Milliseconds(8))));
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();
        stopCapturingLogMessages();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
        ASSERT_EQUALS(1, countLogLinesContaining("not electing self, h1:27017 would veto with '"
                                                 "errmsg: \"I'd rather you didn't\"'"));
    }

    // TODO(dannenberg) re-enable this test once we can control message order
    // NOTE(review): this disabled test also predates the current fixture API --
    // freshnessCheckerRunner no longer takes a checker or event, and neither _lastStatus nor
    // FreshnessChecker::getResults is used by the live tests above -- so it must be ported
    // (see ElectVetoedManyNodes) before it can be re-enabled.
    // TEST_F(FreshnessCheckerTest, ElectVetoedAndTiedFreshnessManyNodes) {
    //     // one other responds with veto and another responds with tie
    //     startCapturingLogMessages();
    //     ReplicaSetConfig config = assertMakeRSConfig(
    //             BSON("_id" << "rs0" <<
    //                  "version" << 1 <<
    //                  "members" << BSON_ARRAY(
    //                      BSON("_id" << 1 << "host" << "h0") <<
    //                      BSON("_id" << 2 << "host" << "h1") <<
    //                      BSON("_id" << 3 << "host" << "h2") <<
    //                      BSON("_id" << 4 << "host" << "h3") <<
    //                      BSON("_id" << 5 << "host" << "h4"))));
    //     StatusWith<ReplicationExecutor::EventHandle> evh = _executor->makeEvent();
    //     ASSERT_OK(evh.getStatus());
    //     Date_t now(0);
    //     std::vector<HostAndPort> hosts;
    //     for (ReplicaSetConfig::MemberIterator mem = config.membersBegin();
    //          mem != config.membersEnd();
    //          ++mem) {
    //         hosts.push_back(mem->getHostAndPort());
    //     }
    //     const BSONObj freshRequest = makeFreshRequest(config, OpTime(10,0), 0);
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h1"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 2 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h1" <<
    //                                                "cfgver" << 1 <<
    //                                                "veto" << true <<
    //                                                "errmsg" << "I'd rather you didn't" <<
    //                                                "opTime" << Date_t(OpTime(0,0).asDate()))));
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h2"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 3 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h2" <<
    //                                                "cfgver" << 1 <<
    //                                                "opTime" << Date_t(OpTime(10,0).asDate()))));
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h3"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 4 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h3" <<
    //                                                "cfgver" << 1 <<
    //                                                "opTime" << Date_t(OpTime(0,0).asDate()))));
    //     _net->addResponse(RemoteCommandRequest(HostAndPort("h4"),
    //                                            "admin",
    //                                            freshRequest),
    //                       StatusWith<BSONObj>(BSON("ok" << 1 <<
    //                                                "id" << 5 <<
    //                                                "set" << "rs0" <<
    //                                                "who" << "h4" <<
    //                                                "cfgver" << 1 <<
    //                                                "opTime" << Date_t(OpTime(0,0).asDate()))));
    //     FreshnessChecker checker;
    //     StatusWith<ReplicationExecutor::CallbackHandle> cbh =
    //         _executor->scheduleWork(
    //             stdx::bind(&FreshnessCheckerTest::freshnessCheckerRunner,
    //                        this,
    //                        stdx::placeholders::_1,
    //                        &checker,
    //                        evh.getValue(),
    //                        OpTime(10,0),
    //                        config,
    //                        0,
    //                        hosts));
    //     ASSERT_OK(cbh.getStatus());
    //     _executor->wait(cbh.getValue());
    //     ASSERT_OK(_lastStatus);
    //     _executor->waitForEvent(evh.getValue());
    //     stopCapturingLogMessages();
    //     bool weAreFreshest(true);
    //     bool tied(true);
    //     checker.getResults(&weAreFreshest, &tied);
    //     ASSERT_FALSE(weAreFreshest);
    //     ASSERT_FALSE(tied);
    //     ASSERT_EQUALS(1, countLogLinesContaining("not electing self, h1:27017 would veto with '"
    //                                              "errmsg: \"I'd rather you didn't\"'"));
    // }

    TEST_F(FreshnessCheckerTest, ElectManyNodesNotAllRespond) {
        ReplicaSetConfig config = assertMakeRSConfig(
                BSON("_id" << "rs0" <<
                     "version" << 1 <<
                     "members" << BSON_ARRAY(
                         BSON("_id" << 1 << "host" << "h0") <<
                         BSON("_id" << 2 << "host" << "h1") <<
                         BSON("_id" << 3 << "host" << "h2") <<
                         BSON("_id" << 4 << "host" << "h3") <<
                         BSON("_id" << 5 << "host" << "h4"))));

        // Ask every member except ourselves (index 0).
        std::vector<HostAndPort> hosts;
        for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
                mem != config.membersEnd();
                ++mem) {
            hosts.push_back(mem->getHostAndPort());
        }

        const OpTime lastOpTimeApplied(10,0);
        const BSONObj freshRequest = makeFreshRequest(config, lastOpTimeApplied, 0);

        // Use the same optime for the checker as for the expected request.
        startTest(lastOpTimeApplied, config, 0, hosts);
        const Date_t startDate = _net->now();
        unordered_set<HostAndPort> seen;
        _net->enterNetwork();
        for (size_t i = 0; i < hosts.size(); ++i) {
            const NetworkInterfaceMock::NetworkOperationIterator noi =
                _net->getNextReadyRequest();
            const HostAndPort target = noi->getRequest().target;
            ASSERT_EQUALS("admin", noi->getRequest().dbname);
            ASSERT_EQUALS(freshRequest, noi->getRequest().cmdObj);
            ASSERT(seen.insert(target).second) << "Already saw " << target;
            if (target.host() == "h2" || target.host() == "h3") {
                // Simulate two members that never answer successfully.
                _net->scheduleResponse(
                        noi,
                        startDate + 10,
                        ResponseStatus(ErrorCodes::NoSuchKey, "No response"));
            }
            else {
                BSONObjBuilder responseBuilder;
                responseBuilder <<
                    "ok" << 1 <<
                    "id" << findIdForMember(config, target) <<
                    "set" << "rs0" <<
                    "who" << target.toString() <<
                    "cfgver" << 1 <<
                    "opTime" << Date_t(OpTime(0,0).asDate());
                _net->scheduleResponse(
                        noi,
                        startDate + 10,
                        ResponseStatus(ReplicationExecutor::RemoteCommandResponse(
                                responseBuilder.obj(),
                                Milliseconds(8))));
            }
        }
        _net->runUntil(startDate + 10);
        _net->exitNetwork();
        ASSERT_EQUALS(startDate + 10, _net->now());
        waitOnChecker();
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::None);
    }

    /**
     * Exercises FreshnessChecker::Algorithm directly, without an executor or network. We are
     * "host0" (config index 0) with lastOpTimeApplied (100, 0); each test feeds responses
     * attributed to "host1"/"host2" and checks when enough responses have arrived and which
     * abort reason results.
     */
    class FreshnessScatterGatherTest : public mongo::unittest::Test {
    public:
        virtual void setUp() {
            int selfConfigIndex = 0;
            OpTime lastOpTimeApplied(100, 0);

            // Fail fast on a bad config rather than running every test against a
            // partially-initialized one.
            ReplicaSetConfig config;
            ASSERT_OK(config.initialize(BSON("_id" << "rs0" <<
                                             "version" << 1 <<
                                             "members" << BSON_ARRAY(
                                                 BSON("_id" << 0 << "host" << "host0") <<
                                                 BSON("_id" << 1 << "host" << "host1") <<
                                                 BSON("_id" << 2 << "host" << "host2")))));

            // Ask every member except ourselves (index 0).
            std::vector<HostAndPort> hosts;
            for (ReplicaSetConfig::MemberIterator mem = ++config.membersBegin();
                    mem != config.membersEnd();
                    ++mem) {
                hosts.push_back(mem->getHostAndPort());
            }

            _checker.reset(new FreshnessChecker::Algorithm(lastOpTimeApplied,
                                                           config,
                                                           selfConfigIndex,
                                                           hosts));
        }

        virtual void tearDown() {
            _checker.reset();
        }

    protected:
        bool hasReceivedSufficientResponses() {
            return _checker->hasReceivedSufficientResponses();
        }

        void processResponse(const RemoteCommandRequest& request,
                             const ResponseStatus& response) {
            _checker->processResponse(request, response);
        }

        FreshnessChecker::ElectionAbortReason shouldAbortElection() const {
            return _checker->shouldAbortElection();
        }

        /** An ok response whose opTime (10, 0) is behind our (100, 0). */
        ResponseStatus lessFresh() {
            BSONObjBuilder bb;
            bb.append("ok", 1.0);
            bb.appendDate("opTime", OpTime(10, 0).asDate());
            return ResponseStatus(NetworkInterfaceMock::Response(bb.obj(), Milliseconds(10)));
        }

        /** An ok response whose opTime (110, 0) is ahead of our (100, 0). */
        ResponseStatus moreFreshViaOpTime() {
            BSONObjBuilder bb;
            bb.append("ok", 1.0);
            bb.appendDate("opTime", OpTime(110, 0).asDate());
            return ResponseStatus(NetworkInterfaceMock::Response(bb.obj(), Milliseconds(10)));
        }

        /** An ok response whose "opTime" field is a string instead of a Date. */
        ResponseStatus wrongTypeForOpTime() {
            BSONObjBuilder bb;
            bb.append("ok", 1.0);
            bb.append("opTime", std::string("several minutes ago"));
            return ResponseStatus(NetworkInterfaceMock::Response(bb.obj(), Milliseconds(10)));
        }

        /** A command-level failure (ok: 0) carrying the Unauthorized error code. */
        ResponseStatus unauthorized() {
            BSONObjBuilder bb;
            bb.append("ok", 0.0);
            bb.append("code", ErrorCodes::Unauthorized);
            bb.append("errmsg", "Unauthorized");
            return ResponseStatus(NetworkInterfaceMock::Response(bb.obj(), Milliseconds(10)));
        }

        /** An ok response whose opTime (100, 0) equals ours. */
        ResponseStatus tiedForFreshness() {
            BSONObjBuilder bb;
            bb.append("ok", 1.0);
            bb.appendDate("opTime", OpTime(100, 0).asDate());
            return ResponseStatus(NetworkInterfaceMock::Response(bb.obj(), Milliseconds(10)));
        }

        /** An ok response that explicitly reports "fresher: true" (no opTime field). */
        ResponseStatus moreFresh() {
            return ResponseStatus(NetworkInterfaceMock::Response(BSON("ok" << 1.0 <<
                                                                      "fresher" << true),
                                                                 Milliseconds(10)));
        }

        /** An ok response that vetoes our election with an error message. */
        ResponseStatus veto() {
            return ResponseStatus(NetworkInterfaceMock::Response(BSON("ok" << 1.0 <<
                                                                      "veto" << true <<
                                                                      "errmsg" << "vetoed!"),
                                                                 Milliseconds(10)));
        }

        /** A request whose only meaningful field is its target host. */
        RemoteCommandRequest requestFrom(std::string hostname) {
            return RemoteCommandRequest(HostAndPort(hostname),
                                        "", // the non-hostname fields do not matter in Freshness
                                        BSONObj(),
                                        Milliseconds(0));
        }

    private:
        boost::scoped_ptr<FreshnessChecker::Algorithm> _checker;
    };

    TEST_F(FreshnessScatterGatherTest, BothNodesLessFresh) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), lessFresh());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), lessFresh());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::None);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeFresher) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), moreFresh());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeFresherViaOpTime) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), moreFreshViaOpTime());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeVetoes) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), veto());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeWrongTypeForOpTime) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), wrongTypeForOpTime());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeTiedForFreshness) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), tiedForFreshness());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), lessFresh());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FreshnessTie);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeTiedAndSecondFresher) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), tiedForFreshness());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), moreFresh());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeTiedAndSecondFresherViaOpTime) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), tiedForFreshness());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), moreFreshViaOpTime());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeTiedAndSecondVetoes) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), tiedForFreshness());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), veto());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeTiedAndSecondWrongTypeForOpTime) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), tiedForFreshness());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), wrongTypeForOpTime());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, FirstNodeLessFreshAndSecondWrongTypeForOpTime) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), lessFresh());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), wrongTypeForOpTime());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, SecondNodeTiedAndFirstWrongTypeForOpTime) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), wrongTypeForOpTime());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::FresherNodeFound);
    }

    TEST_F(FreshnessScatterGatherTest, NotEnoughVotersDueNetworkErrors) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"),
                        ResponseStatus(Status(ErrorCodes::NetworkTimeout, "")));
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"),
                        ResponseStatus(Status(ErrorCodes::NetworkTimeout, "")));
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::QuorumUnreachable);
    }

    TEST_F(FreshnessScatterGatherTest, NotEnoughVotersDueToUnauthorized) {
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host1"), unauthorized());
        ASSERT_FALSE(hasReceivedSufficientResponses());

        processResponse(requestFrom("host2"), unauthorized());
        ASSERT_TRUE(hasReceivedSufficientResponses());
        ASSERT_EQUALS(shouldAbortElection(), FreshnessChecker::QuorumUnreachable);
    }

}  // namespace
}  // namespace repl
}  // namespace mongo