summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/SConscript 1
-rw-r--r-- src/mongo/db/db.cpp 3
-rw-r--r-- src/mongo/db/repl/SConscript 14
-rw-r--r-- src/mongo/db/repl/repl_coordinator_impl.cpp 1
-rw-r--r-- src/mongo/db/repl/repl_settings.cpp 47
-rw-r--r-- src/mongo/db/repl/repl_settings.h 2
-rw-r--r-- src/mongo/db/repl/replset_web_handler.cpp 1
-rw-r--r-- src/mongo/db/repl/rs_sync.cpp 8
-rw-r--r-- src/mongo/db/repl/topology_coordinator.h 1
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl.cpp 151
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl.h 39
-rw-r--r-- src/mongo/db/repl/topology_coordinator_impl_test.cpp 74
-rw-r--r-- src/mongo/db/repl/topology_coordinator_mock.cpp 1
-rw-r--r-- src/mongo/db/repl/topology_coordinator_mock.h 1
14 files changed, 234 insertions, 110 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 34267ab293c..a9b8d0ebaae 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -915,6 +915,7 @@ serveronlyLibdeps = ["coreshard",
"db/exec/working_set",
"db/exec/exec",
"db/query/query",
+ "db/repl/repl_settings",
"db/repl/network_interface_impl",
"db/repl/replication_executor",
"db/repl/repl_coordinator_impl",
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 68c8bf36254..4ccc49f3d26 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -279,7 +279,8 @@ namespace mongo {
logStartup();
repl::getGlobalReplicationCoordinator()->startReplication(
- new repl::TopologyCoordinatorImpl(), new repl::NetworkInterfaceImpl());
+ new repl::TopologyCoordinatorImpl(repl::maxSyncSourceLagSecs),
+ new repl::NetworkInterfaceImpl());
if (serverGlobalParams.isHttpInterfaceEnabled)
boost::thread web(stdx::bind(&webServerThread,
new RestAdminAccess())); // takes ownership
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5266a31302c..0c2e72fc9fb 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -2,6 +2,13 @@
Import("env")
+env.Library('repl_settings',
+ 'repl_settings.cpp',
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base/base',
+ '$BUILD_DIR/mongo/server_parameters'
+ ])
+
env.Library(
'network_interface_impl',
'network_interface_impl.cpp',
@@ -24,7 +31,12 @@ env.Library('topology_coordinator_impl',
['topology_coordinator_impl.cpp',
'member_heartbeat_data.cpp'],
LIBDEPS=['replica_set_config',
- 'replication_executor'])
+ 'replication_executor',
+ 'repl_settings'])
+
+env.CppUnitTest('topology_coordinator_impl_test',
+ 'topology_coordinator_impl_test.cpp',
+ LIBDEPS=['topology_coordinator_impl'])
env.Library('repl_coordinator_impl',
[
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index a134bf7ad57..3a62541e631 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -400,6 +400,7 @@ namespace repl {
stdx::placeholders::_1,
Date_t(curTimeMillis64()),
cmdObj,
+ _settings.ourSetName(),
resultObj,
&result));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
diff --git a/src/mongo/db/repl/repl_settings.cpp b/src/mongo/db/repl/repl_settings.cpp
new file mode 100644
index 00000000000..3b22a3203eb
--- /dev/null
+++ b/src/mongo/db/repl/repl_settings.cpp
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/repl/repl_settings.h"
+
+#include "mongo/base/init.h"
+#include "mongo/base/status.h"
+#include "mongo/db/server_parameters.h"
+
+namespace mongo {
+namespace repl {
+
+ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(maxSyncSourceLagSecs, int, 30);
+ MONGO_INITIALIZER(maxSyncSourceLagSecsCheck) (InitializerContext*) {
+ if (maxSyncSourceLagSecs < 1) {
+ return Status(ErrorCodes::BadValue, "maxSyncSourceLagSecs must be > 0");
+ }
+ return Status::OK();
+ }
+
+}
+}
diff --git a/src/mongo/db/repl/repl_settings.h b/src/mongo/db/repl/repl_settings.h
index 1cd26081bb3..81aadfb3430 100644
--- a/src/mongo/db/repl/repl_settings.h
+++ b/src/mongo/db/repl/repl_settings.h
@@ -38,6 +38,8 @@
namespace mongo {
namespace repl {
+ extern int maxSyncSourceLagSecs;
+
bool anyReplEnabled();
/* replication slave? (possibly with slave)
diff --git a/src/mongo/db/repl/replset_web_handler.cpp b/src/mongo/db/repl/replset_web_handler.cpp
index 1ab296a59c3..f97384bafb8 100644
--- a/src/mongo/db/repl/replset_web_handler.cpp
+++ b/src/mongo/db/repl/replset_web_handler.cpp
@@ -30,6 +30,7 @@
#include "mongo/db/dbwebserver.h"
#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/health.h"
#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/rs.h"
#include "mongo/util/mongoutils/html.h"
diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp
index 1b1860f6e72..1e1f48269ee 100644
--- a/src/mongo/db/repl/rs_sync.cpp
+++ b/src/mongo/db/repl/rs_sync.cpp
@@ -58,14 +58,6 @@
namespace mongo {
namespace repl {
- MONGO_EXPORT_STARTUP_SERVER_PARAMETER(maxSyncSourceLagSecs, int, 30);
- MONGO_INITIALIZER(maxSyncSourceLagSecsCheck) (InitializerContext*) {
- if (maxSyncSourceLagSecs < 1) {
- return Status(ErrorCodes::BadValue, "maxSyncSourceLagSecs must be > 0");
- }
- return Status::OK();
- }
-
/* should be in RECOVERING state on arrival here.
readlocks
@return true if transitioned to SECONDARY
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index f3df5133ea9..fb9b353916b 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -115,6 +115,7 @@ namespace repl {
virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data,
Date_t now,
const BSONObj& cmdObj,
+ const std::string& ourSetName,
BSONObjBuilder* resultObj,
Status* result) = 0;
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index ccff6bc1914..2b3aa557d46 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -31,11 +31,10 @@
#include "mongo/db/repl/topology_coordinator_impl.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/isself.h"
-#include "mongo/db/repl/repl_coordinator_global.h"
#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/repl/rs_sync.h" // maxSyncSourceLagSecs
+#include "mongo/db/server_parameters.h"
#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -43,10 +42,11 @@ namespace mongo {
namespace repl {
- TopologyCoordinatorImpl::TopologyCoordinatorImpl() :
+ TopologyCoordinatorImpl::TopologyCoordinatorImpl(int maxSyncSourceLagSecs) :
_currentPrimaryIndex(-1),
_syncSourceIndex(-1),
_forceSyncSourceIndex(-1),
+ _maxSyncSourceLagSecs(maxSyncSourceLagSecs),
_busyWithElectSelf(false),
_selfIndex(0),
_blockSync(false),
@@ -82,14 +82,14 @@ namespace repl {
invariant(_forceSyncSourceIndex < _currentConfig.getNumMembers());
_syncSourceIndex = _forceSyncSourceIndex;
_forceSyncSourceIndex = -1;
- sethbmsg( str::stream() << "syncing from: "
- << _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString()
- << " by request", 0);
+ _sethbmsg( str::stream() << "syncing from: "
+ << _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString()
+ << " by request", 0);
return;
}
// wait for 2N pings before choosing a sync target
- int needMorePings = _hbdata.size()*2 - HeartbeatInfo::numPings;
+ int needMorePings = _hbdata.size()*2 - MemberHeartbeatData::numPings;
if (needMorePings > 0) {
OCCASIONALLY log() << "waiting for " << needMorePings
@@ -112,15 +112,15 @@ namespace repl {
primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime();
else
// choose a time that will exclude no candidates, since we don't see a primary
- primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
+ primaryOpTime = OpTime(_maxSyncSourceLagSecs, 0);
- if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) {
+ if (primaryOpTime.getSecs() < static_cast<unsigned int>(_maxSyncSourceLagSecs)) {
// erh - I think this means there was just a new election
// and we don't yet know the new primary's optime
- primaryOpTime = OpTime(maxSyncSourceLagSecs, 0);
+ primaryOpTime = OpTime(_maxSyncSourceLagSecs, 0);
}
- OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0);
+ OpTime oldestSyncOpTime(primaryOpTime.getSecs() - _maxSyncSourceLagSecs, 0);
int closestIndex = -1;
@@ -185,7 +185,7 @@ namespace repl {
if (now % 5 == 0) {
log() << "replSet not trying to sync from " << vetoed->first
<< ", it is vetoed for " << (vetoed->second - now)
- << " more seconds" << rsLog;
+ << " more seconds";
}
continue;
}
@@ -202,7 +202,7 @@ namespace repl {
return;
}
- sethbmsg( str::stream() << "syncing to: " <<
+ _sethbmsg( str::stream() << "syncing to: " <<
_currentConfig.getMemberAt(closestIndex).getHostAndPort().toString(), 0);
_syncSourceIndex = closestIndex;
}
@@ -226,21 +226,15 @@ namespace repl {
}
void TopologyCoordinatorImpl::relinquishPrimary(OperationContext* txn) {
- LOG(2) << "replSet attempting to relinquish" << endl;
- invariant(txn->lockState()->isWriteLocked());
- if (_memberState != MemberState::RS_PRIMARY) {
- // Already relinquished?
- log() << "replSet warning attempted to relinquish but not primary";
- return;
- }
+ invariant(_memberState == MemberState::RS_PRIMARY);
- log() << "replSet relinquishing primary state" << rsLog;
+ log() << "replSet relinquishing primary state";
_changeMemberState(MemberState::RS_SECONDARY);
// close sockets that were talking to us so they don't blithely send many writes that
// will fail with "not master" (of course client could check result code, but in
// case they are not)
- log() << "replSet closing client sockets after relinquishing primary" << rsLog;
+ log() << "replSet closing client sockets after relinquishing primary";
//MessagingPort::closeAllSockets(ScopedConn::keepOpen);
// XXX Eric: what to do here?
}
@@ -249,6 +243,7 @@ namespace repl {
void TopologyCoordinatorImpl::_electSelf(Date_t now) {
verify( !_selfConfig().isArbiter() );
verify( _selfConfig().getSlaveDelay() == Seconds(0) );
+/*
try {
// XXX Eric
// _electSelf(now);
@@ -257,16 +252,15 @@ namespace repl {
throw;
}
catch (VoteException& ) { // Eric: XXX
- log() << "replSet not trying to elect self as responded yea to someone else recently"
- << rsLog;
+ log() << "replSet not trying to elect self as responded yea to someone else recently";
}
catch (const DBException& e) {
- log() << "replSet warning caught unexpected exception in electSelf() " << e.toString()
- << rsLog;
+ log() << "replSet warning caught unexpected exception in electSelf() " << e.toString();
}
catch (...) {
- log() << "replSet warning caught unexpected exception in electSelf()" << rsLog;
+ log() << "replSet warning caught unexpected exception in electSelf()";
}
+*/
}
// Produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command
@@ -283,7 +277,7 @@ namespace repl {
bool weAreFresher = false;
if( _currentConfig.getConfigVersion() > cfgver ) {
log() << "replSet member " << who << " is not yet aware its cfg version "
- << cfgver << " is stale" << rsLog;
+ << cfgver << " is stale";
result.append("info", "config version stale");
weAreFresher = true;
}
@@ -371,8 +365,8 @@ namespace repl {
BSONObjBuilder& result) {
//TODO: after eric's checkin, add executor stuff and error if cancelled
- DEV log() << "replSet received elect msg " << cmdObj.toString() << rsLog;
- else LOG(2) << "replSet received elect msg " << cmdObj.toString() << rsLog;
+ DEV log() << "replSet received elect msg " << cmdObj.toString();
+ else LOG(2) << "replSet received elect msg " << cmdObj.toString();
std::string setName = cmdObj["setName"].String();
int whoid = cmdObj["whoid"].Int();
@@ -387,31 +381,31 @@ namespace repl {
if ( setName != _currentConfig.getReplSetName() ) {
log() << "replSet error received an elect request for '" << setName
<< "' but our setName name is '" <<
- _currentConfig.getReplSetName() << "'" << rsLog;
+ _currentConfig.getReplSetName() << "'";
}
else if ( myver < cfgver ) {
// we are stale. don't vote
}
else if ( myver > cfgver ) {
// they are stale!
- log() << "replSet electCmdReceived info got stale version # during election" << rsLog;
+ log() << "replSet electCmdReceived info got stale version # during election";
vote = -10000;
}
else if ( hopefulIndex == -1 ) {
- log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog;
+ log() << "replSet electCmdReceived couldn't find member with id " << whoid;
vote = -10000;
}
else if ( _currentPrimaryIndex != -1 && _memberState == MemberState::RS_PRIMARY ) {
log() << "I am already primary, "
<< _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " can try again once I've stepped down" << rsLog;
+ << " can try again once I've stepped down";
vote = -10000;
}
else if (_currentPrimaryIndex != -1) {
log() << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
<< " is trying to elect itself but " <<
_currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString()
- << " is already primary" << rsLog;
+ << " is already primary";
vote = -10000;
}
else if ((highestPriorityIndex != -1) &&
@@ -423,26 +417,20 @@ namespace repl {
vote = -10000;
}
else {
- try {
- if (_lastVote.when + LeaseTime >= now && static_cast<int>(_lastVote.who) != whoid) {
- LOG(1) << "replSet not voting yea for " << whoid
- << " voted for " << _lastVote.who << ' ' << now-_lastVote.when
- << " secs ago" << rsLog;
- //TODO: remove exception, and change control flow?
- throw VoteException();
- }
+ if (_lastVote.when + LeaseTime >= now && static_cast<int>(_lastVote.who) != whoid) {
+ log() << "replSet voting no for "
+ << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
+ << " voted for " << _lastVote.who << ' ' << now-_lastVote.when
+ << " secs ago";
+ }
+ else {
_lastVote.when = now;
_lastVote.who = whoid;
vote = _selfConfig().getNumVotes();
invariant( _currentConfig.getMemberAt(hopefulIndex).getId() == whoid );
log() << "replSet info voting yea for " <<
_currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " (" << whoid << ')' << rsLog;
- }
- catch(VoteException&) {
- log() << "replSet voting no for "
- << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString()
- << " already voted for another" << rsLog;
+ << " (" << whoid << ')';
}
}
@@ -455,6 +443,7 @@ namespace repl {
const ReplicationExecutor::CallbackData& data,
Date_t now,
const BSONObj& cmdObj,
+ const std::string& ourSetName,
BSONObjBuilder* resultObj,
Status* result) {
if (data.status == ErrorCodes::CallbackCanceled) {
@@ -469,12 +458,10 @@ namespace repl {
// Verify that replica set names match
std::string rshb = std::string(cmdObj.getStringField("replSetHeartbeat"));
- const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings();
- if (replSettings.ourSetName() != rshb) {
+ if (ourSetName != rshb) {
*result = Status(ErrorCodes::BadValue, "repl set names do not match");
- log() << "replSet set names do not match, our cmdline: " << replSettings.replSet
- << rsLog;
- log() << "replSet rshb: " << rshb << rsLog;
+ log() << "replSet set names do not match, ours: " << ourSetName <<
+ "; remote node's: " << rshb;
resultObj->append("mismatch", true);
return;
}
@@ -613,7 +600,7 @@ namespace repl {
<< " and "
<< (latestOp - _hbdata[highestPriorityIndex].getOpTime().getSecs())
<< " seconds behind";
-
+/* logic --
// Are we primary?
if (isSelf(_currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort())) {
// replSetStepDown tries to acquire the same lock
@@ -625,24 +612,24 @@ namespace repl {
else {
// We are not primary. Step down the remote node.
BSONObj cmd = BSON( "replSetStepDown" << 1 );
-/* ScopedConn conn(primary->fullName());
- BSONObj result;
- // XXX Eric: schedule stepdown command
-
- try {
- if (!conn.runCommand("admin", cmd, result, 0)) {
- log() << "stepping down " << primary->fullName()
- << " failed: " << result << endl;
- }
- }
- catch (DBException &e) {
- log() << "stepping down " << primary->fullName() << " threw exception: "
- << e.toString() << endl;
- }
+ ScopedConn conn(primary->fullName());
+ BSONObj result;
+ // XXX Eric: schedule stepdown command
+
+ try {
+ if (!conn.runCommand("admin", cmd, result, 0)) {
+ log() << "stepping down " << primary->fullName()
+ << " failed: " << result << endl;
+ }
+ }
+ catch (DBException &e) {
+ log() << "stepping down " << primary->fullName() << " threw exception: "
+ << e.toString() << endl;
+ }
*/
return StepDown;
- }
+
}
}
@@ -672,7 +659,7 @@ namespace repl {
log() << "replset error could not reach/authenticate against any members";
if (_currentPrimaryIndex == _selfIndex) {
- log() << "auth problems, relinquishing primary" << rsLog;
+ log() << "auth problems, relinquishing primary";
// XXX Eric: schedule relinquish
//rs->relinquish();
@@ -712,7 +699,7 @@ namespace repl {
if (remotePrimaryIndex != -1) {
// two other nodes think they are primary (asynchronously polled)
// -- wait for things to settle down.
- log() << "replSet info two primaries (transiently)" << rsLog;
+ log() << "replSet info two primaries (transiently)";
return None;
}
remotePrimaryIndex = it->getConfigIndex();
@@ -767,7 +754,7 @@ namespace repl {
fassert(18505, _currentPrimaryIndex == _selfIndex);
if (_shouldRelinquish()) {
- log() << "can't see a majority of the set, relinquishing primary" << rsLog;
+ log() << "can't see a majority of the set, relinquishing primary";
// XXX Eric: schedule a relinquish
//rs->relinquish();
return StepDown;
@@ -795,7 +782,7 @@ namespace repl {
int ll = 0;
if( ++n > 5 ) ll++;
if( last + 60 > now ) ll++;
- LOG(ll) << "replSet can't see a majority, will not try to elect self" << rsLog;
+ LOG(ll) << "replSet can't see a majority, will not try to elect self";
last = now;
return None;
}
@@ -823,7 +810,7 @@ namespace repl {
requeue();
}
catch(...) {
- log() << "replSet error unexpected assertion in rs manager" << rsLog;
+ log() << "replSet error unexpected assertion in rs manager";
}
}
@@ -860,7 +847,7 @@ namespace repl {
}
if( vTot % 2 == 0 && vTot && complain++ == 0 )
log() << "replSet warning: even number of voting members in replica set config - "
- "add an arbiter or set votes to 0 on one of the existing members" << rsLog;
+ "add an arbiter or set votes to 0 on one of the existing members";
return vTot;
}
@@ -910,7 +897,7 @@ namespace repl {
return;
}
_memberState = newMemberState;
- log() << "replSet " << _memberState.toString() << rsLog;
+ log() << "replSet " << _memberState.toString();
for (std::vector<StateChangeCallbackFn>::const_iterator it = _stateChangeCallbacks.begin();
it != _stateChangeCallbacks.end(); ++it) {
@@ -1035,7 +1022,7 @@ namespace repl {
if (secs == 0) {
_stepDownUntil = now;
- log() << "replSet info 'unfreezing'" << rsLog;
+ log() << "replSet info 'unfreezing'";
result.append("info","unfreezing");
}
else {
@@ -1044,10 +1031,10 @@ namespace repl {
if (_memberState != MemberState::RS_PRIMARY) {
_stepDownUntil = now + secs;
- log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog;
+ log() << "replSet info 'freezing' for " << secs << " seconds";
}
else {
- log() << "replSet info received freeze command but we are primary" << rsLog;
+ log() << "replSet info received freeze command but we are primary";
}
}
}
@@ -1106,7 +1093,7 @@ namespace repl {
}
if (!s.empty()) {
lastLogged = _hbmsgTime;
- LOG(logLevel) << "replSet " << s << rsLog;
+ LOG(logLevel) << "replSet " << s;
}
}
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 411bfc6fa25..63a7bc5eb73 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -48,7 +48,7 @@ namespace repl {
class TopologyCoordinatorImpl : public TopologyCoordinator {
public:
- TopologyCoordinatorImpl();
+ explicit TopologyCoordinatorImpl(int maxSyncSourceLagSecs);
virtual ~TopologyCoordinatorImpl() {};
virtual void setLastApplied(const OpTime& optime);
@@ -59,12 +59,12 @@ namespace repl {
// Looks up syncSource's address and returns it, for use by the Applier
virtual HostAndPort getSyncSourceAddress() const;
// Chooses and sets a new sync source, based on our current knowledge of the world
- virtual void chooseNewSyncSource(Date_t now); // this is basically getMemberToSyncTo()
- // Do not choose a member as a sync source until time given;
+ virtual void chooseNewSyncSource(Date_t now);
+ // Does not choose a member as a sync source until time given;
// call this when we have reason to believe it's a bad choice
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
- // Add function pointer to callback list; call function when config changes
+ // Adds function pointer to callback list; calls function when config changes
// Applier needs to know when things like chainingAllowed or slaveDelay change.
// ReplCoord needs to know when things like the tag sets change.
virtual void registerConfigChangeCallback(const ConfigChangeCallbackFn& fn);
@@ -74,44 +74,45 @@ namespace repl {
// Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY
virtual void signalDrainComplete();
- // produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command
+ // produces a reply to a RAFT-style RequestVote RPC
virtual void prepareRequestVoteResponse(const Date_t now,
const BSONObj& cmdObj,
std::string& errmsg,
BSONObjBuilder& result);
- // produce a reply to a received electCmd
+ // produces a reply to a received electCmd
virtual void prepareElectCmdResponse(const Date_t now,
const BSONObj& cmdObj,
BSONObjBuilder& result);
- // produce a reply to a heartbeat
+ // produces a reply to a heartbeat
virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data,
Date_t now,
const BSONObj& cmdObj,
+ const std::string& ourSetName,
BSONObjBuilder* resultObj,
Status* result);
- // update internal state with heartbeat response
+ // updates internal state with heartbeat response
HeartbeatResultAction updateHeartbeatData(Date_t now,
const MemberHeartbeatData& newInfo,
int id);
- // produce a reply to a status request
+ // produces a reply to a status request
virtual void prepareStatusResponse(Date_t now,
const BSONObj& cmdObj,
BSONObjBuilder& result,
unsigned uptime);
- // produce a reply to a freeze request
+ // produces a reply to a freeze request
virtual void prepareFreezeResponse(Date_t now,
const BSONObj& cmdObj,
BSONObjBuilder& result);
- // transition PRIMARY to SECONDARY; caller must already be holding an appropriate dblock
+ // transitions PRIMARY to SECONDARY; caller must already be holding an appropriate dblock
virtual void relinquishPrimary(OperationContext* txn);
- // update internal config with new config (already validated)
+ // updates internal config with new config (already validated)
virtual void updateConfig(const ReplicaSetConfig& newConfig, int selfIndex, Date_t now);
private:
@@ -126,23 +127,23 @@ namespace repl {
// Logic to determine if we should step down as primary
bool _shouldRelinquish() const;
- // See if a majority number of votes are held by members who are currently "up"
+ // Sees if a majority number of votes are held by members who are currently "up"
bool _aMajoritySeemsToBeUp() const;
- // Return the total number of votes in the current config
+ // Returns the total number of votes in the current config
int _totalVotes() const;
- // Scan through all members that are 'up' and return the latest known optime
+ // Scans through all members that are 'up' and returns the latest known optime
OpTime _latestKnownOpTime() const;
- // Begin election proceedings
+ // Begins election proceedings
void _electSelf(Date_t now);
// Scans the electable set and returns the highest priority member index
int _getHighestPriorityElectableIndex() const;
- // Change _memberState, if state is different from _memberState.
- // Call all registered callbacks for state changes.
+ // Changes _memberState to newMemberState, then calls all registered callbacks
+ // for state changes.
void _changeMemberState(const MemberState& newMemberState);
OpTime _lastApplied; // the last op that the applier has actually written to the data
@@ -172,6 +173,8 @@ namespace repl {
std::map<HostAndPort, Date_t> _syncSourceBlacklist;
// The next sync source to be chosen, requested via a replSetSyncFrom command
int _forceSyncSourceIndex;
+ // How far this node must fall behind before considering switching sync sources
+ int _maxSyncSourceLagSecs;
// insanity follows
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
new file mode 100644
index 00000000000..4912c88c56d
--- /dev/null
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <limits>
+
+#include "mongo/db/repl/topology_coordinator_impl.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/net/hostandport.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+ ReplicaSetConfig startConfig;
+
+ TEST(TopologyCoordinator, ForceSyncSource) {
+ TopologyCoordinatorImpl topocoord(std::numeric_limits<int>::max());
+ Date_t now = 0;
+ topocoord.updateConfig(startConfig, 0, now++);
+ topocoord.chooseNewSyncSource(now++);
+ // ASSERT(getSyncSourceAddress() == HostAndPort(expected one);
+ topocoord.setForceSyncSourceIndex(2);
+ topocoord.chooseNewSyncSource(now++);
+ // ASSERT(getSyncSourceAddress() == HostAndPort(expected one / id #2);
+ }
+
+ TEST(TopologyCoordinator, BlacklistSyncSource) {
+ TopologyCoordinatorImpl topocoord(std::numeric_limits<int>::max());
+ Date_t now = 0;
+ topocoord.updateConfig(startConfig, 0, now++);
+ topocoord.chooseNewSyncSource(now++);
+ // ASSERT(getSyncSourceAddress() == HostAndPort(expected one);
+
+ Date_t expireTime = 100;
+ topocoord.blacklistSyncSource(HostAndPort() /* the current one */, expireTime);
+ topocoord.chooseNewSyncSource(now++);
+ // ASSERT(getSyncSourceAddress() == HostAndPort(expected one);
+ topocoord.chooseNewSyncSource(expireTime);
+ // ASSERT(getSyncSourceAddress() == HostAndPort(blacklisted one);
+ }
+
+} // namespace
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/topology_coordinator_mock.cpp b/src/mongo/db/repl/topology_coordinator_mock.cpp
index ad786edf24e..f74751dba03 100644
--- a/src/mongo/db/repl/topology_coordinator_mock.cpp
+++ b/src/mongo/db/repl/topology_coordinator_mock.cpp
@@ -69,6 +69,7 @@ namespace repl {
void TopologyCoordinatorMock::prepareHeartbeatResponse(const ReplicationExecutor::CallbackData&,
Date_t now,
const BSONObj& cmdObj,
+ const std::string& ourSetName,
BSONObjBuilder* resultObj,
Status* result) {
}
diff --git a/src/mongo/db/repl/topology_coordinator_mock.h b/src/mongo/db/repl/topology_coordinator_mock.h
index 7fd7c17bd5f..7e27eeb8261 100644
--- a/src/mongo/db/repl/topology_coordinator_mock.h
+++ b/src/mongo/db/repl/topology_coordinator_mock.h
@@ -70,6 +70,7 @@ namespace repl {
virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data,
Date_t now,
const BSONObj& cmdObj,
+ const std::string& ourSetName,
BSONObjBuilder* resultObj,
Status* result);