/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include <memory>
#include "mongo/base/disallow_copying.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/repl/vote_requester.h"
#include "mongo/platform/unordered_set.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
using LockGuard = stdx::lock_guard<stdx::mutex>;
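// RAII guard that concedes the election unless dismiss() is called before it
// goes out of scope: it transitions the topology coordinator out of the
// candidate role, discards the vote requester, and signals the election
// events (including the dry-run event for the dry-run variant below) so any
// waiters are unblocked.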
class ReplicationCoordinatorImpl::LoseElectionGuardV1 {
MONGO_DISALLOW_COPYING(LoseElectionGuardV1);
public:
LoseElectionGuardV1(ReplicationCoordinatorImpl* replCoord) : _replCoord(replCoord) {}
virtual ~LoseElectionGuardV1() {
if (_dismissed) {
return;
}
_replCoord->_topCoord->processLoseElection();
_replCoord->_voteRequester.reset(nullptr);
if (_isDryRun && _replCoord->_electionDryRunFinishedEvent.isValid()) {
_replCoord->_replExecutor.signalEvent(_replCoord->_electionDryRunFinishedEvent);
}
if (_replCoord->_electionFinishedEvent.isValid()) {
_replCoord->_replExecutor.signalEvent(_replCoord->_electionFinishedEvent);
}
}
void dismiss() {
_dismissed = true;
}
protected:
ReplicationCoordinatorImpl* const _replCoord;
bool _isDryRun = false;
bool _dismissed = false;
};
class ReplicationCoordinatorImpl::LoseElectionDryRunGuardV1 : public LoseElectionGuardV1 {
MONGO_DISALLOW_COPYING(LoseElectionDryRunGuardV1);
public:
LoseElectionDryRunGuardV1(ReplicationCoordinatorImpl* replCoord)
: LoseElectionGuardV1(replCoord) {
_isDryRun = true;
}
};
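// A V1 (Raft-style) election proceeds in stages, each scheduled on the
// replication executor:
//   1. _startElectSelfV1 runs a dry-run round of vote requests to check
//      whether this node could win, without bumping the term.
//   2. _onDryRunComplete advances the term, records a vote for ourselves, and
//      schedules a durable write of that vote.
//   3. _writeLastVoteForMyElection persists the vote, then starts the real
//      vote request round via _startVoteRequester.
//   4. _onVoteRequestComplete tallies the responses and, on success, assumes
//      the primary role.
// If any stage bails out early, the LoseElection*GuardV1 on its stack concedes
// the election on destruction.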
void ReplicationCoordinatorImpl::_startElectSelfV1() {
invariant(!_voteRequester);
invariant(!_freshnessChecker);
stdx::unique_lock<stdx::mutex> lk(_mutex);
switch (_rsConfigState) {
case kConfigSteady:
break;
case kConfigInitiating:
case kConfigReconfiguring:
case kConfigHBReconfiguring:
LOG(2) << "Not standing for election; processing a configuration change";
// Transition out of candidate role.
_topCoord->processLoseElection();
return;
default:
severe() << "Entered replica set election code while in illegal config state "
<< int(_rsConfigState);
fassertFailed(28641);
}
auto finishedEvent = _makeEvent();
if (!finishedEvent) {
return;
}
_electionFinishedEvent = finishedEvent;
auto dryRunFinishedEvent = _makeEvent();
if (!dryRunFinishedEvent) {
return;
}
_electionDryRunFinishedEvent = dryRunFinishedEvent;
LoseElectionDryRunGuardV1 lossGuard(this);
invariant(_rsConfig.getMemberAt(_selfIndex).isElectable());
// Note: If we aren't durable, send last applied.
const auto lastOpTime = _isDurableStorageEngine() ? _getMyLastDurableOpTime_inlock()
: _getMyLastAppliedOpTime_inlock();
if (lastOpTime == OpTime()) {
log() << "not trying to elect self, "
"do not yet have a complete set of data from any point in time";
return;
}
log() << "conducting a dry run election to see if we could be elected";
_voteRequester.reset(new VoteRequester);
// This is necessary because the voteRequester may call directly into winning an
// election, if there are no other MaybeUp nodes. Winning an election attempts to lock
// _mutex again.
lk.unlock();
long long term = _topCoord->getTerm();
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh =
_voteRequester->start(&_replExecutor,
_rsConfig,
_selfIndex,
_topCoord->getTerm(),
true, // dry run
lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28685, nextPhaseEvh.getStatus());
_replExecutor.onEvent(nextPhaseEvh.getValue(),
stdx::bind(&ReplicationCoordinatorImpl::_onDryRunComplete, this, term));
lossGuard.dismiss();
}
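// Callback invoked when the dry-run round of vote requests completes. If the
// term is unchanged and a majority of nodes indicated they would vote for us,
// bump the term, vote for ourselves, and schedule the durable write of that
// vote before the real election round begins.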
void ReplicationCoordinatorImpl::_onDryRunComplete(long long originalTerm) {
invariant(_voteRequester);
LoseElectionDryRunGuardV1 lossGuard(this);
LockGuard lk(_topoMutex);
if (_topCoord->getTerm() != originalTerm) {
log() << "not running for primary, we have been superceded already";
return;
}
const VoteRequester::Result endResult = _voteRequester->getResult();
if (endResult == VoteRequester::Result::kInsufficientVotes) {
log() << "not running for primary, we received insufficient votes";
return;
} else if (endResult == VoteRequester::Result::kStaleTerm) {
log() << "not running for primary, we have been superceded already";
return;
} else if (endResult != VoteRequester::Result::kSuccessfullyElected) {
log() << "not running for primary, we received an unexpected problem";
return;
}
log() << "dry election run succeeded, running for election";
// Stepdown is impossible from this term update.
TopologyCoordinator::UpdateTermResult updateTermResult;
_updateTerm_incallback(originalTerm + 1, &updateTermResult);
invariant(updateTermResult == TopologyCoordinator::UpdateTermResult::kUpdatedTerm);
// Secure our vote for ourself first
_topCoord->voteForMyselfV1();
// Store the vote in persistent storage.
LastVote lastVote;
lastVote.setTerm(originalTerm + 1);
lastVote.setCandidateIndex(_selfIndex);
auto cbStatus = _replExecutor.scheduleDBWork(
[this, lastVote](const ReplicationExecutor::CallbackArgs& cbData) {
_writeLastVoteForMyElection(lastVote, cbData);
});
if (cbStatus.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(34421, cbStatus.getStatus());
lossGuard.dismiss();
}
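// Persists this node's vote for itself in the local last-vote document. Runs
// as DB work on the executor; on success it kicks off the real vote request
// round and signals that the dry-run phase is finished.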
void ReplicationCoordinatorImpl::_writeLastVoteForMyElection(
LastVote lastVote, const ReplicationExecutor::CallbackArgs& cbData) {
invariant(_voteRequester);
LoseElectionDryRunGuardV1 lossGuard(this);
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
invariant(cbData.txn);
Status status = _externalState->storeLocalLastVoteDocument(cbData.txn, lastVote);
if (!status.isOK()) {
error() << "failed to store LastVote document when voting for myself: " << status;
return;
}
_startVoteRequester(lastVote.getTerm());
_replExecutor.signalEvent(_electionDryRunFinishedEvent);
lossGuard.dismiss();
}
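// Sends the real (non-dry-run) vote requests to the other members and
// schedules _onVoteRequestComplete to run once the round finishes.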
void ReplicationCoordinatorImpl::_startVoteRequester(long long newTerm) {
invariant(_voteRequester);
LoseElectionGuardV1 lossGuard(this);
LockGuard lk(_topoMutex);
const auto lastOpTime =
_isDurableStorageEngine() ? getMyLastDurableOpTime() : getMyLastAppliedOpTime();
_voteRequester.reset(new VoteRequester);
StatusWith<ReplicationExecutor::EventHandle> nextPhaseEvh = _voteRequester->start(
&_replExecutor, _rsConfig, _selfIndex, _topCoord->getTerm(), false /* not a dry run */, lastOpTime);
if (nextPhaseEvh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28643, nextPhaseEvh.getStatus());
_replExecutor.onEvent(
nextPhaseEvh.getValue(),
stdx::bind(&ReplicationCoordinatorImpl::_onVoteRequestComplete, this, newTerm));
lossGuard.dismiss();
}
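// Callback invoked when the real vote request round completes. If we won and
// the term has not moved on, mark the responders as up (so we do not
// immediately step down for lack of a visible majority), block commit-point
// advancement until we finish draining, and assume the primary role.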
void ReplicationCoordinatorImpl::_onVoteRequestComplete(long long originalTerm) {
invariant(_voteRequester);
LoseElectionGuardV1 lossGuard(this);
LockGuard lk(_topoMutex);
if (_topCoord->getTerm() != originalTerm) {
log() << "not becoming primary, we have been superceded already";
return;
}
const VoteRequester::Result endResult = _voteRequester->getResult();
switch (endResult) {
case VoteRequester::Result::kInsufficientVotes:
log() << "not becoming primary, we received insufficient votes";
return;
case VoteRequester::Result::kStaleTerm:
log() << "not becoming primary, we have been superceded already";
return;
case VoteRequester::Result::kSuccessfullyElected:
log() << "election succeeded, assuming primary role in term " << _topCoord->getTerm();
break;
}
{
// Mark all nodes that responded to our vote request as up to avoid immediately
// relinquishing primary.
stdx::lock_guard<stdx::mutex> lk(_mutex);
Date_t now = _replExecutor.now();
const unordered_set<HostAndPort> liveNodes = _voteRequester->getResponders();
for (auto& nodeInfo : _slaveInfo) {
if (liveNodes.count(nodeInfo.hostAndPort)) {
nodeInfo.down = false;
nodeInfo.lastUpdate = now;
}
}
}
// Prevent last committed optime from updating until we finish draining.
_setFirstOpTimeOfMyTerm(
OpTime(Timestamp(std::numeric_limits<int>::max(), 0), std::numeric_limits<int>::max()));
_performPostMemberStateUpdateAction(kActionWinElection);
_voteRequester.reset(nullptr);
_replExecutor.signalEvent(_electionFinishedEvent);
lossGuard.dismiss();
}
} // namespace repl
} // namespace mongo