/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
namespace {
class LoseElectionGuard {
MONGO_DISALLOW_COPYING(LoseElectionGuard);
public:
LoseElectionGuard(TopologyCoordinator* topCoord,
executor::TaskExecutor* executor,
std::unique_ptr* freshnessChecker,
std::unique_ptr* electCmdRunner,
executor::TaskExecutor::EventHandle* electionFinishedEvent)
: _topCoord(topCoord),
_executor(executor),
_freshnessChecker(freshnessChecker),
_electCmdRunner(electCmdRunner),
_electionFinishedEvent(electionFinishedEvent),
_dismissed(false) {}
~LoseElectionGuard() {
if (_dismissed) {
return;
}
_topCoord->processLoseElection();
_freshnessChecker->reset(NULL);
_electCmdRunner->reset(NULL);
if (_electionFinishedEvent->isValid()) {
_executor->signalEvent(*_electionFinishedEvent);
}
}
void dismiss() {
_dismissed = true;
}
private:
TopologyCoordinator* const _topCoord;
executor::TaskExecutor* const _executor;
std::unique_ptr* const _freshnessChecker;
std::unique_ptr* const _electCmdRunner;
const executor::TaskExecutor::EventHandle* _electionFinishedEvent;
bool _dismissed;
};
} // namespace
void ReplicationCoordinatorImpl::_startElectSelf_inlock() {
invariant(!_freshnessChecker);
invariant(!_electCmdRunner);
switch (_rsConfigState) {
case kConfigSteady:
break;
case kConfigInitiating:
case kConfigReconfiguring:
case kConfigHBReconfiguring:
LOG(2) << "Not standing for election; processing a configuration change";
// Transition out of candidate role.
_topCoord->processLoseElection();
return;
default:
severe() << "Entered replica set election code while in illegal config state "
<< int(_rsConfigState);
fassertFailed(18913);
}
log() << "Standing for election";
const StatusWith finishEvh = _replExecutor->makeEvent();
if (finishEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18680, finishEvh.getStatus());
_electionFinishedEvent = finishEvh.getValue();
LoseElectionGuard lossGuard(_topCoord.get(),
_replExecutor.get(),
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
OpTime lastOpTimeApplied(_getMyLastAppliedOpTime_inlock());
if (lastOpTimeApplied.isNull()) {
log() << "not trying to elect self, "
"do not yet have a complete set of data from any point in time"
" -- lastAppliedOpTime is null";
return;
}
_freshnessChecker.reset(new FreshnessChecker);
StatusWith nextPhaseEvh =
_freshnessChecker->start(_replExecutor.get(),
lastOpTimeApplied.getTimestamp(),
_rsConfig,
_selfIndex,
_topCoord->getMaybeUpHostAndPorts());
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18681, nextPhaseEvh.getStatus());
_replExecutor
->onEvent(nextPhaseEvh.getValue(),
[this](const mongo::executor::TaskExecutor::CallbackArgs&) {
_onFreshnessCheckComplete();
})
.status_with_transitional_ignore();
lossGuard.dismiss();
}
void ReplicationCoordinatorImpl::_onFreshnessCheckComplete() {
stdx::lock_guard lk(_mutex);
invariant(_freshnessChecker);
invariant(!_electCmdRunner);
LoseElectionGuard lossGuard(_topCoord.get(),
_replExecutor.get(),
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
if (_freshnessChecker->isCanceled()) {
LOG(2) << "Election canceled during freshness check phase";
return;
}
const Date_t now(_replExecutor->now());
const FreshnessChecker::ElectionAbortReason abortReason =
_freshnessChecker->shouldAbortElection();
// need to not sleep after last time sleeping,
switch (abortReason) {
case FreshnessChecker::None:
break;
case FreshnessChecker::FreshnessTie:
if ((_selfIndex != 0) && !_sleptLastElection) {
const auto ms = Milliseconds(_nextRandomInt64_inlock(1000) + 50);
const Date_t nextCandidateTime = now + ms;
log() << "possible election tie; sleeping " << ms << " until "
<< dateToISOStringLocal(nextCandidateTime);
_topCoord->setElectionSleepUntil(nextCandidateTime);
_scheduleWorkAt(nextCandidateTime,
[=](const executor::TaskExecutor::CallbackArgs& cbData) {
_recoverFromElectionTie(cbData);
});
_sleptLastElection = true;
return;
}
_sleptLastElection = false;
break;
case FreshnessChecker::FresherNodeFound:
log() << "not electing self, we are not freshest";
return;
case FreshnessChecker::QuorumUnreachable:
log() << "not electing self, we could not contact enough voting members";
return;
default:
log() << "not electing self due to election abort message :"
<< static_cast(abortReason);
return;
}
log() << "running for election"
<< (abortReason == FreshnessChecker::FreshnessTie
? "; slept last election, so running regardless of possible tie"
: "");
// Secure our vote for ourself first
if (!_topCoord->voteForMyself(now)) {
return;
}
_electCmdRunner.reset(new ElectCmdRunner);
StatusWith nextPhaseEvh = _electCmdRunner->start(
_replExecutor.get(), _rsConfig, _selfIndex, _topCoord->getMaybeUpHostAndPorts());
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18685, nextPhaseEvh.getStatus());
_replExecutor
->onEvent(nextPhaseEvh.getValue(),
[=](const executor::TaskExecutor::CallbackArgs&) { _onElectCmdRunnerComplete(); })
.status_with_transitional_ignore();
lossGuard.dismiss();
}
void ReplicationCoordinatorImpl::_onElectCmdRunnerComplete() {
stdx::unique_lock lk(_mutex);
LoseElectionGuard lossGuard(_topCoord.get(),
_replExecutor.get(),
&_freshnessChecker,
&_electCmdRunner,
&_electionFinishedEvent);
invariant(_freshnessChecker);
invariant(_electCmdRunner);
if (_electCmdRunner->isCanceled()) {
LOG(2) << "Election canceled during elect self phase";
return;
}
const int receivedVotes = _electCmdRunner->getReceivedVotes();
if (receivedVotes < _rsConfig.getMajorityVoteCount()) {
log() << "couldn't elect self, only received " << receivedVotes
<< " votes, but needed at least " << _rsConfig.getMajorityVoteCount();
// Suppress ourselves from standing for election again, giving other nodes a chance
// to win their elections.
const auto ms = Milliseconds(_nextRandomInt64_inlock(1000) + 50);
const Date_t now(_replExecutor->now());
const Date_t nextCandidateTime = now + ms;
log() << "waiting until " << nextCandidateTime << " before standing for election again";
_topCoord->setElectionSleepUntil(nextCandidateTime);
_scheduleWorkAt(nextCandidateTime, [=](const executor::TaskExecutor::CallbackArgs& cbData) {
_recoverFromElectionTie(cbData);
});
return;
}
if (_rsConfig.getConfigVersion() != _freshnessChecker->getOriginalConfigVersion()) {
log() << "config version changed during our election, ignoring result";
return;
}
log() << "election succeeded, assuming primary role";
lossGuard.dismiss();
_freshnessChecker.reset(NULL);
_electCmdRunner.reset(NULL);
auto electionFinishedEvent = _electionFinishedEvent;
lk.unlock();
_performPostMemberStateUpdateAction(kActionWinElection);
_replExecutor->signalEvent(electionFinishedEvent);
}
/**
 * Scheduled callback that fires after the randomized post-tie (or post-loss)
 * backoff expires. If the topology coordinator says this node should still
 * stand for election, becomes a candidate again and restarts the election;
 * otherwise just logs the reason and does nothing.
 */
void ReplicationCoordinatorImpl::_recoverFromElectionTie(
    const executor::TaskExecutor::CallbackArgs& cbData) {
    stdx::unique_lock<stdx::mutex> lk(_mutex);

    auto now = _replExecutor->now();
    const auto status = _topCoord->checkShouldStandForElection(now);
    if (!status.isOK()) {
        LOG(2) << "ReplicationCoordinatorImpl::_recoverFromElectionTie -- " << status.reason();
    } else {
        // becomeCandidateIfElectable must succeed here; a failure indicates a
        // logic error, hence the fassert.
        fassert(28817,
                _topCoord->becomeCandidateIfElectable(
                    now, TopologyCoordinator::StartElectionReason::kElectionTimeout));
        _startElectSelf_inlock();
    }
}
} // namespace repl
} // namespace mongo