/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/repl_set_impl.h"
#include "mongo/db/client.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/global_environment_experiment.h"
#include "mongo/db/index_rebuilder.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/connections.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_set_seed_list.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/s/d_state.h"
#include "mongo/util/background.h"
#include "mongo/util/exit.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
// Record a new heartbeat status message in the fixed-size _hbmsg buffer and
// log it. A message identical to the current one is re-logged at most once
// per minute.
// @param s        the status message (truncated to 255 chars if longer)
// @param logLevel log verbosity at which the message is emitted
void ReplSetImpl::sethbmsg(const std::string& s, int logLevel) {
    static time_t lastLogged;
    _hbmsgTime = time(0);

    if (s == _hbmsg) {
        // unchanged
        if (_hbmsgTime - lastLogged < 60)
            return;
    }

    // Copy into the fixed 256-byte buffer, truncating if necessary. The
    // terminating NUL is written explicitly in both branches so the buffer is
    // always a valid C string regardless of its previous contents (the
    // original truncation path relied on a stale NUL at _hbmsg[255]).
    unsigned sz = s.size();
    if (sz >= 256) {
        memcpy(_hbmsg, s.c_str(), 255);
        _hbmsg[255] = 0;
    }
    else {
        _hbmsg[sz] = 0;
        memcpy(_hbmsg, s.c_str(), sz);
    }

    if (!s.empty()) {
        lastLogged = _hbmsgTime;
        LOG(logLevel) << "replSet " << s << rsLog;
    }
}
// We have fallen too far behind 'stale' to resume syncing from it: log the
// situation, bump minvalid so we cannot become primary with a stale dataset,
// and drop into RECOVERING.
void ReplSetImpl::goStale(OperationContext* txn, const Member* stale, const BSONObj& oldest) {
    const OpTime remoteOldest = oldest["ts"]._opTime();

    log() << "replSet error RS102 too stale to catch up, at least from "
          << stale->fullName() << rsLog;
    log() << "replSet our last optime : " << lastOpTimeWritten.toStringLong() << rsLog;
    log() << "replSet oldest at " << stale->fullName() << " : "
          << remoteOldest.toStringLong() << rsLog;
    log() << "replSet See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
          << rsLog;

    // reset minvalid so that we can't become primary prematurely
    setMinValid(txn, remoteOldest);

    sethbmsg("error RS102 too stale to catch up");
    changeState(MemberState::RS_RECOVERING);
}
namespace {
    // Drops the temp collections of every database except "local" (which is
    // not replicated and is cleared at startup independently).
    static void dropAllTempCollections(OperationContext* txn) {
        vector<string> dbNames;
        StorageEngine* storageEngine = getGlobalEnvironment()->getGlobalStorageEngine();
        storageEngine->listDatabases( &dbNames );

        for (vector<string>::const_iterator it = dbNames.begin(); it != dbNames.end(); ++it) {
            // The local db is special because it isn't replicated. It is cleared at startup even on
            // replica set members.
            if (*it == "local")
                continue;
            Client::Context ctx(txn, *it);
            ctx.db()->clearTmpCollections(txn);
        }
    }
} // namespace
// Transition this node into PRIMARY state after winning an election: stop the
// applier, take the global write lock, set the new election id, change state,
// and (still under the lock) replicate temp-collection drops.
void ReplSetImpl::_assumePrimary() {
    LOG(1) << "replSet assuming primary" << endl;
    verify(iAmPotentiallyHot());

    // Wait for replication to stop and buffer to be consumed
    LOG(1) << "replSet waiting for replication to finish before becoming primary" << endl;
    BackgroundSync::get()->stopReplicationAndFlushBuffer();

    // Lock here to prevent stepping down & becoming primary from getting interleaved
    LOG(1) << "replSet waiting for global write lock";

    OperationContextImpl txn;   // XXX?
    Lock::GlobalWrite lk(txn.lockState());

    initOpTimeFromOplog(&txn, "local.oplog.rs");

    // Generate new election unique id
    elect.setElectionId(OID::gen());

    LOG(1) << "replSet truly becoming primary";
    changeState(MemberState::RS_PRIMARY);

    // This must be done after becoming primary but before releasing the write lock. This adds
    // the dropCollection entries for every temp collection to the opLog since we want it to be
    // replicated to secondaries.
    dropAllTempCollections(&txn);
}
// Move this node into state 's', recording ourselves as the subject of the
// state change.
void ReplSetImpl::changeState(MemberState s) {
    box.change(s, _self);
}
// Return the highest-priority member of the electable set, pruning ids that
// no longer correspond to a known member. Returns NULL if the set is empty.
Member* ReplSetImpl::getMostElectable() {
    lock lk(this);

    Member *max = 0;
    set<unsigned>::iterator it = _electableSet.begin();
    while (it != _electableSet.end()) {
        const Member *temp = findById(*it);
        if (!temp) {
            // Stale id: the member vanished from _members; drop it without
            // invalidating the loop iterator (post-increment before erase).
            log() << "couldn't find member: " << *it << endl;
            _electableSet.erase(it++);
            continue;
        }
        if (!max || max->config().priority < temp->config().priority) {
            max = (Member*)temp;
        }
        ++it;
    }

    return max;
}
// Step down from PRIMARY (or leave STARTUP2 for RECOVERING), closing client
// sockets and clearing sharding state so a later re-election starts fresh.
void ReplSetImpl::relinquish(OperationContext* txn) {
    {
        Lock::GlobalWrite writeLock(txn->lockState());    // so we are synchronized with _logOp()

        LOG(2) << "replSet attempting to relinquish" << endl;
        if (box.getState().primary()) {
            log() << "replSet relinquishing primary state" << rsLog;
            changeState(MemberState::RS_SECONDARY);

            // close sockets that were talking to us so they don't blithly send many writes that
            // will fail with "not master" (of course client could check result code, but in
            // case they are not)
            log() << "replSet closing client sockets after relinquishing primary" << rsLog;
            MessagingPort::closeAllSockets(ScopedConn::keepOpen);
        }
        else if (box.getState().startup2()) {
            // This block probably isn't necessary
            changeState(MemberState::RS_RECOVERING);
            return;
        }
    }

    // now that all connections were closed, strip this mongod from all sharding details if and
    // when it gets promoted to a primary again, only then it should reload the sharding state
    // the rationale here is that this mongod won't bring stale state when it regains
    // primaryhood
    shardingState.resetShardingState();
}
// look freshly for who is primary - includes relinquishing ourself.
void ReplSetImpl::forgetPrimary(OperationContext* txn) {
    const bool weArePrimary = box.getState().primary();
    if (weArePrimary) {
        relinquish(txn);
    }
    else {
        box.setOtherPrimary(0);
    }
}
// for the replSetStepDown command
// Returns true if we were primary and stepped down; false otherwise.
bool ReplSetImpl::_stepDown(OperationContext* txn, int secs) {
    lock lk(this);
    if (!box.getState().primary()) {
        return false;   // nothing to step down from
    }

    // Refuse to run for election again until 'secs' seconds have elapsed.
    elect.steppedDown = time(0) + secs;
    log() << "replSet info stepping down as primary secs=" << secs << rsLog;
    relinquish(txn);
    return true;
}
// Freeze (or, with secs==0, unfreeze) this node's ability to seek election.
// Always returns true.
bool ReplSetImpl::_freeze(int secs) {
    lock lk(this);
    // note if we are primary we remain primary but won't try to elect ourself again until
    // this time period expires.
    if (secs == 0) {
        elect.steppedDown = 0;
        log() << "replSet info 'unfreezing'" << rsLog;
        return true;
    }

    if (box.getState().primary()) {
        log() << "replSet info received freeze command but we are primary" << rsLog;
        return true;
    }

    elect.steppedDown = time(0) + secs;
    log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog;
    return true;
}
// Apply the results of the latest heartbeat poll to our cached state for the
// member whose id matches h.id(). Silently ignores unknown ids.
void ReplSetImpl::msgUpdateHBInfo(HeartbeatInfo h) {
    for (Member *m = _members.head(); m; m=m->next()) {
        if (static_cast<int>(m->id()) == h.id()) {
            m->_hbinfo.updateFromLastPoll(h);
            return;
        }
    }
}
void ReplSetImpl::msgUpdateHBRecv(unsigned id, time_t newTime) {
for (Member *m = _members.head(); m; m = m->next()) {
if (m->id() == id) {
m->_hbinfo.lastHeartbeatRecv = newTime;
return;
}
}
}
// Return our own host followed by the host of every other known member.
list<HostAndPort> ReplSetImpl::memberHostnames() const {
    list<HostAndPort> L;
    L.push_back(_self->h());
    for (Member *m = _members.head(); m; m = m->next())
        L.push_back(m->h());
    return L;
}
void ReplSetImpl::_fillIsMasterHost(const Member *m,
vector& hosts,
vector& passives,
vector& arbiters) {
verify(m);
if (m->config().hidden)
return;
if (m->potentiallyHot()) {
hosts.push_back(m->h().toString());
}
else if (!m->config().arbiterOnly) {
if (m->config().slaveDelay) {
// hmmm - we don't list these as they are stale.
}
else {
passives.push_back(m->h().toString());
}
}
else {
arbiters.push_back(m->h().toString());
}
}
// Build the isMaster command response into 'b': set identity, role flags,
// host lists, the current primary, and this member's config attributes.
void ReplSetImpl::_fillIsMaster(BSONObjBuilder& b) {
    lock lk(this);

    const StateBox::SP sp = box.get();
    bool isp = sp.state.primary();
    b.append("setName", name());
    b.append("setVersion", version());
    b.append("ismaster", isp);
    b.append("secondary", sp.state.secondary());
    {
        vector<string> hosts, passives, arbiters;
        _fillIsMasterHost(_self, hosts, passives, arbiters);

        for (Member *m = _members.head(); m; m = m->next()) {
            verify(m);
            _fillIsMasterHost(m, hosts, passives, arbiters);
        }

        if (hosts.size() > 0) {
            b.append("hosts", hosts);
        }
        if (passives.size() > 0) {
            b.append("passives", passives);
        }
        if (arbiters.size() > 0) {
            b.append("arbiters", arbiters);
        }
    }

    if (!isp) {
        // We are not primary; report the primary we observe, if any.
        const Member *m = sp.primary;
        if (m)
            b.append("primary", m->h().toString());
    }
    else {
        b.append("primary", _self->fullName());
    }

    // Attributes of our own member config, reported only when set.
    if (myConfig().arbiterOnly)
        b.append("arbiterOnly", true);
    if (myConfig().priority == 0 && !myConfig().arbiterOnly)
        b.append("passive", true);
    if (myConfig().slaveDelay)
        b.append("slaveDelay", myConfig().slaveDelay);
    if (myConfig().hidden)
        b.append("hidden", true);
    if (!myConfig().buildIndexes)
        b.append("buildIndexes", false);
    if (!myConfig().tags.empty()) {
        BSONObjBuilder a;
        for (map<string,string>::const_iterator i = myConfig().tags.begin();
                i != myConfig().tags.end();
                i++) {
            a.append((*i).first, (*i).second);
        }
        b.append("tags", a.done());
    }
    b.append("me", myConfig().h.toString());
}
// One-time startup: create the Manager, load the config (blocking until one is
// available), then warn about any command-line seeds that are not part of the
// loaded configuration.
void ReplSetImpl::init(OperationContext* txn, ReplSetSeedList& replSetSeedList) {
    mgr = new Manager(this);

    _cfg = 0;
    memset(_hbmsg, 0, sizeof(_hbmsg));
    strcpy(_hbmsg , "initial startup");
    changeState(MemberState::RS_STARTUP);

    _seeds = &replSetSeedList.seeds;

    LOG(1) << "replSet beginning startup..." << rsLog;

    loadConfig(txn);

    // Remove every configured member from the seed set; whatever remains was
    // given on the command line but is not in the config.
    unsigned sss = replSetSeedList.seedSet.size();
    for (Member *m = head(); m; m = m->next()) {
        replSetSeedList.seedSet.erase(m->h());
    }
    for (set<HostAndPort>::iterator i = replSetSeedList.seedSet.begin();
            i != replSetSeedList.seedSet.end();
            i++) {
        if (isSelf(*i)) {
            if (sss == 1) {
                LOG(1) << "replSet warning self is listed in the seed list and there are no "
                    "other seeds listed did you intend that?" << rsLog;
            }
        }
        else {
            log() << "replSet warning command line seed " << i->toString()
                  << " is not present in the current repl set config" << rsLog;
        }
    }
}
// Construct an uninitialized replica set object; init() must be called before
// use. Pointer members start null and times/counters start at zero.
ReplSetImpl::ReplSetImpl() :
    elect(this),
    _forceSyncTarget(0),
    _hbmsgTime(0),
    _self(0),
    _maintenanceMode(0),
    mgr(0) {
}
// Read the newest oplog entry (if any) under a DB read lock and publish its
// timestamp as our last-applied optime; publish a null optime when the oplog
// is empty.
void ReplSetImpl::loadLastOpTimeWritten(OperationContext* txn, bool quiet) {
    Lock::DBRead lk(txn->lockState(), rsoplog);
    BSONObj lastEntry;
    OpTime lastOpTime;  // default-constructed: the null optime
    if (Helpers::getLast(txn, rsoplog, lastEntry)) {
        lastOpTime = lastEntry["ts"]._opTime();
        uassert(13290, "bad replSet oplog entry?", quiet || !lastOpTime.isNull());
    }
    getGlobalReplicationCoordinator()->setMyLastOptime(txn, lastOpTime);
}
// call after constructing to start - returns fairly quickly after launching its threads
void ReplSetImpl::_go() {
    OperationContextImpl txn;

    try {
        // Note: this sets lastOpTimeWritten, which the Applier uses to determine whether to
        // do an initial sync or not.
        loadLastOpTimeWritten(&txn);
    }
    catch (std::exception& e) {
        // Without a readable oplog we cannot replicate: log, pause so the
        // message is visible to operators, then shut down.
        log() << "replSet error fatal couldn't query the local " << rsoplog
              << " collection. Terminating mongod after 30 seconds." << rsLog;
        log() << e.what() << rsLog;
        sleepsecs(30);
        dbexit(EXIT_REPLICATION_ERROR);
        return;
    }

    // initialize _me in SyncSourceFeedback
    bool meEnsured = false;
    while (!inShutdown() && !meEnsured) {
        try {
            syncSourceFeedback.ensureMe(&txn);
            meEnsured = true;
        }
        catch (const DBException& e) {
            // Retry once a second until the local.me write succeeds or we shut down.
            warning() << "failed to write to local.me: " << e.what()
                      << " trying again in one second";
            sleepsecs(1);
        }
    }

    getGlobalReplicationCoordinator()->setFollowerMode(MemberState::RS_STARTUP2);
    startThreads();
    newReplUp(); // oplog.cpp
}
// Point _self (and the cached id/config/buildIndexes settings) at member m.
// already locked in initFromConfig
// Fix: the original dereferenced m (m->id(), m->config()) BEFORE its null
// check, making the 'if (m)' dead code; the check now guards all dereferences.
void ReplSetImpl::setSelfTo(Member *m) {
    _self = m;
    if (m) {
        _id = m->id();
        _config = m->config();
        _buildIndexes = m->config().buildIndexes;
    }
    else {
        _buildIndexes = true;
    }
}
// @param reconf true if this is a reconfiguration and not an initial load of the configuration.
// @return true if ok; throws if config really bad; false if config doesn't include self
bool ReplSetImpl::initFromConfig(OperationContext* txn, ReplSetConfig& c, bool reconf) {
    // NOTE: haveNewConfig() writes the new config to disk before we get here. So
    // we cannot error out at this point, except fatally. Check errors earlier.
    lock lk(this);

    if (!getLastErrorDefault.isEmpty() || !c.getLastErrorDefaults.isEmpty()) {
        getLastErrorDefault = c.getLastErrorDefaults;
    }

    list<ReplSetConfig::MemberCfg*> newOnes;
    // additive short-cuts the new config setup. If we are just adding a
    // node/nodes and nothing else is changing, this is additive. If it's
    // not a reconfig, we're not adding anything
    bool additive = reconf;
    bool updateConfigs = false;
    {
        unsigned nfound = 0;
        int me = 0;
        for (vector<ReplSetConfig::MemberCfg>::iterator i = c.members.begin();
                i != c.members.end();
                i++) {
            ReplSetConfig::MemberCfg& m = *i;
            if (isSelf(m.h)) {
                me++;
            }

            if (reconf) {
                const Member *old = findById(m._id);
                if (old) {
                    nfound++;
                    verify((int) old->id() == m._id);
                    if (!old->config().isSameIgnoringTags(m)) {
                        additive = false;
                    }
                    if (!updateConfigs && old->config() != m) {
                        updateConfigs = true;
                    }
                }
                else {
                    newOnes.push_back(&m);
                }
            }
        }
        if (me == 0) { // we're not in the config -- we must have been removed
            if (state().removed()) {
                // already took note of our ejection from the set
                // so just sit tight and poll again
                return false;
            }

            _members.orphanAll();

            // kill off rsHealthPoll threads (because they Know Too Much about our past)
            endOldHealthTasks();

            // clear sync target to avoid faulty sync attempts; we must do this before we
            // close sockets, since that will trigger the bgsync thread to reconnect.
            BackgroundSync::get()->clearSyncTarget();

            // close sockets to force clients to re-evaluate this member
            MessagingPort::closeAllSockets(0);

            // take note of our ejection
            changeState(MemberState::RS_REMOVED);

            // go into holding pattern
            log() << "replSet info self not present in the repl set configuration:" << rsLog;
            log() << c.toString() << rsLog;

            loadConfig(txn);  // redo config from scratch
            return false;
        }
        uassert(13302, "replSet error self appears twice in the repl set configuration", me<=1);

        if (state().removed()) {
            // If we were removed and have now been added back in, switch state.
            changeState(MemberState::RS_RECOVERING);
        }

        // if we found different members that the original config, reload everything
        if (reconf && config().members.size() != nfound)
            additive = false;
    }

    // If we are changing chaining rules, we don't want this to be an additive reconfig so that
    // the primary can step down and the sync targets change.
    // TODO: This can be removed once SERVER-5208 is fixed.
    if (reconf && config().chainingAllowed() != c.chainingAllowed()) {
        additive = false;
    }

    _cfg = new ReplSetConfig(c);

    // config() is same thing but const, so we use that when we can for clarity below
    dassert(&config() == _cfg);
    verify(config().ok());
    verify(_name.empty() || _name == config()._id);
    _name = config()._id;
    verify(!_name.empty());

    // this is a shortcut for simple changes
    if (additive) {
        log() << "replSet info : additive change to configuration" << rsLog;
        if (updateConfigs) {
            // we have new configs for existing members, so we need to repopulate _members
            // with the most recent configs
            _members.orphanAll();

            // for logging
            string members = "";

            // not setting _self to 0 as other threads use _self w/o locking
            int me = 0;
            for (vector<ReplSetConfig::MemberCfg>::const_iterator i = config().members.begin();
                    i != config().members.end();
                    i++) {
                const ReplSetConfig::MemberCfg& m = *i;
                Member *mi;
                members += (members == "" ? "" : ", ") + m.h.toString();
                if (isSelf(m.h)) {
                    verify(me++ == 0);
                    mi = new Member(m.h, m._id, &m, true);
                    setSelfTo(mi);
                }
                else {
                    mi = new Member(m.h, m._id, &m, false);
                    _members.push(mi);
                }
            }
            // trigger a handshake to update the syncSource of our writeconcern information
            syncSourceFeedback.forwardSlaveHandshake();
        }

        // add any new members
        for (list<ReplSetConfig::MemberCfg*>::const_iterator i = newOnes.begin();
                i != newOnes.end();
                i++) {
            ReplSetConfig::MemberCfg *m = *i;
            Member *mi = new Member(m->h, m->_id, m, false);

            // we will indicate that new members are up() initially so that we don't relinquish
            // our primary state because we can't (transiently) see a majority. they should be
            // up as we check that new members are up before getting here on reconfig anyway.
            mi->get_hbinfo().health = 0.1;

            _members.push(mi);
            startHealthTaskFor(mi);
        }

        // if we aren't creating new members, we may have to update the
        // groups for the current ones
        _cfg->updateMembers(_members);

        return true;
    }

    // start with no members. if this is a reconfig, drop the old ones.
    _members.orphanAll();

    endOldHealthTasks();

    int oldPrimaryId = -1;
    {
        const Member *p = box.getPrimary();
        if (p)
            oldPrimaryId = p->id();
    }
    forgetPrimary(txn);

    // not setting _self to 0 as other threads use _self w/o locking
    int me = 0;

    // For logging
    string members = "";

    for (vector<ReplSetConfig::MemberCfg>::const_iterator i = config().members.begin();
            i != config().members.end();
            i++) {
        const ReplSetConfig::MemberCfg& m = *i;
        Member *mi;
        members += (members == "" ? "" : ", ") + m.h.toString();
        if (isSelf(m.h)) {
            verify(me++ == 0);
            mi = new Member(m.h, m._id, &m, true);
            if (!reconf) {
                log() << "replSet I am " << m.h.toString() << rsLog;
            }
            setSelfTo(mi);

            if ((int)mi->id() == oldPrimaryId)
                box.setSelfPrimary(mi);
        }
        else {
            mi = new Member(m.h, m._id, &m, false);
            _members.push(mi);
            if ((int)mi->id() == oldPrimaryId)
                box.setOtherPrimary(mi);
        }
    }

    if (me == 0){
        log() << "replSet warning did not detect own host in full reconfig, members "
              << members << " config: " << c << rsLog;
    }
    else {
        // Do this after we've found ourselves, since _self needs
        // to be set before we can start the heartbeat tasks
        for (Member *mb = _members.head(); mb; mb=mb->next()) {
            startHealthTaskFor(mb);
        }
    }
    return true;
}
// Our own config must be the first one.
bool ReplSetImpl::_loadConfigFinish(OperationContext* txn, vector& cfgs) {
int v = -1;
ReplSetConfig *highest = 0;
int myVersion = -2000;
int n = 0;
for (vector::iterator i = cfgs.begin(); i != cfgs.end(); i++) {
ReplSetConfig* cfg = *i;
DEV { LOG(1) << n+1 << " config shows version " << cfg->version << rsLog; }
if (++n == 1) myVersion = cfg->version;
if (cfg->ok() && cfg->version > v) {
highest = cfg;
v = cfg->version;
}
}
verify(highest);
if (!initFromConfig(txn, *highest))
return false;
if (highest->version > myVersion && highest->version >= 0) {
log() << "replSet got config version " << highest->version
<< " from a remote, saving locally" << rsLog;
highest->saveConfigLocally(txn, BSONObj());
}
return true;
}
// Load a replica set config from the local store, command-line seeds,
// discovered seeds, and any pending reconfig, retrying until one usable
// config is found and applied. Blocks (looping with sleeps) until then;
// fasserts on a fatally bad config.
void ReplSetImpl::loadConfig(OperationContext* txn) {
    startupStatus = LOADINGCONFIG;
    startupStatusMsg.set("loading " + rsConfigNs + " config (LOADINGCONFIG)");
    LOG(1) << "loadConfig() " << rsConfigNs << endl;

    while (1) {
        try {
            OwnedPointerVector<ReplSetConfig> configs;
            try {
                configs.mutableVector().push_back(ReplSetConfig::makeDirect(txn));
            }
            catch (DBException& e) {
                log() << "replSet exception loading our local replset configuration object : "
                      << e.toString() << rsLog;
            }

            // Try every seed given on the command line.
            for (vector<HostAndPort>::const_iterator i = _seeds->begin();
                    i != _seeds->end();
                    i++) {
                try {
                    configs.mutableVector().push_back(ReplSetConfig::make(txn, *i));
                }
                catch (DBException& e) {
                    log() << "replSet exception trying to load config from " << *i
                          << " : " << e.toString() << rsLog;
                }
            }

            // Try hosts we discovered at runtime; drop any that fail.
            ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
            {
                scoped_lock lck(replSettings.discoveredSeeds_mx);
                if (replSettings.discoveredSeeds.size() > 0) {
                    // Fix: the original did `erase(*i)` and then `i++`, which
                    // increments an invalidated iterator (UB). Advance via
                    // post-increment before the erase instead.
                    for (set<string>::iterator i = replSettings.discoveredSeeds.begin();
                            i != replSettings.discoveredSeeds.end();
                            /* advanced in loop body */) {
                        try {
                            configs.mutableVector().push_back(
                                                ReplSetConfig::make(txn, HostAndPort(*i)));
                            ++i;
                        }
                        catch (DBException&) {
                            LOG(1) << "replSet exception trying to load config from discovered "
                                "seed " << *i << rsLog;
                            replSettings.discoveredSeeds.erase(i++);
                        }
                    }
                }
            }

            if (!replSettings.reconfig.isEmpty()) {
                try {
                    configs.mutableVector().push_back(ReplSetConfig::make(txn,
                                                                          replSettings.reconfig,
                                                                          true));
                }
                catch (DBException& re) {
                    log() << "replSet couldn't load reconfig: " << re.what() << rsLog;
                    replSettings.reconfig = BSONObj();
                }
            }

            // Count how many candidates are usable vs. empty.
            int nok = 0;
            int nempty = 0;
            for (vector<ReplSetConfig*>::iterator i = configs.mutableVector().begin();
                    i != configs.mutableVector().end(); i++) {
                if ((*i)->ok())
                    nok++;
                if ((*i)->empty())
                    nempty++;
            }
            if (nok == 0) {
                if (nempty == (int) configs.mutableVector().size()) {
                    // No config exists anywhere yet -- the set is uninitiated.
                    startupStatus = EMPTYCONFIG;
                    startupStatusMsg.set("can't get " + rsConfigNs +
                                         " config from self or any seed (EMPTYCONFIG)");
                    log() << "replSet can't get " << rsConfigNs
                          << " config from self or any seed (EMPTYCONFIG)" << rsLog;
                    static unsigned once;
                    if (++once == 1) {
                        log() << "replSet info you may need to run replSetInitiate -- rs.initia"
                            "te() in the shell -- if that is not already done" << rsLog;
                    }
                    if (_seeds->size() == 0) {
                        LOG(1) << "replSet info no seed hosts were specified on the --replSet "
                            "command line" << rsLog;
                    }
                }
                else {
                    // Some sources have configs but we couldn't reach them yet.
                    startupStatus = EMPTYUNREACHABLE;
                    startupStatusMsg.set("can't currently get " + rsConfigNs +
                                         " config from self or any seed (EMPTYUNREACHABLE)");
                    log() << "replSet can't get " << rsConfigNs
                          << " config from self or any seed (yet)" << rsLog;
                }

                sleepsecs(1);
                continue;
            }

            if (!_loadConfigFinish(txn, configs.mutableVector())) {
                log() << "replSet info Couldn't load config yet. Sleeping 3 sec and will try "
                    "again." << rsLog;
                sleepsecs(3);
                continue;
            }
        }
        catch (DBException& e) {
            startupStatus = BADCONFIG;
            startupStatusMsg.set("replSet error loading set config (BADCONFIG)");
            log() << "replSet error loading configurations " << e.toString() << rsLog;
            log() << "replSet error replication will not start" << rsLog;
            sethbmsg("error loading set config");
            fassertFailedNoTrace(18754);
            throw;
        }
        break;
    }
    startupStatusMsg.set("? started");
    startupStatus = STARTED;
}
// Choose a member to sync from: an explicitly requested target if one is set,
// otherwise the lowest-latency member that is ahead of us and not excessively
// stale, delayed, hidden, or vetoed. Returns NULL if no candidate qualifies.
const Member* ReplSetImpl::getMemberToSyncTo() {
    lock lk(this);

    // if we have a target we've requested to sync from, use it
    if (_forceSyncTarget) {
        Member* target = _forceSyncTarget;
        _forceSyncTarget = 0;
        sethbmsg( str::stream() << "syncing to: " << target->fullName() << " by request", 0);
        return target;
    }

    const Member* primary = box.getPrimary();

    // wait for 2N pings before choosing a sync target
    if (_cfg) {
        int needMorePings = config().members.size()*2 - HeartbeatInfo::numPings;

        if (needMorePings > 0) {
            OCCASIONALLY log() << "waiting for " << needMorePings
                               << " pings from other members before syncing" << endl;
            return NULL;
        }

        // If we are only allowed to sync from the primary, return that
        if (!_cfg->chainingAllowed()) {
            // Returns NULL if we cannot reach the primary
            return primary;
        }
    }

    // find the member with the lowest ping time that has more data than me

    // Find primary's oplog time. Reject sync candidates that are more than
    // maxSyncSourceLagSecs seconds behind.
    OpTime primaryOpTime;
    if (primary)
        primaryOpTime = primary->hbinfo().opTime;
    else
        // choose a time that will exclude no candidates, since we don't see a primary
        primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);

    if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) {
        // erh - I think this means there was just a new election
        // and we don't yet know the new primary's optime
        primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
    }

    OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0);

    Member *closest = 0;
    time_t now = 0;

    // Make two attempts. The first attempt, we ignore those nodes with
    // slave delay higher than our own. The second attempt includes such
    // nodes, in case those are the only ones we can reach.
    // This loop attempts to set 'closest'.
    for (int attempts = 0; attempts < 2; ++attempts) {
        for (Member *m = _members.head(); m; m = m->next()) {
            if (!m->syncable())
                continue;

            if (m->state() == MemberState::RS_SECONDARY) {
                // only consider secondaries that are ahead of where we are
                if (m->hbinfo().opTime <= lastOpTimeWritten)
                    continue;
                // omit secondaries that are excessively behind, on the first attempt at least.
                if (attempts == 0 &&
                    m->hbinfo().opTime < oldestSyncOpTime)
                    continue;
            }

            // omit nodes that are more latent than anything we've already considered
            if (closest &&
                (m->hbinfo().ping > closest->hbinfo().ping))
                continue;

            if (attempts == 0 &&
                (myConfig().slaveDelay < m->config().slaveDelay || m->config().hidden)) {
                continue; // skip this one in the first attempt
            }

            map<string,time_t>::iterator vetoed = _veto.find(m->fullName());
            if (vetoed != _veto.end()) {
                // Do some veto housekeeping
                if (now == 0) {
                    now = time(0);
                }

                // if this was on the veto list, check if it was vetoed in the last "while".
                // if it was, skip.
                if (vetoed->second >= now) {
                    if (time(0) % 5 == 0) {
                        log() << "replSet not trying to sync from " << (*vetoed).first
                              << ", it is vetoed for " << ((*vetoed).second - now)
                              << " more seconds" << rsLog;
                    }
                    continue;
                }
                _veto.erase(vetoed);
                // fall through, this is a valid candidate now
            }
            // This candidate has passed all tests; set 'closest'
            closest = m;
        }
        if (closest) break; // no need for second attempt
    }

    if (!closest) {
        return NULL;
    }

    sethbmsg( str::stream() << "syncing to: " << closest->fullName(), 0);

    return closest;
}
// Record (replacing any earlier entry) that 'host' must not be chosen as a
// sync source until the given wall-clock time.
void ReplSetImpl::veto(const string& host, const Date_t until) {
    lock lk(this);
    _veto[host] = until.toTimeT();
}
} // namespace repl
} // namespace mongo