/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
#include <algorithm>
#include <limits>
#include "mongo/base/status.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/global_timestamp.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/operation_context_noop.h"
#include "mongo/db/repl/check_quorum_for_config_change.h"
#include "mongo/db/repl/elect_cmd_runner.h"
#include "mongo/db/repl/freshness_checker.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/replica_set_config_checks.h"
#include "mongo/db/repl/replication_executor.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/server_options.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
#include "mongo/util/time_support.h"
#include "mongo/util/timer.h"
namespace mongo {
namespace repl {
namespace {
typedef StatusWith<ReplicationExecutor::CallbackHandle> CBHStatus;

/**
 * Ensures that "lk" owns its mutex and then invokes "fn" while holding it.
 * Used so callers may pass a lock that an earlier step may have released.
 */
void lockAndCall(boost::unique_lock<boost::mutex>* lk, const stdx::function<void ()>& fn) {
    if (!lk->owns_lock()) {
        lk->lock();
    }
    fn();
}
/**
* Implements the force-reconfig behavior of incrementing config version by a large random
* number.
*/
/**
 * Implements the force-reconfig behavior of incrementing config version by a large random
 * number.
 *
 * Copies "config", replacing its numeric version field with version + 10000 + rand,
 * where rand is in [0, 100000). The fixed 10000 bump makes a forced config win over
 * concurrent ordinary reconfigs; the random part makes ties between two simultaneous
 * force-reconfigs unlikely.
 */
BSONObj incrementConfigVersionByRandom(BSONObj config) {
    BSONObjBuilder builder;
    for (BSONObjIterator iter(config); iter.more(); iter.next()) {
        BSONElement elem = *iter;
        if (elem.fieldNameStringData() == ReplicaSetConfig::kVersionFieldName &&
                elem.isNumber()) {
            boost::scoped_ptr<SecureRandom> generator(SecureRandom::create());
            const int random = std::abs(static_cast<int>(generator->nextInt64()) % 100000);
            builder.appendIntOrLL(ReplicaSetConfig::kVersionFieldName,
                                  elem.numberLong() + 10000 + random);
        }
        else {
            builder.append(elem);
        }
    }
    return builder.obj();
}
} //namespace
struct ReplicationCoordinatorImpl::WaiterInfo {
    /**
     * Constructor takes the list of waiters and enqueues itself on the list, removing itself
     * in the destructor.  All pointer members are non-owning and must outlive this object;
     * the caller is expected to hold the coordinator's mutex while constructing/destroying.
     */
    WaiterInfo(std::vector<WaiterInfo*>* _list,
               unsigned int _opID,
               const Timestamp* _opTime,
               const WriteConcernOptions* _writeConcern,
               boost::condition_variable* _condVar) : list(_list),
                                                      master(true),
                                                      opID(_opID),
                                                      opTime(_opTime),
                                                      writeConcern(_writeConcern),
                                                      condVar(_condVar) {
        list->push_back(this);
    }

    ~WaiterInfo() {
        // Erase-remove: drop this entry from the shared waiter list on destruction.
        list->erase(std::remove(list->begin(), list->end(), this), list->end());
    }

    std::vector<WaiterInfo*>* list;
    bool master; // Set to false to indicate that stepDown was called while waiting
    const unsigned int opID;
    const Timestamp* opTime;
    const WriteConcernOptions* writeConcern;
    boost::condition_variable* condVar;
};
namespace {
/**
 * Maps startup settings to the coordinator's replication mode.  Replica-set
 * settings take precedence over master/slave flags; with neither, replication
 * is disabled.
 */
ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& settings) {
    if (settings.usingReplSets()) {
        return ReplicationCoordinator::modeReplSet;
    }
    const bool isMasterSlave = settings.master || settings.slave;
    return isMasterSlave ? ReplicationCoordinator::modeMasterSlave
                         : ReplicationCoordinator::modeNone;
}
} // namespace
/**
 * Constructs the coordinator.  Takes ownership of "externalState", "network" and
 * "topCoord" (held via smart-pointer members declared in the header).  When
 * replication is enabled this also seeds a positive rollback id (_rbid) and
 * inserts the self entry into _slaveInfo.
 */
ReplicationCoordinatorImpl::ReplicationCoordinatorImpl(
        const ReplSettings& settings,
        ReplicationCoordinatorExternalState* externalState,
        ReplicationExecutor::NetworkInterface* network,
        TopologyCoordinator* topCoord,
        int64_t prngSeed) :
    _settings(settings),
    _replMode(getReplicationModeFromSettings(settings)),
    _topCoord(topCoord),
    _replExecutor(network, prngSeed),
    _externalState(externalState),
    _inShutdown(false),
    _memberState(MemberState::RS_STARTUP),
    _isWaitingForDrainToComplete(false),
    _rsConfigState(kConfigPreStart),
    _selfIndex(-1),
    _sleptLastElection(false),
    _canAcceptNonLocalWrites(!(settings.usingReplSets() || settings.slave)),
    _canServeNonLocalReads(0U) {
    if (!isReplEnabled()) {
        return;
    }
    boost::scoped_ptr<SecureRandom> rbidGenerator(SecureRandom::create());
    _rbid = static_cast<int>(rbidGenerator->nextInt64());
    if (_rbid < 0) {
        // Ensure _rbid is always positive
        _rbid = -_rbid;
    }
    // Make sure there is always an entry in _slaveInfo for ourself.
    SlaveInfo selfInfo;
    selfInfo.self = true;
    _slaveInfo.push_back(selfInfo);
}

ReplicationCoordinatorImpl::~ReplicationCoordinatorImpl() {}
void ReplicationCoordinatorImpl::waitForStartUpComplete() {
boost::unique_lock lk(_mutex);
while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
_rsConfigStateChange.wait(lk);
}
}
// Test-only accessor: returns a copy of the current replica set config under the mutex.
ReplicaSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
    boost::lock_guard<boost::mutex> lk(_mutex);
    return _rsConfig;
}
/**
 * Begins loading the locally stored replica set config.  Returns true if loading
 * is already complete (no config document found), false if completion was deferred
 * to the _finishLoadLocalConfig callback on the executor.  fasserts if a stored
 * config exists but does not parse.
 */
bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
    StatusWith<BSONObj> cfg = _externalState->loadLocalConfigDocument(txn);
    if (!cfg.isOK()) {
        log() << "Did not find local replica set configuration document at startup; " <<
            cfg.getStatus();
        return true;
    }
    ReplicaSetConfig localConfig;
    Status status = localConfig.initialize(cfg.getValue());
    if (!status.isOK()) {
        error() << "Locally stored replica set configuration does not parse; See "
                "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config "
                "for information on how to recover from this. Got \"" <<
                status << "\" while parsing " << cfg.getValue();
        fassertFailedNoTrace(28545);
    }
    StatusWith<Timestamp> lastOpTimeStatus = _externalState->loadLastOpTime(txn);
    // Use a callback here, because _finishLoadLocalConfig calls isSelf() which requires
    // that the server's networking layer be up and running and accepting connections, which
    // doesn't happen until startReplication finishes.
    _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_finishLoadLocalConfig,
                   this,
                   stdx::placeholders::_1,
                   localConfig,
                   lastOpTimeStatus));
    return false;
}
/**
 * Executor callback that finishes local-config startup.  Validates "localConfig"
 * for this node; on NodeNotFound/DuplicateKey or a set-name mismatch it degrades
 * to index -1 (wait for reconfig or a remote heartbeat) rather than aborting, while
 * other validation failures fassert.  Loads the last applied optime (skipped for
 * arbiters), installs the config, and starts the external-state threads.
 */
void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
        const ReplicationExecutor::CallbackData& cbData,
        const ReplicaSetConfig& localConfig,
        const StatusWith<Timestamp>& lastOpTimeStatus) {
    if (!cbData.status.isOK()) {
        LOG(1) << "Loading local replica set configuration failed due to " << cbData.status;
        return;
    }
    StatusWith<int> myIndex = validateConfigForStartUp(_externalState.get(),
                                                       _rsConfig,
                                                       localConfig);
    if (!myIndex.isOK()) {
        if (myIndex.getStatus() == ErrorCodes::NodeNotFound ||
                myIndex.getStatus() == ErrorCodes::DuplicateKey) {
            warning() << "Locally stored replica set configuration does not have a valid entry "
                    "for the current node; waiting for reconfig or remote heartbeat; Got \"" <<
                    myIndex.getStatus() << "\" while validating " << localConfig.toBSON();
            myIndex = StatusWith<int>(-1);
        }
        else {
            error() << "Locally stored replica set configuration is invalid; See "
                    "http://www.mongodb.org/dochub/core/recover-replica-set-from-invalid-config"
                    " for information on how to recover from this. Got \"" <<
                    myIndex.getStatus() << "\" while validating " << localConfig.toBSON();
            fassertFailedNoTrace(28544);
        }
    }
    if (localConfig.getReplSetName() != _settings.ourSetName()) {
        warning() << "Local replica set configuration document reports set name of " <<
                localConfig.getReplSetName() << ", but command line reports " <<
                _settings.ourSetName() << "; waiting for reconfig or remote heartbeat";
        myIndex = StatusWith<int>(-1);
    }
    // Do not check optime, if this node is an arbiter.
    bool isArbiter = myIndex.getValue() != -1 &&
        localConfig.getMemberAt(myIndex.getValue()).isArbiter();
    Timestamp lastOpTime(0, 0);
    if (!isArbiter) {
        if (!lastOpTimeStatus.isOK()) {
            warning() << "Failed to load timestamp of most recently applied operation; " <<
                lastOpTimeStatus.getStatus();
        }
        else {
            lastOpTime = lastOpTimeStatus.getValue();
        }
    }
    boost::unique_lock<boost::mutex> lk(_mutex);
    invariant(_rsConfigState == kConfigStartingUp);
    const PostMemberStateUpdateAction action =
        _setCurrentRSConfig_inlock(localConfig, myIndex.getValue());
    _setMyLastOptime_inlock(&lk, lastOpTime, false);
    _externalState->setGlobalTimestamp(lastOpTime);
    // _setMyLastOptime_inlock may have released the lock to forward slave progress.
    if (lk.owns_lock()) {
        lk.unlock();
    }
    _performPostMemberStateUpdateAction(action);
    _externalState->startThreads();
}
/**
 * Starts the replication subsystem: records our RID, spins up the executor thread
 * for replica sets (or delegates to the master/slave machinery), and kicks off
 * loading of the locally stored config.
 */
void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) {
    if (!isReplEnabled()) {
        boost::lock_guard<boost::mutex> lk(_mutex);
        _setConfigState_inlock(kConfigReplicationDisabled);
        return;
    }
    {
        OID rid = _externalState->ensureMe(txn);
        boost::lock_guard<boost::mutex> lk(_mutex);
        fassert(18822, !_inShutdown);
        _setConfigState_inlock(kConfigStartingUp);
        _myRID = rid;
        _slaveInfo[_getMyIndexInSlaveInfo_inlock()].rid = rid;
    }
    if (!_settings.usingReplSets()) {
        // Must be Master/Slave
        invariant(_settings.master || _settings.slave);
        _externalState->startMasterSlave(txn);
        return;
    }
    _topCoordDriverThread.reset(new boost::thread(stdx::bind(&ReplicationExecutor::run,
                                                             &_replExecutor)));
    // If loading completed synchronously (no stored config), set the state here;
    // otherwise _finishLoadLocalConfig will set the config state when it runs.
    bool doneLoadingConfig = _startLoadLocalConfig(txn);
    if (doneLoadingConfig) {
        boost::lock_guard<boost::mutex> lk(_mutex);
        invariant(!_rsConfig.isInitialized());
        _setConfigState_inlock(kConfigUninitialized);
    }
}
void ReplicationCoordinatorImpl::shutdown() {
// Shutdown must:
// * prevent new threads from blocking in awaitReplication
// * wake up all existing threads blocking in awaitReplication
// * tell the ReplicationExecutor to shut down
// * wait for the thread running the ReplicationExecutor to finish
if (!_settings.usingReplSets()) {
return;
}
boost::thread* hbReconfigThread = NULL;
{
boost::lock_guard lk(_mutex);
fassert(28533, !_inShutdown);
_inShutdown = true;
if (_rsConfigState == kConfigPreStart) {
warning() << "ReplicationCoordinatorImpl::shutdown() called before "
"startReplication() finished. Shutting down without cleaning up the "
"replication system";
return;
}
fassert(18823, _rsConfigState != kConfigStartingUp);
for (std::vector::iterator it = _replicationWaiterList.begin();
it != _replicationWaiterList.end(); ++it) {
WaiterInfo* waiter = *it;
waiter->condVar->notify_all();
}
// Since we've set _inShutdown we know that _heartbeatReconfigThread will not be
// changed again, which makes it safe to store the pointer to it to be accessed outside
// of _mutex.
hbReconfigThread = _heartbeatReconfigThread.get();
}
if (hbReconfigThread) {
hbReconfigThread->join();
}
_replExecutor.shutdown();
_topCoordDriverThread->join(); // must happen outside _mutex
_externalState->shutdown();
}
/// Returns the settings this coordinator was constructed with.
const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
    return _settings;
}

ReplicationCoordinator::Mode ReplicationCoordinatorImpl::getReplicationMode() const {
    return _getReplicationMode_inlock();
}

// NOTE: _replMode is assigned once in the constructor initializer list and never
// mutated in this file, which is presumably why getReplicationMode() calls this
// without taking _mutex.
ReplicationCoordinator::Mode ReplicationCoordinatorImpl::_getReplicationMode_inlock() const {
    return _replMode;
}

MemberState ReplicationCoordinatorImpl::getMemberState() const {
    boost::lock_guard<boost::mutex> lk(_mutex);
    return _getMemberState_inlock();
}

MemberState ReplicationCoordinatorImpl::_getMemberState_inlock() const {
    return _memberState;
}
/**
 * Returns this member's configured slave delay.  Requires an initialized config;
 * uasserts if this node is not a member of the current configuration.
 */
Seconds ReplicationCoordinatorImpl::getSlaveDelaySecs() const {
    boost::lock_guard<boost::mutex> lk(_mutex);
    invariant(_rsConfig.isInitialized());
    uassert(28524,
            "Node not a member of the current set configuration",
            _selfIndex != -1);
    return _rsConfig.getMemberAt(_selfIndex).getSlaveDelay();
}
// Clears the topology coordinator's sync source blacklist.  The topology
// coordinator is only touched from the replication executor's thread, so this
// schedules the actual work there and blocks until it has run.
void ReplicationCoordinatorImpl::clearSyncSourceBlacklist() {
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish,
this,
stdx::placeholders::_1));
// If the executor is shutting down there is nothing left to clear.
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18907, cbh.getStatus());
// Wait for the scheduled callback to complete before returning.
_replExecutor.wait(cbh.getValue());
}
// Executor-thread half of clearSyncSourceBlacklist(); no-op if the callback was canceled.
void ReplicationCoordinatorImpl::_clearSyncSourceBlacklist_finish(
const ReplicationExecutor::CallbackData& cbData) {
if (cbData.status == ErrorCodes::CallbackCanceled)
return;
_topCoord->clearSyncSourceBlacklist();
}
bool ReplicationCoordinatorImpl::setFollowerMode(const MemberState& newState) {
StatusWith finishedSettingFollowerState =
_replExecutor.makeEvent();
if (finishedSettingFollowerState.getStatus() == ErrorCodes::ShutdownInProgress) {
return false;
}
fassert(18812, finishedSettingFollowerState.getStatus());
bool success = false;
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
this,
stdx::placeholders::_1,
newState,
finishedSettingFollowerState.getValue(),
&success));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return false;
}
fassert(18699, cbh.getStatus());
_replExecutor.waitForEvent(finishedSettingFollowerState.getValue());
return success;
}
/**
 * Executor-thread implementation of setFollowerMode().  Fails (without signaling
 * *success) only by rescheduling itself; terminal outcomes set *success and signal
 * "finishedSettingFollowerMode".  Refuses the transition when we are currently
 * leader; cancels an in-flight election when we are a candidate and retries after
 * the election finishes.
 */
void ReplicationCoordinatorImpl::_setFollowerModeFinish(
        const ReplicationExecutor::CallbackData& cbData,
        const MemberState& newState,
        const ReplicationExecutor::EventHandle& finishedSettingFollowerMode,
        bool* success) {
    if (cbData.status == ErrorCodes::CallbackCanceled) {
        return;
    }
    if (newState == _topCoord->getMemberState()) {
        // Already in the requested state; nothing to do.
        *success = true;
        _replExecutor.signalEvent(finishedSettingFollowerMode);
        return;
    }
    if (_topCoord->getRole() == TopologyCoordinator::Role::leader) {
        // Cannot use this pathway to step down from primary.
        *success = false;
        _replExecutor.signalEvent(finishedSettingFollowerMode);
        return;
    }
    if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
        // We are a candidate, which means _topCoord believes us to be in state RS_SECONDARY,
        // and we know that newState != RS_SECONDARY because we would have returned early,
        // above if the old and new state were equal.  So, cancel the running election and
        // try again to finish setting the follower mode.
        invariant(_freshnessChecker);
        _freshnessChecker->cancel(&_replExecutor);
        if (_electCmdRunner) {
            _electCmdRunner->cancel(&_replExecutor);
        }
        _replExecutor.onEvent(
            _electionFinishedEvent,
            stdx::bind(&ReplicationCoordinatorImpl::_setFollowerModeFinish,
                       this,
                       stdx::placeholders::_1,
                       newState,
                       finishedSettingFollowerMode,
                       success));
        return;
    }
    boost::unique_lock<boost::mutex> lk(_mutex);
    _topCoord->setFollowerMode(newState.s);
    const PostMemberStateUpdateAction action =
        _updateMemberStateFromTopologyCoordinator_inlock();
    *success = true;
    _replExecutor.signalEvent(finishedSettingFollowerMode);
    // The post-update action must run without holding _mutex.
    lk.unlock();
    _performPostMemberStateUpdateAction(action);
}
bool ReplicationCoordinatorImpl::isWaitingForApplierToDrain() {
boost::lock_guard lk(_mutex);
return _isWaitingForDrainToComplete;
}
void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
    // This logic is a little complicated in order to avoid acquiring the global exclusive lock
    // unnecessarily.  This is important because the applier may call signalDrainComplete()
    // whenever it wants, not only when the ReplicationCoordinator is expecting it.
    //
    // The steps are:
    // 1.) Check to see if we're waiting for this signal.  If not, return early.
    // 2.) Otherwise, release the mutex while acquiring the global exclusive lock,
    //     since that might take a while (NB there's a deadlock cycle otherwise, too).
    // 3.) Re-check to see if we've somehow left drain mode.  If we have not, clear
    //     _isWaitingForDrainToComplete, set the flag allowing non-local database writes and
    //     drop the mutex.  At this point, no writes can occur from other threads, due to the
    //     global exclusive lock.
    // 4.) Drop all temp collections.
    // 5.) Drop the global exclusive lock.
    //
    // Because replicatable writes are forbidden while in drain mode, and we don't exit drain
    // mode until we have the global exclusive lock, which forbids all other threads from making
    // writes, we know that from the time that _isWaitingForDrainToComplete is set in
    // _performPostMemberStateUpdateAction(kActionWinElection) until this method returns, no
    // external writes will be processed.  This is important so that a new temp collection isn't
    // introduced on the new primary before we drop all the temp collections.
    boost::unique_lock<boost::mutex> lk(_mutex);
    if (!_isWaitingForDrainToComplete) {
        return;
    }
    lk.unlock();
    ScopedTransaction transaction(txn, MODE_X);
    Lock::GlobalWrite globalWriteLock(txn->lockState());
    lk.lock();
    // Re-check under the mutex: another thread may have exited drain mode while
    // we were waiting for the global lock.
    if (!_isWaitingForDrainToComplete) {
        return;
    }
    _isWaitingForDrainToComplete = false;
    _canAcceptNonLocalWrites = true;
    lk.unlock();
    _externalState->dropAllTempCollections(txn);
    log() << "transition to primary complete; database writes are now permitted" << rsLog;
}
// Forwards our replication progress upstream via the external state.
void ReplicationCoordinatorImpl::signalUpstreamUpdater() {
_externalState->forwardSlaveProgress();
}
/// Returns the _slaveInfo entry with the given member id, or NULL if none matches.
/// Caller must hold _mutex.
ReplicationCoordinatorImpl::SlaveInfo*
ReplicationCoordinatorImpl::_findSlaveInfoByMemberID_inlock(int memberId) {
    // Linear scan; _slaveInfo has one entry per set member, so it is small.
    for (size_t i = 0; i < _slaveInfo.size(); ++i) {
        if (_slaveInfo[i].memberId == memberId) {
            return &_slaveInfo[i];
        }
    }
    return NULL;
}

/// Returns the _slaveInfo entry with the given replication id, or NULL if none matches.
/// Caller must hold _mutex.
ReplicationCoordinatorImpl::SlaveInfo*
ReplicationCoordinatorImpl::_findSlaveInfoByRID_inlock(const OID& rid) {
    for (size_t i = 0; i < _slaveInfo.size(); ++i) {
        if (_slaveInfo[i].rid == rid) {
            return &_slaveInfo[i];
        }
    }
    return NULL;
}
// Appends a new slave-progress entry; master/slave mode only.  Caller holds _mutex.
void ReplicationCoordinatorImpl::_addSlaveInfo_inlock(const SlaveInfo& slaveInfo) {
invariant(_getReplicationMode_inlock() == modeMasterSlave);
_slaveInfo.push_back(slaveInfo);
// Wake up any threads waiting for replication that now have their replication
// check satisfied
_wakeReadyWaiters_inlock();
}
// Records a new optime for "slaveInfo" and wakes waiters.  Caller holds _mutex.
// NOTE(review): no monotonicity check here — callers are expected to enforce
// "only move forward" where required (see setLastOptimeForSlave/_setLastOptime_inlock).
void ReplicationCoordinatorImpl::_updateSlaveInfoOptime_inlock(SlaveInfo* slaveInfo,
Timestamp ts) {
slaveInfo->opTime = ts;
// Wake up any threads waiting for replication that now have their replication
// check satisfied
_wakeReadyWaiters_inlock();
}
// Rebuilds _slaveInfo to match the current _rsConfig, carrying over progress data
// from the previous entries where the member can be matched.  Caller holds _mutex.
void ReplicationCoordinatorImpl::_updateSlaveInfoFromConfig_inlock() {
invariant(_settings.usingReplSets());
SlaveInfoVector oldSlaveInfos;
_slaveInfo.swap(oldSlaveInfos);
if (_selfIndex == -1) {
// If we aren't in the config then the only data we care about is for ourself
for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin();
it != oldSlaveInfos.end(); ++it) {
if (it->self) {
SlaveInfo slaveInfo = *it;
// Mark ourself as not belonging to any configured member.
slaveInfo.memberId = -1;
_slaveInfo.push_back(slaveInfo);
return;
}
}
invariant(false); // There should always have been an entry for ourself
}
for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
const MemberConfig& memberConfig = _rsConfig.getMemberAt(i);
int memberId = memberConfig.getId();
const HostAndPort& memberHostAndPort = memberConfig.getHostAndPort();
SlaveInfo slaveInfo;
// Check if the node existed with the same member ID and hostname in the old data
for (SlaveInfoVector::const_iterator it = oldSlaveInfos.begin();
it != oldSlaveInfos.end(); ++it) {
if ((it->memberId == memberId && it->hostAndPort == memberHostAndPort)
|| (i == _selfIndex && it->self)) {
slaveInfo = *it;
}
}
// Make sure you have the most up-to-date info for member ID and hostAndPort.
slaveInfo.memberId = memberId;
slaveInfo.hostAndPort = memberHostAndPort;
_slaveInfo.push_back(slaveInfo);
}
// The rebuilt list must have exactly one entry per configured member.
invariant(static_cast(_slaveInfo.size()) == _rsConfig.getNumMembers());
}
/// Returns the index of our own entry in _slaveInfo.  Caller must hold _mutex.
size_t ReplicationCoordinatorImpl::_getMyIndexInSlaveInfo_inlock() const {
    if (_getReplicationMode_inlock() == modeMasterSlave) {
        // Self data always lives in the first entry in _slaveInfo for master/slave
        return 0;
    }
    invariant(_settings.usingReplSets());
    if (_selfIndex == -1) {
        // Not in the current config; _slaveInfo then contains only our own entry.
        invariant(_slaveInfo.size() == 1);
        return 0;
    }
    // In the config: our slot mirrors our index in the config's member list.
    return _selfIndex;
}
/**
 * Records the progress of a master/slave-mode slave identified by "rid".
 * Creates the tracking entry on first contact; thereafter only moves the
 * recorded optime forward.  masserts if this node is not in master/slave mode.
 */
Status ReplicationCoordinatorImpl::setLastOptimeForSlave(const OID& rid,
                                                         const Timestamp& ts) {
    boost::unique_lock<boost::mutex> lock(_mutex);
    massert(28576,
            "Received an old style replication progress update, which is only used for Master/"
                "Slave replication now, but this node is not using Master/Slave replication. "
                "This is likely caused by an old (pre-2.6) member syncing from this node.",
            _getReplicationMode_inlock() == modeMasterSlave);
    SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(rid);
    if (slaveInfo) {
        if (slaveInfo->opTime < ts) {
            _updateSlaveInfoOptime_inlock(slaveInfo, ts);
        }
    }
    else {
        SlaveInfo newSlaveInfo;
        newSlaveInfo.rid = rid;
        newSlaveInfo.opTime = ts;
        _addSlaveInfo_inlock(newSlaveInfo);
    }
    return Status::OK();
}
// Forwards "msg" to the topology coordinator on the executor thread and blocks
// until it has been recorded, so subsequent heartbeats carry the new message.
void ReplicationCoordinatorImpl::setMyHeartbeatMessage(const std::string& msg) {
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&TopologyCoordinator::setMyHeartbeatMessage,
_topCoord.get(),
_replExecutor.now(),
msg));
// Nothing to record if the executor is already shutting down.
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(28540, cbh.getStatus());
_replExecutor.wait(cbh.getValue());
}
void ReplicationCoordinatorImpl::setMyLastOptime(const Timestamp& ts) {
boost::unique_lock lock(_mutex);
_setMyLastOptime_inlock(&lock, ts, false);
}
void ReplicationCoordinatorImpl::resetMyLastOptime() {
boost::unique_lock lock(_mutex);
_setMyLastOptime_inlock(&lock, Timestamp(), true);
}
void ReplicationCoordinatorImpl::_setMyLastOptime_inlock(
boost::unique_lock* lock, const Timestamp& ts, bool isRollbackAllowed) {
invariant(lock->owns_lock());
SlaveInfo* mySlaveInfo = &_slaveInfo[_getMyIndexInSlaveInfo_inlock()];
invariant(isRollbackAllowed || mySlaveInfo->opTime <= ts);
_updateSlaveInfoOptime_inlock(mySlaveInfo, ts);
if (_getReplicationMode_inlock() != modeReplSet) {
return;
}
if (_getMemberState_inlock().primary()) {
return;
}
lock->unlock();
_externalState->forwardSlaveProgress(); // Must do this outside _mutex
}
/// Returns our own last applied optime.
Timestamp ReplicationCoordinatorImpl::getMyLastOptime() const {
    boost::lock_guard<boost::mutex> lock(_mutex);
    return _getMyLastOptime_inlock();
}

Timestamp ReplicationCoordinatorImpl::_getMyLastOptime_inlock() const {
    return _slaveInfo[_getMyIndexInSlaveInfo_inlock()].opTime;
}
/// Test-only entry point that injects a position update as if received over the wire.
Status ReplicationCoordinatorImpl::setLastOptime_forTest(long long cfgVer,
                                                         long long memberId,
                                                         const Timestamp& ts) {
    boost::lock_guard<boost::mutex> lock(_mutex);
    invariant(_getReplicationMode_inlock() == modeReplSet);
    const UpdatePositionArgs::UpdateInfo update(OID(), ts, cfgVer, memberId);
    long long configVersion;
    return _setLastOptime_inlock(update, &configVersion);
}
// Applies a replSetUpdatePosition entry to our tracking of a remote member's
// progress.  Caller holds _mutex.  On a config-version mismatch, writes our
// version into *configVersion so the caller can report it to the sender.
Status ReplicationCoordinatorImpl::_setLastOptime_inlock(
const UpdatePositionArgs::UpdateInfo& args, long long* configVersion) {
if (_selfIndex == -1) {
// Ignore updates when we're in state REMOVED
return Status(ErrorCodes::NotMasterOrSecondaryCode,
"Received replSetUpdatePosition command but we are in state REMOVED");
}
invariant(_getReplicationMode_inlock() == modeReplSet);
if (args.memberId < 0) {
std::string errmsg = str::stream()
<< "Received replSetUpdatePosition for node with memberId "
<< args.memberId << " which is negative and therefore invalid";
LOG(1) << errmsg;
return Status(ErrorCodes::NodeNotFound, errmsg);
}
if (args.rid == _getMyRID_inlock() ||
args.memberId == _rsConfig.getMemberAt(_selfIndex).getId()) {
// Do not let remote nodes tell us what our optime is.
return Status::OK();
}
LOG(2) << "received notification that node with memberID " << args.memberId <<
" in config with version " << args.cfgver << " has reached optime: " << args.ts;
SlaveInfo* slaveInfo = NULL;
// Reject updates built against a different config version than ours.
if (args.cfgver != _rsConfig.getConfigVersion()) {
std::string errmsg = str::stream()
<< "Received replSetUpdatePosition for node with memberId "
<< args.memberId << " whose config version of " << args.cfgver
<< " doesn't match our config version of "
<< _rsConfig.getConfigVersion();
LOG(1) << errmsg;
*configVersion = _rsConfig.getConfigVersion();
return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
}
slaveInfo = _findSlaveInfoByMemberID_inlock(args.memberId);
if (!slaveInfo) {
invariant(!_rsConfig.findMemberByID(args.memberId));
std::string errmsg = str::stream()
<< "Received replSetUpdatePosition for node with memberId "
<< args.memberId << " which doesn't exist in our config";
LOG(1) << errmsg;
return Status(ErrorCodes::NodeNotFound, errmsg);
}
invariant(args.memberId == slaveInfo->memberId);
LOG(3) << "Node with memberID " << args.memberId << " currently has optime " <<
slaveInfo->opTime << "; updating to " << args.ts;
// Only update remote optimes if they increase.
if (slaveInfo->opTime < args.ts) {
_updateSlaveInfoOptime_inlock(slaveInfo, args.ts);
}
// Remote progress may advance the commit point.
_updateLastCommittedOpTime_inlock();
return Status::OK();
}
void ReplicationCoordinatorImpl::interrupt(unsigned opId) {
boost::lock_guard lk(_mutex);
for (std::vector::iterator it = _replicationWaiterList.begin();
it != _replicationWaiterList.end(); ++it) {
WaiterInfo* info = *it;
if (info->opID == opId) {
info->condVar->notify_all();
return;
}
}
_replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
this,
stdx::placeholders::_1));
}
void ReplicationCoordinatorImpl::interruptAll() {
boost::lock_guard lk(_mutex);
for (std::vector::iterator it = _replicationWaiterList.begin();
it != _replicationWaiterList.end(); ++it) {
WaiterInfo* info = *it;
info->condVar->notify_all();
}
_replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
this,
stdx::placeholders::_1));
}
bool ReplicationCoordinatorImpl::_doneWaitingForReplication_inlock(
const Timestamp& opTime, const WriteConcernOptions& writeConcern) {
Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
if (!status.isOK()) {
return true;
}
if (!writeConcern.wMode.empty()) {
StringData patternName;
if (writeConcern.wMode == "majority") {
patternName = ReplicaSetConfig::kMajorityWriteConcernModeName;
}
else {
patternName = writeConcern.wMode;
}
StatusWith tagPattern =
_rsConfig.findCustomWriteMode(patternName);
if (!tagPattern.isOK()) {
return true;
}
return _haveTaggedNodesReachedOpTime_inlock(opTime, tagPattern.getValue());
}
else {
return _haveNumNodesReachedOpTime_inlock(opTime, writeConcern.wNumNodes);
}
}
bool ReplicationCoordinatorImpl::_haveNumNodesReachedOpTime_inlock(const Timestamp& opTime,
int numNodes) {
if (_getMyLastOptime_inlock() < opTime) {
// Secondaries that are for some reason ahead of us should not allow us to
// satisfy a write concern if we aren't caught up ourselves.
return false;
}
for (SlaveInfoVector::iterator it = _slaveInfo.begin();
it != _slaveInfo.end(); ++it) {
const Timestamp& slaveTime = it->opTime;
if (slaveTime >= opTime) {
--numNodes;
}
if (numNodes <= 0) {
return true;
}
}
return false;
}
bool ReplicationCoordinatorImpl::_haveTaggedNodesReachedOpTime_inlock(
const Timestamp& opTime, const ReplicaSetTagPattern& tagPattern) {
ReplicaSetTagMatch matcher(tagPattern);
for (SlaveInfoVector::iterator it = _slaveInfo.begin();
it != _slaveInfo.end(); ++it) {
const Timestamp& slaveTime = it->opTime;
if (slaveTime >= opTime) {
// This node has reached the desired optime, now we need to check if it is a part
// of the tagPattern.
const MemberConfig* memberConfig = _rsConfig.findMemberByID(it->memberId);
invariant(memberConfig);
for (MemberConfig::TagIterator it = memberConfig->tagsBegin();
it != memberConfig->tagsEnd(); ++it) {
if (matcher.update(*it)) {
return true;
}
}
}
}
return false;
}
/// Blocks until "opTime" satisfies "writeConcern" (or timeout/interrupt/step-down).
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::awaitReplication(
        const OperationContext* txn,
        const Timestamp& opTime,
        const WriteConcernOptions& writeConcern) {
    Timer timer;
    boost::unique_lock<boost::mutex> lock(_mutex);
    return _awaitReplication_inlock(&timer, &lock, txn, opTime, writeConcern);
}

/// Same as awaitReplication, but waits on the client's own last optime.
ReplicationCoordinator::StatusAndDuration
ReplicationCoordinatorImpl::awaitReplicationOfLastOpForClient(
        const OperationContext* txn,
        const WriteConcernOptions& writeConcern) {
    Timer timer;
    boost::unique_lock<boost::mutex> lock(_mutex);
    return _awaitReplication_inlock(
        &timer,
        &lock,
        txn,
        ReplClientInfo::forClient(txn->getClient()).getLastOp(),
        writeConcern);
}
/**
 * Core wait loop for awaitReplication.  Requires *lock to own _mutex on entry.
 * Fast-exits for modes that trivially satisfy the concern (no replication,
 * master/slave majority, null optime, w<=1 already satisfied), then registers a
 * WaiterInfo and waits on its condvar until the concern is met, the operation is
 * interrupted, stepDown clears waitInfo.master, the timeout elapses, or shutdown.
 */
ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitReplication_inlock(
        const Timer* timer,
        boost::unique_lock<boost::mutex>* lock,
        const OperationContext* txn,
        const Timestamp& opTime,
        const WriteConcernOptions& writeConcern) {
    const Mode replMode = _getReplicationMode_inlock();
    if (replMode == modeNone || serverGlobalParams.configsvr) {
        // no replication check needed (validated above)
        return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
    }
    if (replMode == modeMasterSlave && writeConcern.wMode == "majority") {
        // with master/slave, majority is equivalent to w=1
        return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
    }
    if (opTime.isNull()) {
        // If waiting for the empty optime, always say it's been replicated.
        return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
    }
    if (replMode == modeReplSet && !_memberState.primary()) {
        return StatusAndDuration(Status(ErrorCodes::NotMaster,
                                        "Not master while waiting for replication"),
                                 Milliseconds(timer->millis()));
    }
    if (writeConcern.wMode.empty()) {
        if (writeConcern.wNumNodes < 1) {
            return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
        }
        else if (writeConcern.wNumNodes == 1 && _getMyLastOptime_inlock() >= opTime) {
            return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
        }
    }
    // Must hold _mutex before constructing waitInfo as it will modify _replicationWaiterList
    boost::condition_variable condVar;
    WaiterInfo waitInfo(
        &_replicationWaiterList, txn->getOpID(), &opTime, &writeConcern, &condVar);
    while (!_doneWaitingForReplication_inlock(opTime, writeConcern)) {
        const int elapsed = timer->millis();
        Status interruptedStatus = txn->checkForInterruptNoAssert();
        if (!interruptedStatus.isOK()) {
            return StatusAndDuration(interruptedStatus, Milliseconds(elapsed));
        }
        if (!waitInfo.master) {
            return StatusAndDuration(Status(ErrorCodes::NotMaster,
                                            "Not master anymore while waiting for replication"
                                            " - this most likely means that a step down"
                                            " occurred while waiting for replication"),
                                     Milliseconds(elapsed));
        }
        if (writeConcern.wTimeout != WriteConcernOptions::kNoTimeout &&
                elapsed > writeConcern.wTimeout) {
            return StatusAndDuration(Status(ErrorCodes::ExceededTimeLimit,
                                            "waiting for replication timed out"),
                                     Milliseconds(elapsed));
        }
        if (_inShutdown) {
            return StatusAndDuration(Status(ErrorCodes::ShutdownInProgress,
                                            "Replication is being shut down"),
                                     Milliseconds(elapsed));
        }
        try {
            if (writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
                condVar.wait(*lock);
            }
            else {
                condVar.timed_wait(*lock, Milliseconds(writeConcern.wTimeout - elapsed));
            }
        } catch (const boost::thread_interrupted&) {}
    }
    // Waiting ended; report whether the concern is actually satisfiable.
    Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
    if (!status.isOK()) {
        return StatusAndDuration(status, Milliseconds(timer->millis()));
    }
    return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
}
/**
 * Steps this node down from primary.  Acquires the global shared lock (killing
 * user operations to speed that up), then delegates the state machine to
 * _stepDownContinue on the executor, waking step-down waiters after "waitTime"
 * so a secondary has a chance to catch up.  Blocks until the step-down attempt
 * finishes and returns its status.
 */
Status ReplicationCoordinatorImpl::stepDown(OperationContext* txn,
                                            bool force,
                                            const Milliseconds& waitTime,
                                            const Milliseconds& stepdownTime) {
    const Date_t startTime = _replExecutor.now();
    const Date_t stepDownUntil(startTime.millis + stepdownTime.total_milliseconds());
    const Date_t waitUntil(startTime.millis + waitTime.total_milliseconds());
    if (!getMemberState().primary()) {
        // Note this check is inherently racy - it's always possible for the node to
        // stepdown from some other path before we acquire the global shared lock, but
        // that's okay because we are resilient to that happening in _stepDownContinue.
        return Status(ErrorCodes::NotMaster, "not primary so can't step down");
    }
    LockResult lockState = txn->lockState()->lockGlobalBegin(MODE_S);
    // We've requested the global shared lock which will stop new writes from coming in,
    // but existing writes could take a long time to finish, so kill all user operations
    // to help us get the global lock faster.
    _externalState->killAllUserOperations(txn);
    if (lockState == LOCK_WAITING) {
        lockState = txn->lockState()->lockGlobalComplete(stepdownTime.total_milliseconds());
        if (lockState == LOCK_TIMEOUT) {
            return Status(ErrorCodes::ExceededTimeLimit,
                          "Could not acquire the global shared lock within the amount of time "
                          "specified that we should step down for");
        }
    }
    invariant(lockState == LOCK_OK);
    ON_BLOCK_EXIT(&Locker::unlockAll, txn->lockState());
    // From this point onward we are guaranteed to be holding the global shared lock.
    StatusWith<ReplicationExecutor::EventHandle> finishedEvent = _replExecutor.makeEvent();
    if (finishedEvent.getStatus() == ErrorCodes::ShutdownInProgress) {
        return finishedEvent.getStatus();
    }
    fassert(26000, finishedEvent.getStatus());
    Status result(ErrorCodes::InternalError, "didn't set status in _stepDownContinue");
    CBHStatus cbh = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
                   this,
                   stdx::placeholders::_1,
                   finishedEvent.getValue(),
                   txn,
                   waitUntil,
                   stepDownUntil,
                   force,
                   &result));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return cbh.getStatus();
    }
    fassert(18809, cbh.getStatus());
    cbh = _replExecutor.scheduleWorkAt(
        waitUntil,
        stdx::bind(&ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback,
                   this,
                   stdx::placeholders::_1));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return cbh.getStatus();
    }
    fassert(26001, cbh.getStatus());
    _replExecutor.waitForEvent(finishedEvent.getValue());
    return result;
}
// Executor callback shim: forwards to _signalStepDownWaiters unless the callback
// failed (e.g. was canceled during executor shutdown).
void ReplicationCoordinatorImpl::_signalStepDownWaitersFromCallback(
        const ReplicationExecutor::CallbackData& cbData) {
    if (cbData.status.isOK()) {
        _signalStepDownWaiters();
    }
}
// Signals, in order, every event that a pending _stepDownContinue retry is waiting
// on, then discards the handles; a signaled event never needs to be tracked again.
void ReplicationCoordinatorImpl::_signalStepDownWaiters() {
    for (size_t i = 0; i < _stepDownWaiters.size(); ++i) {
        _replExecutor.signalEvent(_stepDownWaiters[i]);
    }
    _stepDownWaiters.clear();
}
// Executor-thread continuation of stepDown().  Attempts the step down, and if no
// secondary is caught up yet, re-schedules itself to run when replication progress
// is next reported (or the waitUntil timer fires).  Signals 'finishedEvent' on every
// terminal path and reports the outcome through 'result'.
void ReplicationCoordinatorImpl::_stepDownContinue(
        const ReplicationExecutor::CallbackData& cbData,
        const ReplicationExecutor::EventHandle finishedEvent,
        OperationContext* txn,
        const Date_t waitUntil,
        const Date_t stepDownUntil,
        bool force,
        Status* result) {
    if (cbData.status == ErrorCodes::CallbackCanceled) {
        // Cancelation only occurs on shutdown, which will also handle signaling the event.
        *result = Status(ErrorCodes::ShutdownInProgress, "Shutting down replication");
        return;
    }
    // Signal completion on every return below unless we explicitly reschedule a retry.
    ScopeGuard allFinishedGuard = MakeGuard(
        stdx::bind(&ReplicationExecutor::signalEvent, &_replExecutor, finishedEvent));
    if (!cbData.status.isOK()) {
        *result = cbData.status;
        return;
    }
    Status interruptedStatus = txn->checkForInterruptNoAssert();
    if (!interruptedStatus.isOK()) {
        *result = interruptedStatus;
        return;
    }
    if (_topCoord->getRole() != TopologyCoordinator::Role::leader) {
        *result = Status(ErrorCodes::NotMaster,
                         "Already stepped down from primary while processing step down "
                         "request");
        return;
    }
    const Date_t now = _replExecutor.now();
    if (now >= stepDownUntil) {
        *result = Status(ErrorCodes::ExceededTimeLimit,
                         "By the time we were ready to step down, we were already past the "
                         "time we were supposed to step down until");
        return;
    }
    // 'force' only takes effect once the caller's waitTime has elapsed.
    bool forceNow = now >= waitUntil ? force : false;
    if (_topCoord->stepDown(stepDownUntil, forceNow, getMyLastOptime())) {
        // Schedule work to (potentially) step back up once the stepdown period has ended.
        _replExecutor.scheduleWorkAt(stepDownUntil,
                                     stdx::bind(&ReplicationCoordinatorImpl::_handleTimePassing,
                                                this,
                                                stdx::placeholders::_1));
        boost::unique_lock<boost::mutex> lk(_mutex);
        const PostMemberStateUpdateAction action =
            _updateMemberStateFromTopologyCoordinator_inlock();
        lk.unlock();
        // Must run outside _mutex; may close connections, etc.
        _performPostMemberStateUpdateAction(action);
        *result = Status::OK();
        return;
    }
    // Step down failed. Keep waiting if we can, otherwise finish.
    if (now >= waitUntil) {
        *result = Status(ErrorCodes::ExceededTimeLimit, str::stream() <<
                         "No electable secondaries caught up as of " <<
                         dateToISOStringLocal(now) <<
                         ". Please use {force: true} to force node to step down.");
        return;
    }
    if (_stepDownWaiters.empty()) {
        StatusWith<ReplicationExecutor::EventHandle> reschedEvent =
            _replExecutor.makeEvent();
        if (!reschedEvent.isOK()) {
            *result = reschedEvent.getStatus();
            return;
        }
        _stepDownWaiters.push_back(reschedEvent.getValue());
    }
    CBHStatus cbh = _replExecutor.onEvent(
        _stepDownWaiters.back(),
        stdx::bind(&ReplicationCoordinatorImpl::_stepDownContinue,
                   this,
                   stdx::placeholders::_1,
                   finishedEvent,
                   txn,
                   waitUntil,
                   stepDownUntil,
                   force,
                   result));
    if (!cbh.isOK()) {
        *result = cbh.getStatus();
        return;
    }
    // A retry is scheduled; it owns signaling 'finishedEvent' now.
    allFinishedGuard.Dismiss();
}
// Runs when a stepdown period expires.  In a single-node replica set there is nobody
// else to elect, so this node becomes a candidate again and immediately wins.
void ReplicationCoordinatorImpl::_handleTimePassing(
        const ReplicationExecutor::CallbackData& cbData) {
    if (!cbData.status.isOK()) {
        return;  // canceled (executor shutting down)
    }
    const bool shouldElectSelf =
        _topCoord->becomeCandidateIfStepdownPeriodOverAndSingleNodeSet(_replExecutor.now());
    if (shouldElectSelf) {
        _performPostMemberStateUpdateAction(kActionWinElection);
    }
}
bool ReplicationCoordinatorImpl::isMasterForReportingPurposes() {
if (_settings.usingReplSets()) {
boost::lock_guard lock(_mutex);
if (_getReplicationMode_inlock() == modeReplSet &&
_getMemberState_inlock().primary()) {
return true;
}
return false;
}
if (!_settings.slave)
return true;
// TODO(dannenberg) replAllDead is bad and should be removed when master slave is removed
if (replAllDead) {
return false;
}
if (_settings.master) {
// if running with --master --slave, allow.
return true;
}
return false;
}
// _canAcceptNonLocalWrites is always true for standalone nodes, always false for nodes
// started with --slave, and adjusted based on primary+drain state in replica sets.
//
// That is, writes are permitted when this node accepts non-local writes, when the
// target is the "local" database, or (for a node started with --master) when the
// master/slave system has not set the replAllDead flag.
bool ReplicationCoordinatorImpl::canAcceptWritesForDatabase(StringData dbName) {
    if (_canAcceptNonLocalWrites || dbName == "local") {
        return true;
    }
    return _settings.master && !replAllDead;
}
// Decides whether a read against 'ns' may be served here.  Internal (direct-client)
// reads, reads where writes are allowed, and master/slave deployments always pass;
// otherwise the caller needs slaveOk and this node must be in PRIMARY or SECONDARY.
Status ReplicationCoordinatorImpl::checkCanServeReadsFor(OperationContext* txn,
                                                         const NamespaceString& ns,
                                                         bool slaveOk) {
    if (txn->getClient()->isInDirectClient()) {
        return Status::OK();
    }
    if (canAcceptWritesForDatabase(ns.db())) {
        return Status::OK();
    }
    if (_settings.slave || _settings.master) {
        return Status::OK();
    }
    if (!slaveOk) {
        return Status(ErrorCodes::NotMasterNoSlaveOkCode, "not master and slaveOk=false");
    }
    if (!_canServeNonLocalReads.loadRelaxed()) {
        return Status(
            ErrorCodes::NotMasterOrSecondaryCode,
            "not master or secondary; cannot currently read from this replSet member");
    }
    return Status::OK();
}
// True exactly when this node can serve non-local reads, i.e. is in PRIMARY or
// SECONDARY state.
bool ReplicationCoordinatorImpl::isInPrimaryOrSecondaryState() const {
    const bool canServeReads = (_canServeNonLocalReads.loadRelaxed() != 0U);
    return canServeReads;
}
bool ReplicationCoordinatorImpl::shouldIgnoreUniqueIndex(const IndexDescriptor* idx) {
if (!idx->unique()) {
return false;
}
// Never ignore _id index
if (idx->isIdIndex()) {
return false;
}
if (nsToDatabaseSubstring(idx->parentNS()) == "local" ) {
// always enforce on local
return false;
}
boost::lock_guard lock(_mutex);
if (_getReplicationMode_inlock() != modeReplSet) {
return false;
}
// see SERVER-6671
MemberState ms = _getMemberState_inlock();
switch ( ms.s ) {
case MemberState::RS_SECONDARY:
case MemberState::RS_RECOVERING:
case MemberState::RS_ROLLBACK:
case MemberState::RS_STARTUP2:
return true;
default:
return false;
}
}
// Returns the OID generated the last time this node won an election.
OID ReplicationCoordinatorImpl::getElectionId() {
    boost::lock_guard<boost::mutex> lock(_mutex);
    return _electionId;
}
// Returns this node's replication ID (RID), used for progress tracking.
OID ReplicationCoordinatorImpl::getMyRID() const {
    boost::lock_guard<boost::mutex> lock(_mutex);
    return _getMyRID_inlock();
}
// Caller must hold _mutex.  Returns this node's replication ID.
OID ReplicationCoordinatorImpl::_getMyRID_inlock() const {
    const OID rid = _myRID;
    return rid;
}
int ReplicationCoordinatorImpl::getMyId() const {
boost::lock_guard lock(_mutex);
return _getMyId_inlock();
}
int ReplicationCoordinatorImpl::_getMyId_inlock() const {
const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex);
return self.getId();
}
// Fills 'cmdBuilder' with a replSetUpdatePosition command reporting the optime of
// every tracked member (including ourself).  Returns false, writing nothing, when
// this node has been removed from the config and should not send updates.
bool ReplicationCoordinatorImpl::prepareReplSetUpdatePositionCommand(
        BSONObjBuilder* cmdBuilder) {
    boost::lock_guard<boost::mutex> lock(_mutex);
    invariant(_rsConfig.isInitialized());
    // do not send updates if we have been removed from the config
    if (_selfIndex == -1) {
        return false;
    }
    cmdBuilder->append("replSetUpdatePosition", 1);
    // create an array containing objects each member connected to us and for ourself
    BSONArrayBuilder arrayBuilder(cmdBuilder->subarrayStart("optimes"));
    {
        for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin();
                itr != _slaveInfo.end(); ++itr) {
            if (itr->opTime.isNull()) {
                // Don't include info on members we haven't heard from yet.
                continue;
            }
            BSONObjBuilder entry(arrayBuilder.subobjStart());
            entry.append("_id", itr->rid);
            entry.append("optime", itr->opTime);
            entry.append("memberId", itr->memberId);
            entry.append("cfgver", _rsConfig.getConfigVersion());
            // SERVER-14550 Even though the "config" field isn't used on the other end in 3.0,
            // we need to keep sending it for 2.6 compatibility.
            // TODO(spencer): Remove this after 3.0 is released.
            const MemberConfig* member = _rsConfig.findMemberByID(itr->memberId);
            fassert(18651, member);
            entry.append("config", member->toBSON(_rsConfig.getTagConfig()));
        }
    }
    return true;
}
// Builds the replSetGetStatus response by running prepareStatusResponse on the
// executor thread (where topology state may be read safely) and waiting for it.
Status ReplicationCoordinatorImpl::processReplSetGetStatus(BSONObjBuilder* response) {
    Status outStatus(ErrorCodes::InternalError, "didn't set status in prepareStatusResponse");
    const CBHStatus workHandle = _replExecutor.scheduleWork(
        stdx::bind(&TopologyCoordinator::prepareStatusResponse,
                   _topCoord.get(),
                   stdx::placeholders::_1,
                   _replExecutor.now(),
                   time(0) - serverGlobalParams.started,  // uptime in seconds
                   getMyLastOptime(),
                   response,
                   &outStatus));
    if (workHandle.getStatus() == ErrorCodes::ShutdownInProgress) {
        return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
    }
    fassert(18640, workHandle.getStatus());
    _replExecutor.wait(workHandle.getValue());
    return outStatus;
}
// Populates an isMaster response for a replica-set node on the executor thread,
// then overrides the master flags while the applier drain is still in progress.
void ReplicationCoordinatorImpl::fillIsMasterForReplSet(IsMasterResponse* response) {
    invariant(getSettings().usingReplSets());
    const CBHStatus workHandle = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish,
                   this,
                   stdx::placeholders::_1,
                   response));
    if (workHandle.getStatus() == ErrorCodes::ShutdownInProgress) {
        response->markAsShutdownInProgress();
        return;
    }
    fassert(28602, workHandle.getStatus());
    _replExecutor.wait(workHandle.getValue());
    if (isWaitingForApplierToDrain()) {
        // Report that we are secondary to ismaster callers until drain completes.
        response->setIsMaster(false);
        response->setIsSecondary(true);
    }
}
// Executor-thread half of fillIsMasterForReplSet.
void ReplicationCoordinatorImpl::_fillIsMasterForReplSet_finish(
        const ReplicationExecutor::CallbackData& cbData, IsMasterResponse* response) {
    if (cbData.status != ErrorCodes::CallbackCanceled) {
        _topCoord->fillIsMasterForReplSet(response);
        return;
    }
    response->markAsShutdownInProgress();
}
// Appends a "replicationProgress" array to 'result' describing each tracked member's
// replication position (used by serverStatus-style reporting).
void ReplicationCoordinatorImpl::appendSlaveInfoData(BSONObjBuilder* result) {
    boost::lock_guard<boost::mutex> lock(_mutex);
    BSONArrayBuilder replicationProgress(result->subarrayStart("replicationProgress"));
    {
        for (SlaveInfoVector::const_iterator itr = _slaveInfo.begin();
                itr != _slaveInfo.end(); ++itr) {
            BSONObjBuilder entry(replicationProgress.subobjStart());
            entry.append("rid", itr->rid);
            entry.append("optime", itr->opTime);
            entry.append("host", itr->hostAndPort.toString());
            if (_getReplicationMode_inlock() == modeReplSet) {
                // Skip memberId when this node has been removed from the config.
                if (_selfIndex == -1) {
                    continue;
                }
                invariant(itr->memberId >= 0);
                entry.append("memberId", itr->memberId);
            }
        }
    }
}
// Returns a copy of the current replica set configuration.
ReplicaSetConfig ReplicationCoordinatorImpl::getConfig() const {
    boost::lock_guard<boost::mutex> lock(_mutex);
    return _rsConfig;
}
// Appends the current replica set configuration to 'result' (replSetGetConfig).
void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) {
    boost::lock_guard<boost::mutex> lock(_mutex);
    result->append("config", _rsConfig.toBSON());
}
bool ReplicationCoordinatorImpl::getMaintenanceMode() {
bool maintenanceMode(false);
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_getMaintenanceMode_helper,
this,
stdx::placeholders::_1,
&maintenanceMode));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return false;
}
fassert(18811, cbh.getStatus());
_replExecutor.wait(cbh.getValue());
return maintenanceMode;
}
// Executor-thread half of getMaintenanceMode.
void ReplicationCoordinatorImpl::_getMaintenanceMode_helper(
        const ReplicationExecutor::CallbackData& cbData,
        bool* maintenanceMode) {
    if (cbData.status == ErrorCodes::CallbackCanceled) {
        return;  // leave the caller's default (false) in place
    }
    const int maintenanceCount = _topCoord->getMaintenanceCount();
    *maintenanceMode = (maintenanceCount > 0);
}
// Enables or disables maintenance mode via the executor thread.  Returns
// NoReplicationEnabled when not a replica-set member, otherwise the status set by
// _setMaintenanceMode_helper.
Status ReplicationCoordinatorImpl::setMaintenanceMode(bool activate) {
    {
        // _getReplicationMode_inlock reads state guarded by _mutex; take the lock
        // rather than calling the _inlock helper unlocked.
        boost::lock_guard<boost::mutex> lk(_mutex);
        if (_getReplicationMode_inlock() != modeReplSet) {
            return Status(ErrorCodes::NoReplicationEnabled,
                          "can only set maintenance mode on replica set members");
        }
    }
    Status result(ErrorCodes::InternalError, "didn't set status in _setMaintenanceMode_helper");
    CBHStatus cbh = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_setMaintenanceMode_helper,
                   this,
                   stdx::placeholders::_1,
                   activate,
                   &result));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return cbh.getStatus();
    }
    fassert(18698, cbh.getStatus());
    _replExecutor.wait(cbh.getValue());
    return result;
}
// Executor-thread half of setMaintenanceMode.  Adjusts the refcounted maintenance
// counter in the topology coordinator and applies any resulting member-state change.
void ReplicationCoordinatorImpl::_setMaintenanceMode_helper(
        const ReplicationExecutor::CallbackData& cbData,
        bool activate,
        Status* result) {
    if (cbData.status == ErrorCodes::CallbackCanceled) {
        *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
        return;
    }
    boost::unique_lock<boost::mutex> lk(_mutex);
    if (_getMemberState_inlock().primary()) {
        *result = Status(ErrorCodes::NotSecondary, "primaries can't modify maintenance mode");
        return;
    }
    int curMaintenanceCalls = _topCoord->getMaintenanceCount();
    if (activate) {
        log() << "going into maintenance mode with " << curMaintenanceCalls
              << " other maintenance mode tasks in progress" << rsLog;
        _topCoord->adjustMaintenanceCountBy(1);
    }
    else if (curMaintenanceCalls > 0) {
        // Only followers may hold maintenance mode.
        invariant(_topCoord->getRole() == TopologyCoordinator::Role::follower);
        _topCoord->adjustMaintenanceCountBy(-1);
        log() << "leaving maintenance mode (" << curMaintenanceCalls-1
              << " other maintenance mode tasks ongoing)" << rsLog;
    } else {
        warning() << "Attempted to leave maintenance mode but it is not currently active";
        *result = Status(ErrorCodes::OperationFailed, "already out of maintenance mode");
        return;
    }
    const PostMemberStateUpdateAction action =
        _updateMemberStateFromTopologyCoordinator_inlock();
    *result = Status::OK();
    lk.unlock();
    // Must run outside _mutex.
    _performPostMemberStateUpdateAction(action);
}
// Handles the replSetSyncFrom command by asking the topology coordinator (on the
// executor thread) to switch sync sources to 'target'.
Status ReplicationCoordinatorImpl::processReplSetSyncFrom(const HostAndPort& target,
                                                          BSONObjBuilder* resultObj) {
    Status result(ErrorCodes::InternalError, "didn't set status in prepareSyncFromResponse");
    CBHStatus cbh = _replExecutor.scheduleWork(
        stdx::bind(&TopologyCoordinator::prepareSyncFromResponse,
                   _topCoord.get(),
                   stdx::placeholders::_1,
                   target,
                   getMyLastOptime(),  // public accessor takes _mutex; the _inlock
                                       // variant must not be called without the lock
                   resultObj,
                   &result));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
    }
    fassert(18649, cbh.getStatus());
    _replExecutor.wait(cbh.getValue());
    return result;
}
// Handles replSetFreeze by delegating to _processReplSetFreeze_finish on the
// executor thread and waiting for it to complete.
Status ReplicationCoordinatorImpl::processReplSetFreeze(int secs, BSONObjBuilder* resultObj) {
    Status freezeResult(ErrorCodes::InternalError, "didn't set status in prepareFreezeResponse");
    const CBHStatus workHandle = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFreeze_finish,
                   this,
                   stdx::placeholders::_1,
                   secs,
                   resultObj,
                   &freezeResult));
    if (workHandle.getStatus() == ErrorCodes::ShutdownInProgress) {
        return workHandle.getStatus();
    }
    fassert(18641, workHandle.getStatus());
    _replExecutor.wait(workHandle.getValue());
    return freezeResult;
}
// Executor-thread half of processReplSetFreeze.
void ReplicationCoordinatorImpl::_processReplSetFreeze_finish(
        const ReplicationExecutor::CallbackData& cbData,
        int secs,
        BSONObjBuilder* response,
        Status* result) {
    if (cbData.status == ErrorCodes::CallbackCanceled) {
        *result = Status(ErrorCodes::ShutdownInProgress, "replication system is shutting down");
        return;
    }
    _topCoord->prepareFreezeResponse(_replExecutor.now(), secs, response);
    const bool becameCandidate =
        (_topCoord->getRole() == TopologyCoordinator::Role::candidate);
    if (becameCandidate) {
        // If we just unfroze and ended our stepdown period and we are a one node replica set,
        // the topology coordinator will have gone into the candidate role to signal that we
        // need to elect ourself.
        _performPostMemberStateUpdateAction(kActionWinElection);
    }
    *result = Status::OK();
}
// Handles a replSetHeartbeat request on the executor thread.  Rejects heartbeats
// that arrive before the replication system has loaded its local configuration.
Status ReplicationCoordinatorImpl::processHeartbeat(const ReplSetHeartbeatArgs& args,
                                                    ReplSetHeartbeatResponse* response) {
    {
        boost::lock_guard<boost::mutex> lock(_mutex);
        if (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
            return Status(ErrorCodes::NotYetInitialized,
                          "Received heartbeat while still initializing replication system");
        }
    }
    Status result(ErrorCodes::InternalError, "didn't set status in prepareHeartbeatResponse");
    CBHStatus cbh = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_processHeartbeatFinish,
                   this,
                   stdx::placeholders::_1,
                   args,
                   response,
                   &result));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
    }
    fassert(18508, cbh.getStatus());
    _replExecutor.wait(cbh.getValue());
    return result;
}
// Executor-thread half of processHeartbeat.  Builds the heartbeat response via the
// topology coordinator and, when this node is not part of the configuration it knows
// about, schedules a heartbeat back at the sender so we can pick up a config that
// includes us.
void ReplicationCoordinatorImpl::_processHeartbeatFinish(
const ReplicationExecutor::CallbackData& cbData,
const ReplSetHeartbeatArgs& args,
ReplSetHeartbeatResponse* response,
Status* outStatus) {
// Cancellation happens only at shutdown; surface that to the waiting caller.
if (cbData.status == ErrorCodes::CallbackCanceled) {
*outStatus = Status(ErrorCodes::ShutdownInProgress, "Replication shutdown in progress");
return;
}
// Any other non-OK callback status is unexpected here.
fassert(18910, cbData.status);
const Date_t now = _replExecutor.now();
*outStatus = _topCoord->prepareHeartbeatResponse(
now,
args,
_settings.ourSetName(),
getMyLastOptime(),
response);
if ((outStatus->isOK() || *outStatus == ErrorCodes::InvalidReplicaSetConfig) &&
_selfIndex < 0) {
// If this node does not belong to the configuration it knows about, send heartbeats
// back to any node that sends us a heartbeat, in case one of those remote nodes has
// a configuration that contains us. Chances are excellent that it will, since that
// is the only reason for a remote node to send this node a heartbeat request.
// _seedList.insert(...).second is true only the first time we see this sender, so at
// most one extra heartbeat is scheduled per distinct host.
if (!args.getSenderHost().empty() && _seedList.insert(args.getSenderHost()).second) {
_scheduleHeartbeatToTarget(args.getSenderHost(), -1, now);
}
}
}
// Handles the replSetReconfig command: waits for startup to complete, validates the
// node/config state, parses and validates the new configuration, checks quorum
// (non-force only), persists the config, and installs it via _finishReplSetReconfig
// on the executor.  Force reconfigs finish under the global exclusive lock so a
// primary made unelectable by the new config steps down safely.
Status ReplicationCoordinatorImpl::processReplSetReconfig(OperationContext* txn,
                                                          const ReplSetReconfigArgs& args,
                                                          BSONObjBuilder* resultObj) {
    log() << "replSetReconfig admin command received from client";
    boost::unique_lock<boost::mutex> lk(_mutex);
    while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
        _rsConfigStateChange.wait(lk);
    }
    switch (_rsConfigState) {
    case kConfigSteady:
        break;
    case kConfigUninitialized:
        return Status(ErrorCodes::NotYetInitialized,
                      "Node not yet initialized; use the replSetInitiate command");
    case kConfigReplicationDisabled:
        invariant(false); // should be unreachable due to !_settings.usingReplSets() check above
    case kConfigInitiating:
    case kConfigReconfiguring:
    case kConfigHBReconfiguring:
        return Status(ErrorCodes::ConfigurationInProgress,
                      "Cannot run replSetReconfig because the node is currently updating "
                      "its configuration");
    default:
        severe() << "Unexpected _rsConfigState " << int(_rsConfigState);
        fassertFailed(18914);
    }
    invariant(_rsConfig.isInitialized());
    if (!args.force && !_getMemberState_inlock().primary()) {
        return Status(ErrorCodes::NotMaster, str::stream() <<
                      "replSetReconfig should only be run on PRIMARY, but my state is " <<
                      _getMemberState_inlock().toString() <<
                      "; use the \"force\" argument to override");
    }
    _setConfigState_inlock(kConfigReconfiguring);
    // Restore kConfigSteady on any early error return below.
    ScopeGuard configStateGuard = MakeGuard(
        lockAndCall,
        &lk,
        stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock,
                   this,
                   kConfigSteady));
    ReplicaSetConfig oldConfig = _rsConfig;
    lk.unlock();
    ReplicaSetConfig newConfig;
    BSONObj newConfigObj = args.newConfigObj;
    if (args.force) {
        newConfigObj = incrementConfigVersionByRandom(newConfigObj);
    }
    Status status = newConfig.initialize(newConfigObj);
    if (!status.isOK()) {
        error() << "replSetReconfig got " << status << " while parsing " << newConfigObj;
        return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
    }
    if (newConfig.getReplSetName() != _settings.ourSetName()) {
        str::stream errmsg;
        errmsg << "Attempting to reconfigure a replica set with name " <<
            newConfig.getReplSetName() << ", but command line reports " <<
            _settings.ourSetName() << "; rejecting";
        error() << std::string(errmsg);
        return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
    }
    StatusWith<int> myIndex = validateConfigForReconfig(
        _externalState.get(),
        oldConfig,
        newConfig,
        args.force);
    if (!myIndex.isOK()) {
        error() << "replSetReconfig got " << myIndex.getStatus() << " while validating " <<
            newConfigObj;
        return Status(ErrorCodes::NewReplicaSetConfigurationIncompatible,
                      myIndex.getStatus().reason());
    }
    log() << "replSetReconfig config object with " << newConfig.getNumMembers() <<
        " members parses ok";
    if (!args.force) {
        status = checkQuorumForReconfig(&_replExecutor,
                                        newConfig,
                                        myIndex.getValue());
        if (!status.isOK()) {
            error() << "replSetReconfig failed; " << status;
            return status;
        }
    }
    status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON());
    if (!status.isOK()) {
        error() << "replSetReconfig failed to store config document; " << status;
        return status;
    }
    const stdx::function<void (const ReplicationExecutor::CallbackData&)> reconfigFinishFn(
        stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetReconfig,
                   this,
                   stdx::placeholders::_1,
                   newConfig,
                   myIndex.getValue()));
    // If it's a force reconfig, the primary node may not be electable after the configuration
    // change. In case we are that primary node, finish the reconfig under the global lock,
    // so that the step down occurs safely.
    CBHStatus cbh =
        args.force ?
        _replExecutor.scheduleWorkWithGlobalExclusiveLock(reconfigFinishFn) :
        _replExecutor.scheduleWork(reconfigFinishFn);
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return status;
    }
    fassert(18824, cbh.getStatus());
    configStateGuard.Dismiss();
    _replExecutor.wait(cbh.getValue());
    return Status::OK();
}
// Executor-thread completion of processReplSetReconfig: installs the validated
// config and performs any resulting member-state action outside the lock.
void ReplicationCoordinatorImpl::_finishReplSetReconfig(
        const ReplicationExecutor::CallbackData& cbData,
        const ReplicaSetConfig& newConfig,
        int myIndex) {
    boost::unique_lock<boost::mutex> lk(_mutex);
    invariant(_rsConfigState == kConfigReconfiguring);
    invariant(_rsConfig.isInitialized());
    const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
    lk.unlock();
    _performPostMemberStateUpdateAction(action);
}
// Handles the replSetInitiate command: validates the initial configuration, checks
// quorum, persists the config document, installs it via _finishReplSetInitiate, then
// creates the oplog and starts replication threads.
Status ReplicationCoordinatorImpl::processReplSetInitiate(OperationContext* txn,
                                                          const BSONObj& configObj,
                                                          BSONObjBuilder* resultObj) {
    log() << "replSetInitiate admin command received from client";
    boost::unique_lock<boost::mutex> lk(_mutex);
    if (!_settings.usingReplSets()) {
        return Status(ErrorCodes::NoReplicationEnabled, "server is not running with --replSet");
    }
    while (_rsConfigState == kConfigPreStart || _rsConfigState == kConfigStartingUp) {
        _rsConfigStateChange.wait(lk);
    }
    if (_rsConfigState != kConfigUninitialized) {
        resultObj->append("info",
                          "try querying local.system.replset to see current configuration");
        return Status(ErrorCodes::AlreadyInitialized, "already initialized");
    }
    invariant(!_rsConfig.isInitialized());
    _setConfigState_inlock(kConfigInitiating);
    // Roll the config state back to uninitialized on any early error return below.
    ScopeGuard configStateGuard = MakeGuard(
        lockAndCall,
        &lk,
        stdx::bind(&ReplicationCoordinatorImpl::_setConfigState_inlock,
                   this,
                   kConfigUninitialized));
    lk.unlock();
    ReplicaSetConfig newConfig;
    Status status = newConfig.initialize(configObj);
    if (!status.isOK()) {
        error() << "replSet initiate got " << status << " while parsing " << configObj;
        return Status(ErrorCodes::InvalidReplicaSetConfig, status.reason());
    }
    if (newConfig.getReplSetName() != _settings.ourSetName()) {
        str::stream errmsg;
        errmsg << "Attempting to initiate a replica set with name " <<
            newConfig.getReplSetName() << ", but command line reports " <<
            _settings.ourSetName() << "; rejecting";
        error() << std::string(errmsg);
        return Status(ErrorCodes::InvalidReplicaSetConfig, errmsg);
    }
    StatusWith<int> myIndex = validateConfigForInitiate(_externalState.get(), newConfig);
    if (!myIndex.isOK()) {
        error() << "replSet initiate got " << myIndex.getStatus() << " while validating " <<
            configObj;
        return Status(ErrorCodes::InvalidReplicaSetConfig, myIndex.getStatus().reason());
    }
    log() << "replSetInitiate config object with " << newConfig.getNumMembers() <<
        " members parses ok";
    status = checkQuorumForInitiate(
        &_replExecutor,
        newConfig,
        myIndex.getValue());
    if (!status.isOK()) {
        error() << "replSetInitiate failed; " << status;
        return status;
    }
    status = _externalState->storeLocalConfigDocument(txn, newConfig.toBSON());
    if (!status.isOK()) {
        error() << "replSetInitiate failed to store config document; " << status;
        return status;
    }
    CBHStatus cbh = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_finishReplSetInitiate,
                   this,
                   stdx::placeholders::_1,
                   newConfig,
                   myIndex.getValue()));
    if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
        return status;
    }
    configStateGuard.Dismiss();
    fassert(18654, cbh.getStatus());
    _replExecutor.wait(cbh.getValue());
    if (status.isOK()) {
        // Create the oplog with the first entry, and start repl threads.
        _externalState->initiateOplog(txn);
        _externalState->startThreads();
    }
    return status;
}
// Executor-thread completion of processReplSetInitiate: installs the initial config
// and performs any resulting member-state action outside the lock.
void ReplicationCoordinatorImpl::_finishReplSetInitiate(
        const ReplicationExecutor::CallbackData& cbData,
        const ReplicaSetConfig& newConfig,
        int myIndex) {
    boost::unique_lock<boost::mutex> lk(_mutex);
    invariant(_rsConfigState == kConfigInitiating);
    invariant(!_rsConfig.isInitialized());
    const PostMemberStateUpdateAction action = _setCurrentRSConfig_inlock(newConfig, myIndex);
    lk.unlock();
    _performPostMemberStateUpdateAction(action);
}
// Caller must hold _mutex.  Transitions the config state machine and notifies
// waiters; a no-op when the state is unchanged so waiters are only woken on real
// transitions.
void ReplicationCoordinatorImpl::_setConfigState_inlock(ConfigState newState) {
    if (newState == _rsConfigState) {
        return;
    }
    _rsConfigState = newState;
    _rsConfigStateChange.notify_all();
}
// Recomputes _memberState from the topology coordinator's current view.  Caller must
// hold _mutex.  Returns the follow-up action the caller must perform via
// _performPostMemberStateUpdateAction AFTER releasing the lock.
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
    const MemberState newState = _topCoord->getMemberState();
    if (newState == _memberState) {
        if (_topCoord->getRole() == TopologyCoordinator::Role::candidate) {
            // The only way to be a candidate with no state change is as the sole
            // electable member of a single-node replica set; win the election.
            invariant(_rsConfig.getNumMembers() == 1 &&
                      _selfIndex == 0 &&
                      _rsConfig.getMemberAt(0).isElectable());
            return kActionWinElection;
        }
        return kActionNone;
    }
    PostMemberStateUpdateAction result;
    if (_memberState.primary() || newState.removed()) {
        // Wake up any threads blocked in awaitReplication, close connections, etc.
        for (std::vector<WaiterInfo*>::iterator it = _replicationWaiterList.begin();
                it != _replicationWaiterList.end(); ++it) {
            WaiterInfo* info = *it;
            info->master = false;
            info->condVar->notify_all();
        }
        _isWaitingForDrainToComplete = false;
        _canAcceptNonLocalWrites = false;
        result = kActionCloseAllConnections;
    }
    else {
        if (_memberState.secondary() && !newState.primary()) {
            // Switching out of SECONDARY, but not to PRIMARY.
            _canServeNonLocalReads.store(0U);
        }
        else if (newState.secondary()) {
            // Switching into SECONDARY, but not from PRIMARY.
            _canServeNonLocalReads.store(1U);
        }
        result = kActionChooseNewSyncSource;
    }
    if (newState.secondary() && _topCoord->getRole() == TopologyCoordinator::Role::candidate) {
        // When transitioning to SECONDARY, the only way for _topCoord to report the candidate
        // role is if the configuration represents a single-node replica set. In that case, the
        // overriding requirement is to elect this singleton node primary.
        invariant(_rsConfig.getNumMembers() == 1 &&
                  _selfIndex == 0 &&
                  _rsConfig.getMemberAt(0).isElectable());
        result = kActionWinElection;
    }
    _memberState = newState;
    log() << "transition to " << newState.toString() << rsLog;
    return result;
}
// Performs the side effects mandated by a member-state change.  Must be called
// WITHOUT holding _mutex; kActionWinElection acquires it internally and may recurse
// once with the follow-up action.
void ReplicationCoordinatorImpl::_performPostMemberStateUpdateAction(
        PostMemberStateUpdateAction action) {
    switch (action) {
    case kActionNone:
        break;
    case kActionChooseNewSyncSource:
        _externalState->signalApplierToChooseNewSyncSource();
        break;
    case kActionCloseAllConnections:
        _externalState->closeConnections();
        _externalState->clearShardingState();
        break;
    case kActionWinElection: {
        boost::unique_lock<boost::mutex> lk(_mutex);
        _electionId = OID::gen();
        _topCoord->processWinElection(_electionId, getNextGlobalTimestamp());
        // Remain in "drain mode" until the applier catches up before accepting writes.
        _isWaitingForDrainToComplete = true;
        const PostMemberStateUpdateAction nextAction =
            _updateMemberStateFromTopologyCoordinator_inlock();
        invariant(nextAction != kActionWinElection);
        lk.unlock();
        _performPostMemberStateUpdateAction(nextAction);
        break;
    }
    default:
        severe() << "Unknown post member state update action " << static_cast<int>(action);
        fassertFailed(26010);
    }
}
// Reports this node's rollback id (rbid), which peers compare across observations to
// detect that a rollback occurred in between.
Status ReplicationCoordinatorImpl::processReplSetGetRBID(BSONObjBuilder* resultObj) {
    boost::lock_guard<boost::mutex> lk(_mutex);
    resultObj->append("rbid", _rbid);
    return Status::OK();
}
void ReplicationCoordinatorImpl::incrementRollbackID() {
boost::lock_guard lk(_mutex);
++_rbid;
}
// Handles replSetFresh (election protocol freshness probe) by delegating to
// _processReplSetFresh_finish on the executor thread and waiting for it.
Status ReplicationCoordinatorImpl::processReplSetFresh(const ReplSetFreshArgs& args,
                                                       BSONObjBuilder* resultObj) {
    Status freshResult(ErrorCodes::InternalError, "didn't set status in prepareFreshResponse");
    const CBHStatus workHandle = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_processReplSetFresh_finish,
                   this,
                   stdx::placeholders::_1,
                   args,
                   resultObj,
                   &freshResult));
    if (workHandle.getStatus() == ErrorCodes::ShutdownInProgress) {
        return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
    }
    fassert(18652, workHandle.getStatus());
    _replExecutor.wait(workHandle.getValue());
    return freshResult;
}
// Executor-thread half of processReplSetFresh.
void ReplicationCoordinatorImpl::_processReplSetFresh_finish(
        const ReplicationExecutor::CallbackData& cbData,
        const ReplSetFreshArgs& args,
        BSONObjBuilder* response,
        Status* result) {
    if (cbData.status != ErrorCodes::CallbackCanceled) {
        _topCoord->prepareFreshResponse(
            args, _replExecutor.now(), getMyLastOptime(), response, result);
        return;
    }
    *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
}
// Handles replSetElect (election protocol vote request) by delegating to
// _processReplSetElect_finish on the executor thread and waiting for it.
Status ReplicationCoordinatorImpl::processReplSetElect(const ReplSetElectArgs& args,
                                                       BSONObjBuilder* responseObj) {
    Status electResult = Status(ErrorCodes::InternalError, "status not set by callback");
    const CBHStatus workHandle = _replExecutor.scheduleWork(
        stdx::bind(&ReplicationCoordinatorImpl::_processReplSetElect_finish,
                   this,
                   stdx::placeholders::_1,
                   args,
                   responseObj,
                   &electResult));
    if (workHandle.getStatus() == ErrorCodes::ShutdownInProgress) {
        return Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
    }
    fassert(18657, workHandle.getStatus());
    _replExecutor.wait(workHandle.getValue());
    return electResult;
}
// Executor-thread half of processReplSetElect.
void ReplicationCoordinatorImpl::_processReplSetElect_finish(
        const ReplicationExecutor::CallbackData& cbData,
        const ReplSetElectArgs& args,
        BSONObjBuilder* response,
        Status* result) {
    if (cbData.status != ErrorCodes::CallbackCanceled) {
        _topCoord->prepareElectResponse(
            args, _replExecutor.now(), getMyLastOptime(), response, result);
        return;
    }
    *result = Status(ErrorCodes::ShutdownInProgress, "replication shutdown in progress");
}
// Installs 'newConfig' as the current replica set configuration with 'myIndex' as
// this node's position (-1 when not a member).  Caller must hold _mutex, and must
// run the returned action via _performPostMemberStateUpdateAction after releasing it.
ReplicationCoordinatorImpl::PostMemberStateUpdateAction
ReplicationCoordinatorImpl::_setCurrentRSConfig_inlock(
const ReplicaSetConfig& newConfig,
int myIndex) {
invariant(_settings.usingReplSets());
// Stop heartbeating under the old config; heartbeats restart below against the new
// member list (only if we are part of the new config).
_cancelHeartbeats();
_setConfigState_inlock(kConfigSteady);
Timestamp myOptime = _getMyLastOptime_inlock(); // Must get this before changing our config.
_topCoord->updateConfig(
newConfig,
myIndex,
_replExecutor.now(),
myOptime);
_rsConfig = newConfig;
log() << "New replica set config in use: " << _rsConfig.toBSON() << rsLog;
_selfIndex = myIndex;
if (_selfIndex >= 0) {
log() << "This node is " <<
_rsConfig.getMemberAt(_selfIndex).getHostAndPort() << " in the config";
}
else {
log() << "This node is not a member of the config";
}
const PostMemberStateUpdateAction action =
_updateMemberStateFromTopologyCoordinator_inlock();
_updateSlaveInfoFromConfig_inlock();
if (_selfIndex >= 0) {
// Don't send heartbeats if we're not in the config, if we get re-added one of the
// nodes in the set will contact us.
_startHeartbeats();
}
// Wake awaitReplication waiters whose write concern may be satisfiable under the
// new configuration.
_wakeReadyWaiters_inlock();
return action;
}
void ReplicationCoordinatorImpl::_wakeReadyWaiters_inlock(){
for (std::vector::iterator it = _replicationWaiterList.begin();
it != _replicationWaiterList.end(); ++it) {
WaiterInfo* info = *it;
if (_doneWaitingForReplication_inlock(*info->opTime, *info->writeConcern)) {
info->condVar->notify_all();
}
}
}
Status ReplicationCoordinatorImpl::processReplSetUpdatePosition(
        const UpdatePositionArgs& updates, long long* configVersion) {
    // Records each member's reported optime in our progress map and, when we
    // are a secondary and anything advanced, forwards that progress upstream.
    // Fix: unique_lock was missing its mutex template argument.
    boost::unique_lock<boost::mutex> lock(_mutex);
    Status status = Status::OK();
    bool somethingChanged = false;
    for (UpdatePositionArgs::UpdateIterator update = updates.updatesBegin();
            update != updates.updatesEnd();
            ++update) {
        status = _setLastOptime_inlock(*update, configVersion);
        if (!status.isOK()) {
            // Stop at the first failing entry; its status is returned below.
            break;
        }
        somethingChanged = true;
    }

    if (somethingChanged && !_getMemberState_inlock().primary()) {
        lock.unlock();
        _externalState->forwardSlaveProgress(); // Must do this outside _mutex
    }
    return status;
}
Status ReplicationCoordinatorImpl::processHandshake(OperationContext* txn,
                                                    const HandshakeArgs& handshake) {
    // Master/slave only: registers a new slave (keyed by RID) so its
    // replication progress can be tracked; idempotent for known RIDs.
    // Fix: unique_lock was missing its mutex template argument.
    LOG(2) << "Received handshake " << handshake.toBSON();

    boost::unique_lock<boost::mutex> lock(_mutex);
    if (_getReplicationMode_inlock() != modeMasterSlave) {
        return Status(ErrorCodes::IllegalOperation,
                      "The handshake command is only used for master/slave replication");
    }

    SlaveInfo* slaveInfo = _findSlaveInfoByRID_inlock(handshake.getRid());
    if (slaveInfo) {
        return Status::OK(); // nothing to do
    }

    SlaveInfo newSlaveInfo;
    newSlaveInfo.rid = handshake.getRid();
    newSlaveInfo.memberId = -1;  // master/slave nodes have no replica-set member id
    newSlaveInfo.hostAndPort = _externalState->getClientHostAndPort(txn);
    // Don't call _addSlaveInfo_inlock as that would wake sleepers unnecessarily.
    _slaveInfo.push_back(newSlaveInfo);

    return Status::OK();
}
bool ReplicationCoordinatorImpl::buildsIndexes() {
boost::lock_guard lk(_mutex);
if (_selfIndex == -1) {
return true;
}
const MemberConfig& self = _rsConfig.getMemberAt(_selfIndex);
return self.shouldBuildIndexes();
}
std::vector ReplicationCoordinatorImpl::getHostsWrittenTo(const Timestamp& op) {
std::vector hosts;
boost::lock_guard lk(_mutex);
for (size_t i = 0; i < _slaveInfo.size(); ++i) {
const SlaveInfo& slaveInfo = _slaveInfo[i];
if (slaveInfo.opTime < op) {
continue;
}
if (_getReplicationMode_inlock() == modeMasterSlave &&
slaveInfo.rid == _getMyRID_inlock()) {
// Master-slave doesn't know the HostAndPort for itself at this point.
continue;
}
hosts.push_back(slaveInfo.hostAndPort);
}
return hosts;
}
std::vector ReplicationCoordinatorImpl::getOtherNodesInReplSet() const {
boost::lock_guard lk(_mutex);
invariant(_settings.usingReplSets());
std::vector nodes;
if (_selfIndex == -1) {
return nodes;
}
for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
if (i == _selfIndex)
continue;
nodes.push_back(_rsConfig.getMemberAt(i).getHostAndPort());
}
return nodes;
}
Status ReplicationCoordinatorImpl::checkIfWriteConcernCanBeSatisfied(
        const WriteConcernOptions& writeConcern) const {
    // Public entry point: acquires _mutex and delegates to the _inlock variant.
    // Fix: lock_guard was missing its mutex template argument.
    boost::lock_guard<boost::mutex> lock(_mutex);
    return _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
}
Status ReplicationCoordinatorImpl::_checkIfWriteConcernCanBeSatisfied_inlock(
        const WriteConcernOptions& writeConcern) const {
    // Decides, per replication mode, whether "writeConcern" could ever be
    // satisfied.  Callers must hold _mutex.
    const Mode replMode = _getReplicationMode_inlock();
    if (replMode == modeNone) {
        return Status(ErrorCodes::NoReplicationEnabled,
                      "No replication enabled when checking if write concern can be satisfied");
    }
    if (replMode == modeMasterSlave) {
        if (!writeConcern.wMode.empty()) {
            return Status(ErrorCodes::UnknownReplWriteConcern,
                          "Cannot use named write concern modes in master-slave");
        }
        // No way to know how many slaves there are, so assume any numeric mode is possible.
        return Status::OK();
    }
    invariant(replMode == modeReplSet);
    return _rsConfig.checkIfWriteConcernCanBeSatisfied(writeConcern);
}
WriteConcernOptions ReplicationCoordinatorImpl::getGetLastErrorDefault() {
    // Returns the config's default write concern, or a default-constructed
    // WriteConcernOptions before any config has been installed.
    // Fix: lock_guard was missing its mutex template argument.
    boost::lock_guard<boost::mutex> lock(_mutex);
    if (_rsConfig.isInitialized()) {
        return _rsConfig.getDefaultWriteConcern();
    }
    return WriteConcernOptions();
}
Status ReplicationCoordinatorImpl::checkReplEnabledForCommand(BSONObjBuilder* result) {
    // Guards replica-set-only commands: fails unless --replSet was given and
    // an initial config has been received; annotates "result" for the shell.
    const bool replSetEnabled = _settings.usingReplSets();
    if (!replSetEnabled) {
        if (serverGlobalParams.configsvr) {
            result->append("info", "configsvr"); // for shell prompt
        }
        return Status(ErrorCodes::NoReplicationEnabled, "not running with --replSet");
    }

    if (getMemberState().startup()) {
        result->append("info", "run rs.initiate(...) if not yet done for the set");
        return Status(ErrorCodes::NotYetInitialized, "no replset config has been received");
    }

    return Status::OK();
}
bool ReplicationCoordinatorImpl::isReplEnabled() const {
    // Replication is enabled in any mode other than modeNone.
    return modeNone != getReplicationMode();
}
void ReplicationCoordinatorImpl::_chooseNewSyncSource(
        const ReplicationExecutor::CallbackData& cbData,
        HostAndPort* newSyncSource) {
    // Executor callback: leaves *newSyncSource untouched when cancelled, so
    // the scheduling caller sees an empty HostAndPort on shutdown.
    if (cbData.status != ErrorCodes::CallbackCanceled) {
        *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), getMyLastOptime());
    }
}
HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() {
    // Runs sync-source selection on the executor thread and blocks for the
    // result; yields an empty HostAndPort during shutdown.
    HostAndPort chosenSource;
    const CBHStatus scheduled = _replExecutor.scheduleWork(
            stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource,
                       this,
                       stdx::placeholders::_1,
                       &chosenSource));
    if (scheduled.getStatus() == ErrorCodes::ShutdownInProgress) {
        return chosenSource; // empty
    }
    fassert(18740, scheduled.getStatus());
    _replExecutor.wait(scheduled.getValue());
    return chosenSource;
}
void ReplicationCoordinatorImpl::_blacklistSyncSource(
        const ReplicationExecutor::CallbackData& cbData,
        const HostAndPort& host,
        Date_t until) {
    // Executor callback: blacklist "host" and schedule the matching
    // un-blacklist to fire once "until" has passed.
    if (cbData.status == ErrorCodes::CallbackCanceled) {
        return;
    }
    _topCoord->blacklistSyncSource(host, until);

    const CBHStatus scheduled = _replExecutor.scheduleWorkAt(
            until,
            stdx::bind(&ReplicationCoordinatorImpl::_unblacklistSyncSource,
                       this,
                       stdx::placeholders::_1,
                       host));
    if (scheduled.getStatus() == ErrorCodes::ShutdownInProgress) {
        return;
    }
    fassert(28610, scheduled.getStatus());
}
void ReplicationCoordinatorImpl::_unblacklistSyncSource(
        const ReplicationExecutor::CallbackData& cbData,
        const HostAndPort& host) {
    // Executor callback: lift the blacklist entry for "host" unless the
    // executor is shutting down.
    if (cbData.status != ErrorCodes::CallbackCanceled) {
        _topCoord->unblacklistSyncSource(host, _replExecutor.now());
    }
}
void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource,
this,
stdx::placeholders::_1,
host,
until));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return;
}
fassert(18741, cbh.getStatus());
_replExecutor.wait(cbh.getValue());
}
void ReplicationCoordinatorImpl::resetLastOpTimeFromOplog(OperationContext* txn) {
    // Re-derives this node's "my last optime" from the oplog via the external
    // state, falling back to Timestamp(0, 0) (with a warning) on failure.
    // Fix: StatusWith and unique_lock were missing their template arguments.
    StatusWith<Timestamp> lastOpTimeStatus = _externalState->loadLastOpTime(txn);
    Timestamp lastOpTime(0, 0);
    if (!lastOpTimeStatus.isOK()) {
        warning() << "Failed to load timestamp of most recently applied operation; " <<
            lastOpTimeStatus.getStatus();
    }
    else {
        lastOpTime = lastOpTimeStatus.getValue();
    }
    boost::unique_lock<boost::mutex> lk(_mutex);
    _setMyLastOptime_inlock(&lk, lastOpTime, true);
    _externalState->setGlobalTimestamp(lastOpTime);
}
void ReplicationCoordinatorImpl::_shouldChangeSyncSource(
        const ReplicationExecutor::CallbackData& cbData,
        const HostAndPort& currentSource,
        bool* shouldChange) {
    // Executor callback: leaves *shouldChange untouched when cancelled.
    if (cbData.status != ErrorCodes::CallbackCanceled) {
        *shouldChange = _topCoord->shouldChangeSyncSource(currentSource, _replExecutor.now());
    }
}
bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentSource) {
bool shouldChange(false);
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_shouldChangeSyncSource,
this,
stdx::placeholders::_1,
currentSource,
&shouldChange));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return false;
}
fassert(18906, cbh.getStatus());
_replExecutor.wait(cbh.getValue());
return shouldChange;
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
if (!_getMemberState_inlock().primary()) {
return;
}
StatusWith tagPattern =
_rsConfig.findCustomWriteMode(ReplicaSetConfig::kMajorityWriteConcernModeName);
invariant(tagPattern.isOK());
ReplicaSetTagMatch matcher{tagPattern.getValue()};
std::vector votingNodesOpTimes;
for (const auto& sI : _slaveInfo) {
auto memberConfig = _rsConfig.findMemberByID(sI.memberId);
invariant(memberConfig);
for (auto tagIt = memberConfig->tagsBegin();
tagIt != memberConfig->tagsEnd(); ++tagIt) {
if (matcher.update(*tagIt)) {
votingNodesOpTimes.push_back(sI.opTime);
break;
}
}
}
invariant(votingNodesOpTimes.size() > 0);
std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
// Use the index of the minimum quorum in the vector of nodes.
_lastCommittedOpTime = votingNodesOpTimes[(votingNodesOpTimes.size() - 1) / 2];
}
Timestamp ReplicationCoordinatorImpl::getLastCommittedOpTime() const {
    // Snapshot of the majority-committed optime, guarded by _mutex.
    // Fix: unique_lock was missing its mutex template argument.
    boost::unique_lock<boost::mutex> lk(_mutex);
    return _lastCommittedOpTime;
}
} // namespace repl
} // namespace mongo