diff options
author | Eric Milkie <milkie@10gen.com> | 2014-08-26 14:04:52 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2014-08-29 15:34:34 -0400 |
commit | 4763ed8e758149413b214f3788cb053589ab76c5 (patch) | |
tree | ef66755045f1ebd17f51086a539bb6e6001592cf | |
parent | 966a1078a716694fd7e58892441546e7b1f5ce6b (diff) | |
download | mongo-4763ed8e758149413b214f3788cb053589ab76c5.tar.gz |
SERVER-15000 replcoordinator half of election proceedings
15 files changed, 586 insertions, 146 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5d4d945b96f..2bb0d9ec808 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -35,8 +35,6 @@ env.CppUnitTest('replication_executor_test', env.Library('topology_coordinator_impl', ['topology_coordinator_impl.cpp', - 'topology_freshness_checker.cpp', - 'topology_elect_cmd_runner.cpp', 'member_heartbeat_data.cpp'], LIBDEPS=['replication_executor', 'repl_settings', @@ -47,23 +45,13 @@ env.CppUnitTest('topology_coordinator_impl_test', LIBDEPS=['topology_coordinator_impl', 'replica_set_messages']) -env.CppUnitTest('topology_freshness_checker_test', - 'topology_freshness_checker_test.cpp', - LIBDEPS=['topology_coordinator_impl', - 'replica_set_messages', - 'replmocks']) - -env.CppUnitTest('topology_elect_cmd_runner_test', - 'topology_elect_cmd_runner_test.cpp', - LIBDEPS=['topology_coordinator_impl', - 'replica_set_messages', - 'replmocks']) - - env.Library('repl_coordinator_impl', [ 'check_quorum_for_config_change.cpp', + 'elect_cmd_runner.cpp', + 'freshness_checker.cpp', 'repl_coordinator_impl.cpp', + 'repl_coordinator_impl_elect.cpp', 'repl_coordinator_impl_heartbeat.cpp', 'replica_set_config_checks.cpp', ], @@ -103,6 +91,24 @@ env.CppUnitTest('check_quorum_for_config_change_test', 'replmocks', ]) +env.CppUnitTest('freshness_checker_test', + 'freshness_checker_test.cpp', + LIBDEPS=['repl_coordinator_impl', + 'replica_set_messages', + 'replmocks']) + +env.CppUnitTest('elect_cmd_runner_test', + 'elect_cmd_runner_test.cpp', + LIBDEPS=['repl_coordinator_impl', + 'replica_set_messages', + 'replmocks']) + +env.CppUnitTest('repl_coordinator_impl_elect_test', + 'repl_coordinator_impl_elect_test.cpp', + LIBDEPS=['repl_coordinator_impl', + 'topology_coordinator_impl', + 'replmocks']) + env.Library('repl_coordinator_interface', ['repl_coordinator.cpp', 'repl_coordinator_external_state.cpp', diff --git a/src/mongo/db/repl/topology_elect_cmd_runner.cpp 
b/src/mongo/db/repl/elect_cmd_runner.cpp index dc9a6382b4e..d4901a445ba 100644 --- a/src/mongo/db/repl/topology_elect_cmd_runner.cpp +++ b/src/mongo/db/repl/elect_cmd_runner.cpp @@ -30,7 +30,7 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/topology_elect_cmd_runner.h" +#include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/base/status.h" #include "mongo/db/repl/member_heartbeat_data.h" @@ -50,7 +50,7 @@ namespace repl { const ReplicationExecutor::EventHandle& evh, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbdata) { + const std::vector<HostAndPort>& hosts) { _sufficientResponsesReceived = evh; @@ -66,28 +66,24 @@ namespace repl { "round" << OID::gen()); // Schedule a RemoteCommandRequest for each non-DOWN node - for (std::vector<MemberHeartbeatData>::const_iterator it = hbdata.begin(); - it != hbdata.end(); + for (std::vector<HostAndPort>::const_iterator it = hosts.begin(); + it != hosts.end(); ++it) { - if (it->getConfigIndex() == selfIndex) { - continue; // skip ourselves - } - if (!it->maybeUp()) { - continue; // skip DOWN nodes - } const StatusWith<ReplicationExecutor::CallbackHandle> cbh = executor->scheduleRemoteCommand( ReplicationExecutor::RemoteCommandRequest( - currentConfig.getMemberAt(it->getConfigIndex()).getHostAndPort(), + *it, "admin", replSetElectCmd, Milliseconds(30*1000)), // trying to match current Socket timeout stdx::bind(&ElectCmdRunner::_onReplSetElectResponse, this, stdx::placeholders::_1)); - if (!cbh.isOK()) { + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { return cbh.getStatus(); } + fassert(18683, cbh.getStatus()); + _responseCallbacks.push_back(cbh.getValue()); } diff --git a/src/mongo/db/repl/topology_elect_cmd_runner.h b/src/mongo/db/repl/elect_cmd_runner.h index 2a9615e8e70..4879da252dd 100644 --- a/src/mongo/db/repl/topology_elect_cmd_runner.h +++ b/src/mongo/db/repl/elect_cmd_runner.h @@ -58,7 +58,7 @@ namespace repl { const 
ReplicationExecutor::EventHandle& evh, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbdata); + const std::vector<HostAndPort>& hosts); /** * Returns the number of received votes. Only valid to call after diff --git a/src/mongo/db/repl/topology_elect_cmd_runner_test.cpp b/src/mongo/db/repl/elect_cmd_runner_test.cpp index 4ffe4a3e089..ad12ccb7952 100644 --- a/src/mongo/db/repl/topology_elect_cmd_runner_test.cpp +++ b/src/mongo/db/repl/elect_cmd_runner_test.cpp @@ -37,7 +37,7 @@ #include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/topology_elect_cmd_runner.h" +#include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -55,7 +55,7 @@ namespace { const ReplicationExecutor::EventHandle& evh, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbData); + const std::vector<HostAndPort>& hosts); protected: NetworkInterfaceMockWithMap* _net; boost::scoped_ptr<ReplicationExecutor> _executor; @@ -108,13 +108,13 @@ namespace { const ReplicationExecutor::EventHandle& evh, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbData) { + const std::vector<HostAndPort>& hosts) { invariant(data.status.isOK()); _lastStatus = electCmdRunner->start(data.executor, evh, currentConfig, selfIndex, - hbData); + hosts); } TEST_F(ElectCmdRunnerTest, OneNode) { @@ -129,10 +129,8 @@ namespace { ASSERT_OK(evh.getStatus()); Date_t now(0); - std::vector<MemberHeartbeatData> hbData; - MemberHeartbeatData h1Info(0); - h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h1Info); + std::vector<HostAndPort> hosts; + hosts.push_back(config.getMemberAt(0).getHostAndPort()); ElectCmdRunner electCmdRunner; @@ -145,7 +143,7 @@ namespace { 
evh.getValue(), config, 0, - hbData)); + hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); @@ -169,13 +167,9 @@ namespace { ASSERT_OK(evh.getStatus()); Date_t now(0); - std::vector<MemberHeartbeatData> hbData; - MemberHeartbeatData h0Info(0); - h0Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h0Info); - MemberHeartbeatData h1Info(1); - h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h1Info); + std::vector<HostAndPort> hosts; + hosts.push_back(config.getMemberAt(0).getHostAndPort()); + hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj electRequest = makeElectRequest(config, 0); @@ -196,7 +190,7 @@ namespace { evh.getValue(), config, 0, - hbData)); + hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); @@ -223,13 +217,9 @@ namespace { ASSERT_OK(evh.getStatus()); Date_t now(0); - std::vector<MemberHeartbeatData> hbData; - MemberHeartbeatData h0Info(0); - h0Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h0Info); - MemberHeartbeatData h1Info(1); - h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h1Info); + std::vector<HostAndPort> hosts; + hosts.push_back(config.getMemberAt(0).getHostAndPort()); + hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj electRequest = makeElectRequest(config, 0); _net->addResponse(RemoteCommandRequest(HostAndPort("h1"), @@ -250,7 +240,7 @@ namespace { evh.getValue(), config, 0, - hbData)); + hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); diff --git a/src/mongo/db/repl/topology_freshness_checker.cpp b/src/mongo/db/repl/freshness_checker.cpp index 71436cba1b4..01c85ec592b 100644 --- a/src/mongo/db/repl/topology_freshness_checker.cpp +++ b/src/mongo/db/repl/freshness_checker.cpp @@ -30,7 +30,7 @@ #include 
"mongo/platform/basic.h" -#include "mongo/db/repl/topology_freshness_checker.h" +#include "mongo/db/repl/freshness_checker.h" #include "mongo/base/status.h" #include "mongo/bson/optime.h" @@ -57,7 +57,7 @@ namespace repl { const OpTime& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbdata) { + const std::vector<HostAndPort>& hosts) { _lastOpTimeApplied = lastOpTimeApplied; _freshest = true; @@ -73,28 +73,22 @@ namespace repl { .getHostAndPort().toString() << "cfgver" << currentConfig.getConfigVersion() << "id" << currentConfig.getMemberAt(selfIndex).getId()); - for (std::vector<MemberHeartbeatData>::const_iterator it = hbdata.begin(); - it != hbdata.end(); - ++it) { - if (it->getConfigIndex() == selfIndex) { - continue; // skip ourselves - } - if (!it->maybeUp()) { - continue; // skip DOWN nodes - } + for (std::vector<HostAndPort>::const_iterator it = hosts.begin(); it != hosts.end(); ++it) { const StatusWith<ReplicationExecutor::CallbackHandle> cbh = executor->scheduleRemoteCommand( ReplicationExecutor::RemoteCommandRequest( - currentConfig.getMemberAt(it->getConfigIndex()).getHostAndPort(), + *it, "admin", replSetFreshCmd, Milliseconds(30*1000)), // trying to match current Socket timeout stdx::bind(&FreshnessChecker::_onReplSetFreshResponse, this, stdx::placeholders::_1)); - if (!cbh.isOK()) { + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { return cbh.getStatus(); } + fassert(18682, cbh.getStatus()); + _responseCallbacks.push_back(cbh.getValue()); } diff --git a/src/mongo/db/repl/topology_freshness_checker.h b/src/mongo/db/repl/freshness_checker.h index 288b2eb20c2..a0863f81fdf 100644 --- a/src/mongo/db/repl/topology_freshness_checker.h +++ b/src/mongo/db/repl/freshness_checker.h @@ -63,7 +63,7 @@ namespace repl { const OpTime& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbdata); + const std::vector<HostAndPort>& 
hosts); /** * Returns whether this node is the freshest of all non-DOWN nodes in the set, diff --git a/src/mongo/db/repl/topology_freshness_checker_test.cpp b/src/mongo/db/repl/freshness_checker_test.cpp index 72507a63e64..7ddfe4aa11f 100644 --- a/src/mongo/db/repl/topology_freshness_checker_test.cpp +++ b/src/mongo/db/repl/freshness_checker_test.cpp @@ -37,7 +37,7 @@ #include "mongo/db/repl/network_interface_mock.h" #include "mongo/db/repl/replica_set_config.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/topology_freshness_checker.h" +#include "mongo/db/repl/freshness_checker.h" #include "mongo/stdx/functional.h" #include "mongo/unittest/unittest.h" @@ -56,7 +56,7 @@ namespace { const OpTime& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbData); + const std::vector<HostAndPort>& hosts); protected: NetworkInterfaceMockWithMap* _net; boost::scoped_ptr<ReplicationExecutor> _executor; @@ -111,14 +111,14 @@ namespace { const OpTime& lastOpTimeApplied, const ReplicaSetConfig& currentConfig, int selfIndex, - const std::vector<MemberHeartbeatData>& hbData) { + const std::vector<HostAndPort>& hosts) { invariant(data.status.isOK()); _lastStatus = checker->start(data.executor, evh, lastOpTimeApplied, currentConfig, selfIndex, - hbData); + hosts); } TEST_F(FreshnessCheckerTest, OneNode) { @@ -133,10 +133,8 @@ namespace { ASSERT_OK(evh.getStatus()); Date_t now(0); - std::vector<MemberHeartbeatData> hbData; - MemberHeartbeatData h1Info(0); - h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h1Info); + std::vector<HostAndPort> hosts; + hosts.push_back(config.getMemberAt(0).getHostAndPort()); FreshnessChecker checker; @@ -150,7 +148,7 @@ namespace { OpTime(0,0), config, 0, - hbData)); + hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); @@ -179,13 +177,9 @@ namespace { ASSERT_OK(evh.getStatus()); Date_t 
now(0); - std::vector<MemberHeartbeatData> hbData; - MemberHeartbeatData h0Info(0); - h0Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h0Info); - MemberHeartbeatData h1Info(1); - h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h1Info); + std::vector<HostAndPort> hosts; + hosts.push_back(config.getMemberAt(0).getHostAndPort()); + hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0); @@ -210,7 +204,7 @@ namespace { OpTime(0,0), config, 0, - hbData)); + hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); @@ -240,13 +234,9 @@ namespace { ASSERT_OK(evh.getStatus()); Date_t now(0); - std::vector<MemberHeartbeatData> hbData; - MemberHeartbeatData h0Info(0); - h0Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h0Info); - MemberHeartbeatData h1Info(1); - h1Info.setUpValues(now, MemberState::RS_SECONDARY, OpTime(0,0), OpTime(0,0), "", ""); - hbData.push_back(h1Info); + std::vector<HostAndPort> hosts; + hosts.push_back(config.getMemberAt(0).getHostAndPort()); + hosts.push_back(config.getMemberAt(1).getHostAndPort()); const BSONObj freshRequest = makeFreshRequest(config, OpTime(0,0), 0); _net->addResponse(RemoteCommandRequest(HostAndPort("h1"), @@ -271,7 +261,7 @@ namespace { OpTime(0,0), config, 0, - hbData)); + hosts)); ASSERT_OK(cbh.getStatus()); _executor->wait(cbh.getValue()); diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index eba60a4309e..3425f5e4c5d 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -387,7 +387,7 @@ namespace repl { */ struct ReplSetElectArgs { StringData set; // Name of the replset - unsigned whoid; // replSet id of the member that sent the replSetFresh command + int whoid; // replSet id of the member that sent 
the replSetFresh command int cfgver; // replSet config version that the member who sent the command thinks it has OID round; // unique ID for this election }; diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index ea9902aefd8..9ef7242d1d2 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -114,7 +114,8 @@ namespace { _inShutdown(false), _rsConfigState(kConfigStartingUp), _thisMembersConfigIndex(-1), - _random(prngSeed) { + _random(prngSeed), + _sleptLastElection(false) { if (!isReplEnabled()) { return; diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index d22e84d63c4..a8578e83cbf 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -37,6 +37,8 @@ #include "mongo/base/status.h" #include "mongo/bson/optime.h" #include "mongo/db/global_environment_experiment.h" +#include "mongo/db/repl/freshness_checker.h" +#include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/member_state.h" #include "mongo/db/repl/repl_coordinator.h" #include "mongo/db/repl/repl_coordinator_external_state.h" @@ -222,6 +224,12 @@ namespace repl { */ void waitForStartUpComplete(); + /** + * Used by testing code to run election proceedings, in lieu of a better + * method to achieve this. + */ + void testElection(); + private: /** @@ -386,6 +394,32 @@ namespace repl { */ void _setConfigState_inlock(ConfigState newState); + /** + * Begins an attempt to elect this node. + * Called after an incoming heartbeat changes this node's view of the set such that it + * believes it can be elected PRIMARY. + * For proper concurrency, must be called via a ReplicationExecutor callback. + * finishEvh is an event that is signaled when election is done, regardless of success. 
+ **/ + void _startElectSelf(const ReplicationExecutor::CallbackData& cbData, + const ReplicationExecutor::EventHandle& finishEvh); + + /** + * Callback called when the FreshnessChecker has completed; checks the results and + * decides whether to continue election proceedings. + * finishEvh is an event that is signaled when election is complete. + **/ + void _onFreshnessCheckComplete(const ReplicationExecutor::CallbackData& cbData, + const ReplicationExecutor::EventHandle& finishEvh); + + /** + * Callback called when the ElectCmdRunner has completed; checks the results and + * decides whether to complete the election and change state to primary. + * finishEvh is an event that is signaled when election is complete. + **/ + void _onElectCmdRunnerComplete(const ReplicationExecutor::CallbackData& cbData, + const ReplicationExecutor::EventHandle& finishEvh); + // Handles to actively queued heartbeats. // Only accessed serially in ReplicationExecutor callbacks, which makes it safe to access // outside of _mutex. @@ -461,6 +495,14 @@ namespace repl { // PRNG; seeded at class construction time. PseudoRandom _random; + // Used for conducting an election of this node; + // the presence of a non-null _freshnessChecker pointer indicates that an election is + // currently in progress. Only one election is allowed at once. + boost::scoped_ptr<FreshnessChecker> _freshnessChecker; + boost::scoped_ptr<ElectCmdRunner> _electCmdRunner; + + // Whether we slept last time we attempted an election but possibly tied with other nodes. + bool _sleptLastElection; }; } // namespace repl diff --git a/src/mongo/db/repl/repl_coordinator_impl_elect.cpp b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp new file mode 100644 index 00000000000..83b3b29340e --- /dev/null +++ b/src/mongo/db/repl/repl_coordinator_impl_elect.cpp @@ -0,0 +1,245 @@ +/** + * Copyright (C) 2014 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/repl_coordinator_impl.h" +#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/db/repl/elect_cmd_runner.h" +#include "mongo/db/repl/freshness_checker.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace repl { + + void ReplicationCoordinatorImpl::testElection() { + // Make a new event for tracking this election attempt. 
+ StatusWith<ReplicationExecutor::EventHandle> finishEvh = _replExecutor.makeEvent(); + fassert(18680, finishEvh.getStatus()); + + StatusWith<ReplicationExecutor::CallbackHandle> cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_startElectSelf, + this, + stdx::placeholders::_1, + finishEvh.getValue())); + fassert(18672, cbh.getStatus()); + _replExecutor.waitForEvent(finishEvh.getValue()); + } + + void ReplicationCoordinatorImpl::_startElectSelf( + const ReplicationExecutor::CallbackData& cbData, + const ReplicationExecutor::EventHandle& finishEvh) { + // Signal finish event upon early exit. + ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent, + cbData.executor, + finishEvh)); + + if (cbData.status == ErrorCodes::CallbackCanceled) + return; + + boost::unique_lock<boost::mutex> lk(_mutex); + + invariant(_rsConfig.getMemberAt(_thisMembersConfigIndex).isElectable()); + OpTime lastOpTimeApplied(_getLastOpApplied_inlock()); + + if (lastOpTimeApplied == 0) { + log() << "replSet info not trying to elect self, " + "do not yet have a complete set of data from any point in time"; + return; + } + + if (_freshnessChecker) { + // If an attempt to elect self is currently in progress, don't interrupt it. + return; + // Note that the old code, in addition to prohibiting multiple in-flight election + // attempts, used to omit processing *any* incoming knowledge about + // primaries in the cluster while an election was occurring. This seemed like + // overkill, so it has been removed. + } + + // Make an event for our internal use to help synchronize the next phase of election. 
+ StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = cbData.executor->makeEvent(); + if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18681, nextPhaseEvh.getStatus()); + + _freshnessChecker.reset(new FreshnessChecker); + StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = + cbData.executor->onEvent( + nextPhaseEvh.getValue(), + stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, + this, + stdx::placeholders::_1, + finishEvh)); + if (finishCheckCallback.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18670, finishCheckCallback.getStatus()); + + Status status = _freshnessChecker->start(cbData.executor, + nextPhaseEvh.getValue(), + lastOpTimeApplied, + _rsConfig, + _thisMembersConfigIndex, + _topCoord->getMaybeUpHostAndPorts()); + if (status == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18688, status); + + finishEvhGuard.Dismiss(); + } + + + void ReplicationCoordinatorImpl::_onFreshnessCheckComplete( + const ReplicationExecutor::CallbackData& cbData, + const ReplicationExecutor::EventHandle& finishEvh) { + + // Signal finish event upon early exit. 
+ ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent, + cbData.executor, + finishEvh)); + + // Make sure to reset our state on all error exit paths + ScopeGuard freshnessCheckerDeleter = + MakeObjGuard(_freshnessChecker, + &boost::scoped_ptr<FreshnessChecker>::reset, + static_cast<FreshnessChecker*>(NULL)); + + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + Date_t now(cbData.executor->now()); + bool weAreFreshest; + bool tied; + _freshnessChecker->getResults(&weAreFreshest, &tied); + + // need to not sleep after last time sleeping, + if (tied) { + boost::unique_lock<boost::mutex> lk(_mutex); + if ((_thisMembersConfigIndex != 0) && !_sleptLastElection) { + long long ms = _random.nextInt64(1000) + 50; + log() << "replSet possible election tie; sleeping a little " << ms << "ms"; + _topCoord->setStepDownTime(now + ms); + _sleptLastElection = true; + return; + } + _sleptLastElection = false; + } + + if (!weAreFreshest) { + log() << "not electing self, we are not freshest"; + return; + } + + log() << "replSet info electSelf"; + + // Secure our vote for ourself first + if (!_topCoord->voteForMyself(now)) { + return; + } + + StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = cbData.executor->makeEvent(); + if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18685, nextPhaseEvh.getStatus()); + + + _electCmdRunner.reset(new ElectCmdRunner); + StatusWith<ReplicationExecutor::CallbackHandle> finishCheckCallback = + cbData.executor->onEvent( + nextPhaseEvh.getValue(), + stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, + this, + stdx::placeholders::_1, + finishEvh)); + if (finishCheckCallback.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18671, finishCheckCallback.getStatus()); + + Status electionCompleteStatus = _electCmdRunner->start(cbData.executor, + nextPhaseEvh.getValue(), + _rsConfig, + _thisMembersConfigIndex, + 
_topCoord->getMaybeUpHostAndPorts()); + if (electionCompleteStatus == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18686, electionCompleteStatus); + + freshnessCheckerDeleter.Dismiss(); + finishEvhGuard.Dismiss(); + } + + void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete( + const ReplicationExecutor::CallbackData& cbData, + const ReplicationExecutor::EventHandle& finishEvh) { + + // Signal finish event and cleanup, upon function exit in all cases. + ON_BLOCK_EXIT(&ReplicationExecutor::signalEvent, cbData.executor, finishEvh); + ON_BLOCK_EXIT_OBJ(_freshnessChecker, + &boost::scoped_ptr<FreshnessChecker>::reset, + static_cast<FreshnessChecker*>(NULL)); + ON_BLOCK_EXIT_OBJ(_electCmdRunner, + &boost::scoped_ptr<ElectCmdRunner>::reset, + static_cast<ElectCmdRunner*>(NULL)); + + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + + int receivedVotes = _electCmdRunner->getReceivedVotes(); + + if (receivedVotes * 2 <= _rsConfig.getMajorityVoteCount()) { + log() << "replSet couldn't elect self, only received " << receivedVotes << " votes"; + return; + } + + if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) { + log() << "replSet config version changed during our election, ignoring result"; + return; + } + + log() << "replSet election succeeded, assuming primary role"; + + // + // TODO: setElectionTime(getNextGlobalOptime()), ask Applier to pause, wait for + // applier's signal that it's done flushing ops (signalDrainComplete) + // and then _changememberstate to PRIMARY. + + } + +} +} diff --git a/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp b/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp new file mode 100644 index 00000000000..aa3703183c2 --- /dev/null +++ b/src/mongo/db/repl/repl_coordinator_impl_elect_test.cpp @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2014 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/repl/network_interface_mock.h" +#include "mongo/db/repl/repl_coordinator_external_state_mock.h" +#include "mongo/db/repl/repl_coordinator_impl.h" +#include "mongo/db/repl/replica_set_config.h" +#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace repl { +namespace { + + bool stringContains(const std::string &haystack, const std::string& needle) { + return haystack.find(needle) != std::string::npos; + } + + class ReplCoordElectTest : public mongo::unittest::Test { + public: + virtual void setUp() { + _settings.replSet = "mySet/node1:12345,node2:54321"; + } + + virtual void tearDown() { + _repl->shutdown(); + } + + protected: + NetworkInterfaceMockWithMap* getNet() { return _net; } + ReplicationCoordinatorImpl* getReplCoord() { return _repl.get(); } + + void init() { + invariant(!_repl); + + // PRNG seed for tests. 
+ const int64_t seed = 0; + _externalState = new ReplicationCoordinatorExternalStateMock; + _net = new NetworkInterfaceMockWithMap; + _repl.reset(new ReplicationCoordinatorImpl(_settings, + _externalState, + _net, + new TopologyCoordinatorImpl(Seconds(999)), + seed)); + } + + void assertReplStart(const BSONObj& configDoc, const HostAndPort& selfHost) { + init(); + _externalState->setLocalConfigDocument(StatusWith<BSONObj>(configDoc)); + _externalState->addSelf(selfHost); + OperationContextNoop txn; + _repl->startReplication(&txn); + _repl->waitForStartUpComplete(); + + ASSERT_EQUALS(ReplicationCoordinator::modeReplSet, + getReplCoord()->getReplicationMode()); + } + + int64_t countLogLinesContaining(const std::string& needle) { + return std::count_if(getCapturedLogMessages().begin(), + getCapturedLogMessages().end(), + stdx::bind(stringContains, + stdx::placeholders::_1, + needle)); + } + + private: + boost::scoped_ptr<ReplicationCoordinatorImpl> _repl; + // Owned by ReplicationCoordinatorImpl + ReplicationCoordinatorExternalStateMock* _externalState; + // Owned by ReplicationCoordinatorImpl + NetworkInterfaceMockWithMap* _net; + ReplSettings _settings; + }; + + TEST_F(ReplCoordElectTest, ElectTooSoon) { + // Election fails because we haven't set a lastOpTimeApplied value yet, via a heartbeat. 
+ startCapturingLogMessages(); + assertReplStart( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))), + HostAndPort("node1", 12345)); + getReplCoord()->testElection(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("do not yet have a complete set of data")); + } + + TEST_F(ReplCoordElectTest, Elect1NodeSuccess) { + startCapturingLogMessages(); + assertReplStart( + BSON("_id" << "mySet" << + "version" << 1 << + "members" << BSON_ARRAY(BSON("_id" << 1 << "host" << "node1:12345"))), + HostAndPort("node1", 12345)); + + OperationContextNoop txn; + OID selfRID = getReplCoord()->getMyRID(); + OpTime time1(1, 1); + getReplCoord()->setLastOptime(&txn, selfRID, time1); + + getReplCoord()->testElection(); + stopCapturingLogMessages(); + ASSERT_EQUALS(1, countLogLinesContaining("election succeeded")); + } + +/* + TEST_F(ReplCoordElectTest, ElectManyNodesSuccess) {} + TEST_F(ReplCoordElectTest, ShutdownExecutor) {} + + // Uses PRNG + TEST_F(ReplCoordElectTest, ElectTiedWithAnother) {} +*/ + +} +} +} diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index eeb28c97813..09b74531626 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -151,6 +151,17 @@ namespace repl { // Record a "ping" based on the round-trip time of the heartbeat for the member virtual void recordPing(const HostAndPort& host, const Milliseconds elapsedMillis) = 0; + // Retrieves a vector of HostAndPorts containing only nodes that are not DOWN + // and are not ourselves. + virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const = 0; + + // If we can vote for ourselves, updates the lastVote tracker and returns true. + // If we cannot vote for ourselves (because we already voted too recently), returns false. + virtual bool voteForMyself(Date_t now) = 0; + + // Sets _stepDownTime to newTime. 
+ virtual void setStepDownTime(Date_t newTime) = 0; + protected: TopologyCoordinator() {} }; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index 75deca98d05..d2f5fb8b0ff 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -43,11 +43,14 @@ #include "mongo/db/server_parameters.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { + const Seconds TopologyCoordinatorImpl::LastVote::leaseTime = Seconds(3); + PingStats::PingStats() : count(0), value(std::numeric_limits<unsigned int>::max()) { } @@ -63,7 +66,6 @@ namespace repl { _syncSourceIndex(-1), _forceSyncSourceIndex(-1), _maxSyncSourceLagSecs(maxSyncSourceLagSecs), - _busyWithElectSelf(false), _selfIndex(0), _blockSync(false), _maintenanceModeCalls(0) @@ -252,29 +254,6 @@ namespace repl { // XXX Eric: what to do here? } - // election entry point - void TopologyCoordinatorImpl::_electSelf(Date_t now) { - verify( !_selfConfig().isArbiter() ); - verify( _selfConfig().getSlaveDelay() == Seconds(0) ); -/* - try { - // XXX Eric - // _electSelf(now); - } - catch (RetryAfterSleepException&) { - throw; - } - catch (VoteException& ) { // Eric: XXX - log() << "replSet not trying to elect self as responded yea to someone else recently"; - } - catch (const DBException& e) { - log() << "replSet warning caught unexpected exception in electSelf() " << e.toString(); - } - catch (...) 
{ - log() << "replSet warning caught unexpected exception in electSelf()"; - } -*/ - } void TopologyCoordinatorImpl::prepareSyncFromResponse( const ReplicationExecutor::CallbackData& data, @@ -463,10 +442,6 @@ namespace repl { return false; } - namespace { - const Seconds LeaseTime(3); - } // namespace - // produce a reply to a received electCmd void TopologyCoordinatorImpl::prepareElectResponse( const ReplicationExecutor::CallbackData& data, @@ -519,20 +494,20 @@ namespace repl { << highestPriority->getHostAndPort().toString(); vote = -10000; } - else if (_lastVote.when > 0 && - _lastVote.when.millis + LeaseTime.total_milliseconds() >= now.millis && - _lastVote.whoID != args.whoid) { - log() << "replSetElect voting no for " + else if (_lastVote.when.millis > 0 && + _lastVote.when.millis + LastVote::leaseTime.total_milliseconds() >= now.millis && + _lastVote.whoId != args.whoid) { + log() << "replSet voting no for " << hopeful->getHostAndPort().toString() << "; voted for " << _lastVote.whoHostAndPort.toString() << ' ' << (now.millis - _lastVote.when.millis) / 1000 << " secs ago"; } else { _lastVote.when = now; - _lastVote.whoID = args.whoid; + _lastVote.whoId = args.whoid; _lastVote.whoHostAndPort = hopeful->getHostAndPort(); vote = _selfConfig().getNumVotes(); - invariant(hopeful->getId() == static_cast<int>(args.whoid)); + invariant(hopeful->getId() == args.whoid); log() << "replSetElect voting yea for " << hopeful->getHostAndPort().toString() << " (" << args.whoid << ')'; } @@ -662,9 +637,6 @@ namespace repl { } } - // Don't bother to make any changes if we are an election candidate - if (_busyWithElectSelf) return ReplSetHeartbeatResponse::NoAction; - // check if we should ask the primary (possibly ourselves) to step down int highestPriorityIndex = _getHighestPriorityElectableIndex(); @@ -830,9 +802,6 @@ namespace repl { switch (myUnelectableReason){ case None: // All checks passed, become a candidate and start election proceedings. 
- - // don't try to do further elections & such while we are already working on one. - _busyWithElectSelf = true; return ReplSetHeartbeatResponse::StartElection; default: return ReplSetHeartbeatResponse::NoAction; @@ -1082,6 +1051,7 @@ namespace repl { */ *result = Status::OK(); } + void TopologyCoordinatorImpl::prepareFreezeResponse( const ReplicationExecutor::CallbackData& data, Date_t now, @@ -1103,7 +1073,7 @@ namespace repl { response->append("warning", "you really want to freeze for only 1 second?"); if (_memberState != MemberState::RS_PRIMARY) { - _stepDownUntil = now + secs; + setStepDownTime(now + (secs * 1000)); log() << "replSet info 'freezing' for " << secs << " seconds"; } else { @@ -1113,6 +1083,11 @@ namespace repl { *result = Status::OK(); } + void TopologyCoordinatorImpl::setStepDownTime(Date_t newTime) { + invariant(newTime > _stepDownUntil); + _stepDownUntil = newTime; + } + // This function installs a new config object and recreates MemberHeartbeatData objects // that reflect the new config. 
void TopologyCoordinatorImpl::updateConfig(const ReplicaSetConfig& newConfig, @@ -1270,5 +1245,36 @@ namespace repl { return totalPings; } + std::vector<HostAndPort> TopologyCoordinatorImpl::getMaybeUpHostAndPorts() const { + std::vector<HostAndPort> upHosts; + for (std::vector<MemberHeartbeatData>::const_iterator it = _hbdata.begin(); + it != _hbdata.end(); + ++it) { + if (it->getConfigIndex() == _selfIndex) { + continue; // skip ourselves + } + if (!it->maybeUp()) { + continue; // skip DOWN nodes + } + + upHosts.push_back(_currentConfig.getMemberAt(it->getConfigIndex()).getHostAndPort()); + } + return upHosts; + } + + bool TopologyCoordinatorImpl::voteForMyself(Date_t now) { + int selfId = _currentConfig.getMemberAt(_selfIndex).getId(); + if ((_lastVote.when + LastVote::leaseTime.total_milliseconds() >= now) + && (_lastVote.whoId != selfId)) { + log() << "replSet not voting yea for " << selfId << + " voted for " << _lastVote.whoHostAndPort.toString() << ' ' << + (now - _lastVote.when) / 1000 << " secs ago"; + return false; + } + _lastVote.when = now; + _lastVote.whoId = selfId; + return true; + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 884f12a1867..0b89c54f543 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -70,7 +70,12 @@ namespace repl { class TopologyCoordinatorImpl : public TopologyCoordinator { public: - explicit TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs); + /** + * Constructs a Topology Coordinator object. + * @param maxSyncSourceLagSecs a sync source is re-evaluated after it lags behind further + * than this amount. + **/ + TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs); // TODO(spencer): Can this be made private? 
virtual void setForceSyncSourceIndex(int index); @@ -170,6 +175,17 @@ namespace repl { // TODO(spencer): Remove this once we can easily call for an election in unit tests to // set the current primary. void _setCurrentPrimaryForTest(int primaryIndex); + + // Retrieves a vector of HostAndPorts containing only nodes that are not DOWN + // and are not ourselves. + virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; + + // If the lastVote tracker allows the current node to vote for itself, updates the + // lastVote tracker and returns true. Otherwise, returns false. + virtual bool voteForMyself(Date_t now); + + // Sets _stepDownTime to newTime. newTime must be strictly later than _stepDownTime. + virtual void setStepDownTime(Date_t newTime); private: @@ -224,12 +240,9 @@ namespace repl { // Scans through all members that are 'up' and return the latest known optime OpTime _latestKnownOpTime() const; - // Begins election proceedings - void _electSelf(Date_t now); - // Scans the electable set and returns the highest priority member index int _getHighestPriorityElectableIndex() const; - + // Returns true if "one" member is higher priority than "two" member bool _isMemberHigherPriority(int memberOneIndex, int memberTwoIndex) const; @@ -274,9 +287,6 @@ namespace repl { return _hbmsg; } - // Flag to prevent re-entering election code - bool _busyWithElectSelf; - int _selfIndex; // this node's index in _members and _currentConfig ReplicaSetConfig _currentConfig; // The current config, including a vector of MemberConfigs @@ -330,13 +340,15 @@ namespace repl { // Last vote info from the election struct LastVote { - LastVote() : when(0), whoID(0xffffffff) { } + + static const Seconds leaseTime; + + LastVote() : when(0), whoId(-1) { } Date_t when; - unsigned whoID; + int whoId; HostAndPort whoHostAndPort; } _lastVote; - }; } // namespace repl |