/** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" #include "mongo/db/repl/repl_coordinator_impl.h" #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/repl/elect_cmd_runner.h" #include "mongo/db/repl/freshness_checker.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { namespace repl { void ReplicationCoordinatorImpl::testElection() { // Make a new event for tracking this election attempt. StatusWith finishEvh = _replExecutor.makeEvent(); fassert(18680, finishEvh.getStatus()); StatusWith cbh = _replExecutor.scheduleWork( stdx::bind(&ReplicationCoordinatorImpl::_startElectSelf, this, stdx::placeholders::_1, finishEvh.getValue())); fassert(18672, cbh.getStatus()); _replExecutor.waitForEvent(finishEvh.getValue()); } void ReplicationCoordinatorImpl::_startElectSelf( const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::EventHandle& finishEvh) { // Signal finish event upon early exit. ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent, cbData.executor, finishEvh)); if (cbData.status == ErrorCodes::CallbackCanceled) return; boost::unique_lock lk(_mutex); invariant(_rsConfig.getMemberAt(_thisMembersConfigIndex).isElectable()); OpTime lastOpTimeApplied(_getLastOpApplied_inlock()); if (lastOpTimeApplied == 0) { log() << "replSet info not trying to elect self, " "do not yet have a complete set of data from any point in time"; return; } if (_freshnessChecker) { // If an attempt to elect self is currently in progress, don't interrupt it. return; // Note that the old code, in addition to prohibiting multiple in-flight election // attempts, used to omit processing *any* incoming knowledge about // primaries in the cluster while an election was occurring. This seemed like // overkill, so it has been removed. } _freshnessChecker.reset(new FreshnessChecker); StatusWith nextPhaseEvh = _freshnessChecker->start( cbData.executor, lastOpTimeApplied, _rsConfig, _thisMembersConfigIndex, _topCoord->getMaybeUpHostAndPorts(), stdx::bind(&ReplicationCoordinatorImpl::_onFreshnessCheckComplete, this, finishEvh)); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18681, nextPhaseEvh.getStatus()); finishEvhGuard.Dismiss(); } void ReplicationCoordinatorImpl::_onFreshnessCheckComplete( const ReplicationExecutor::EventHandle& finishEvh) { // Signal finish event upon early exit. ScopeGuard finishEvhGuard(MakeGuard(&ReplicationExecutor::signalEvent, &_replExecutor, finishEvh)); // Make sure to reset our state on all error exit paths ScopeGuard freshnessCheckerDeleter = MakeObjGuard(_freshnessChecker, &boost::scoped_ptr::reset, static_cast(NULL)); Date_t now(_replExecutor.now()); bool weAreFreshest; bool tied; _freshnessChecker->getResults(&weAreFreshest, &tied); // need to not sleep after last time sleeping, if (tied) { boost::unique_lock lk(_mutex); if ((_thisMembersConfigIndex != 0) && !_sleptLastElection) { long long ms = _replExecutor.nextRandomInt64(1000) + 50; log() << "replSet possible election tie; sleeping a little " << ms << "ms"; _topCoord->setStepDownTime(now + ms); _sleptLastElection = true; return; } _sleptLastElection = false; } if (!weAreFreshest) { log() << "not electing self, we are not freshest"; return; } log() << "replSet info electSelf"; // Secure our vote for ourself first if (!_topCoord->voteForMyself(now)) { return; } _electCmdRunner.reset(new ElectCmdRunner); StatusWith nextPhaseEvh = _electCmdRunner->start( &_replExecutor, _rsConfig, _thisMembersConfigIndex, _topCoord->getMaybeUpHostAndPorts(), stdx::bind(&ReplicationCoordinatorImpl::_onElectCmdRunnerComplete, this, finishEvh)); if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) { return; } fassert(18685, nextPhaseEvh.getStatus()); freshnessCheckerDeleter.Dismiss(); finishEvhGuard.Dismiss(); } void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete( const ReplicationExecutor::EventHandle& finishEvh) { // Signal finish event and cleanup, upon function exit in all cases. ON_BLOCK_EXIT(&ReplicationExecutor::signalEvent, &_replExecutor, finishEvh); ON_BLOCK_EXIT_OBJ(_freshnessChecker, &boost::scoped_ptr::reset, static_cast(NULL)); ON_BLOCK_EXIT_OBJ(_electCmdRunner, &boost::scoped_ptr::reset, static_cast(NULL)); int receivedVotes = _electCmdRunner->getReceivedVotes(); if (receivedVotes < _rsConfig.getMajorityVoteCount()) { log() << "replSet couldn't elect self, only received " << receivedVotes << " votes, but needed at least " << _rsConfig.getMajorityVoteCount(); return; } if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) { log() << "replSet config version changed during our election, ignoring result"; return; } log() << "replSet election succeeded, assuming primary role"; // // TODO: setElectionTime(getNextGlobalOptime()), ask Applier to pause, wait for // applier's signal that it's done flushing ops (signalDrainComplete) // and then _changememberstate to PRIMARY. } } }