diff options
-rw-r--r-- | src/mongo/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/db.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_settings.cpp | 47 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_settings.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replset_web_handler.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_sync.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 151 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_test.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_mock.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_mock.h | 1 |
14 files changed, 234 insertions, 110 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index 34267ab293c..a9b8d0ebaae 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -915,6 +915,7 @@ serveronlyLibdeps = ["coreshard", "db/exec/working_set", "db/exec/exec", "db/query/query", + "db/repl/repl_settings", "db/repl/network_interface_impl", "db/repl/replication_executor", "db/repl/repl_coordinator_impl", diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 68c8bf36254..4ccc49f3d26 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -279,7 +279,8 @@ namespace mongo { logStartup(); repl::getGlobalReplicationCoordinator()->startReplication( - new repl::TopologyCoordinatorImpl(), new repl::NetworkInterfaceImpl()); + new repl::TopologyCoordinatorImpl(repl::maxSyncSourceLagSecs), + new repl::NetworkInterfaceImpl()); if (serverGlobalParams.isHttpInterfaceEnabled) boost::thread web(stdx::bind(&webServerThread, new RestAdminAccess())); // takes ownership diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5266a31302c..0c2e72fc9fb 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -2,6 +2,13 @@ Import("env") +env.Library('repl_settings', + 'repl_settings.cpp', + LIBDEPS=[ + '$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/mongo/server_parameters' + ]) + env.Library( 'network_interface_impl', 'network_interface_impl.cpp', @@ -24,7 +31,12 @@ env.Library('topology_coordinator_impl', ['topology_coordinator_impl.cpp', 'member_heartbeat_data.cpp'], LIBDEPS=['replica_set_config', - 'replication_executor']) + 'replication_executor', + 'repl_settings']) + +env.CppUnitTest('topology_coordinator_impl_test', + 'topology_coordinator_impl_test.cpp', + LIBDEPS=['topology_coordinator_impl']) env.Library('repl_coordinator_impl', [ diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index a134bf7ad57..3a62541e631 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -400,6 +400,7 @@ namespace repl { stdx::placeholders::_1, Date_t(curTimeMillis64()), cmdObj, + _settings.ourSetName(), resultObj, &result)); if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { diff --git a/src/mongo/db/repl/repl_settings.cpp b/src/mongo/db/repl/repl_settings.cpp new file mode 100644 index 00000000000..3b22a3203eb --- /dev/null +++ b/src/mongo/db/repl/repl_settings.cpp @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/repl/repl_settings.h" + +#include "mongo/base/init.h" +#include "mongo/base/status.h" +#include "mongo/db/server_parameters.h" + +namespace mongo { +namespace repl { + + MONGO_EXPORT_STARTUP_SERVER_PARAMETER(maxSyncSourceLagSecs, int, 30); + MONGO_INITIALIZER(maxSyncSourceLagSecsCheck) (InitializerContext*) { + if (maxSyncSourceLagSecs < 1) { + return Status(ErrorCodes::BadValue, "maxSyncSourceLagSecs must be > 0"); + } + return Status::OK(); + } + +} +} diff --git a/src/mongo/db/repl/repl_settings.h b/src/mongo/db/repl/repl_settings.h index 1cd26081bb3..81aadfb3430 100644 --- a/src/mongo/db/repl/repl_settings.h +++ b/src/mongo/db/repl/repl_settings.h @@ -38,6 +38,8 @@ namespace mongo { namespace repl { + extern int maxSyncSourceLagSecs; + bool anyReplEnabled(); /* replication slave? (possibly with slave) diff --git a/src/mongo/db/repl/replset_web_handler.cpp b/src/mongo/db/repl/replset_web_handler.cpp index 1ab296a59c3..f97384bafb8 100644 --- a/src/mongo/db/repl/replset_web_handler.cpp +++ b/src/mongo/db/repl/replset_web_handler.cpp @@ -30,6 +30,7 @@ #include "mongo/db/dbwebserver.h" #include "mongo/db/jsobj.h" +#include "mongo/db/repl/health.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/rs.h" #include "mongo/util/mongoutils/html.h" diff --git a/src/mongo/db/repl/rs_sync.cpp b/src/mongo/db/repl/rs_sync.cpp index 1b1860f6e72..1e1f48269ee 100644 --- a/src/mongo/db/repl/rs_sync.cpp +++ b/src/mongo/db/repl/rs_sync.cpp @@ -58,14 +58,6 @@ namespace mongo { namespace repl { - MONGO_EXPORT_STARTUP_SERVER_PARAMETER(maxSyncSourceLagSecs, int, 30); - MONGO_INITIALIZER(maxSyncSourceLagSecsCheck) (InitializerContext*) { - if (maxSyncSourceLagSecs < 1) { - return Status(ErrorCodes::BadValue, "maxSyncSourceLagSecs must be > 0"); - } - return Status::OK(); - } - /* should be in RECOVERING state on arrival here. readlocks @return true if transitioned to SECONDARY diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index f3df5133ea9..fb9b353916b 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -115,6 +115,7 @@ namespace repl { virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, + const std::string& ourSetName, BSONObjBuilder* resultObj, Status* result) = 0; diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index ccff6bc1914..2b3aa557d46 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -31,11 +31,10 @@ #include "mongo/db/repl/topology_coordinator_impl.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/isself.h" -#include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/repl/replication_executor.h" -#include "mongo/db/repl/rs_sync.h" // maxSyncSourceLagSecs +#include "mongo/db/server_parameters.h" #include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" namespace mongo { @@ -43,10 +42,11 @@ namespace mongo { namespace repl { - TopologyCoordinatorImpl::TopologyCoordinatorImpl() : + TopologyCoordinatorImpl::TopologyCoordinatorImpl(int maxSyncSourceLagSecs) : _currentPrimaryIndex(-1), _syncSourceIndex(-1), _forceSyncSourceIndex(-1), + _maxSyncSourceLagSecs(maxSyncSourceLagSecs), _busyWithElectSelf(false), _selfIndex(0), _blockSync(false), @@ -82,14 +82,14 @@ namespace repl { invariant(_forceSyncSourceIndex < _currentConfig.getNumMembers()); _syncSourceIndex = _forceSyncSourceIndex; _forceSyncSourceIndex = -1; - sethbmsg( str::stream() << "syncing from: " - << _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString() - << " by request", 0); + _sethbmsg( str::stream() << "syncing from: " + << _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString() + << " by request", 0); return; } // wait for 2N pings before choosing a sync target - int needMorePings = _hbdata.size()*2 - HeartbeatInfo::numPings; + int needMorePings = _hbdata.size()*2 - MemberHeartbeatData::numPings; if (needMorePings > 0) { OCCASIONALLY log() << "waiting for " << needMorePings @@ -112,15 +112,15 @@ namespace repl { primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime(); else // choose a time that will exclude no candidates, since we don't see a primary - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); + primaryOpTime = OpTime(_maxSyncSourceLagSecs, 0); - if (primaryOpTime.getSecs() < static_cast<unsigned int>(maxSyncSourceLagSecs)) { + if (primaryOpTime.getSecs() < static_cast<unsigned int>(_maxSyncSourceLagSecs)) { // erh - I think this means there was just a new election // and we don't yet know the new primary's optime - primaryOpTime = OpTime(maxSyncSourceLagSecs, 0); + primaryOpTime = OpTime(_maxSyncSourceLagSecs, 0); } - OpTime oldestSyncOpTime(primaryOpTime.getSecs() - maxSyncSourceLagSecs, 0); + OpTime oldestSyncOpTime(primaryOpTime.getSecs() - _maxSyncSourceLagSecs, 0); int closestIndex = -1; @@ -185,7 +185,7 @@ namespace repl { if (now % 5 == 0) { log() << "replSet not trying to sync from " << vetoed->first << ", it is vetoed for " << (vetoed->second - now) - << " more seconds" << rsLog; + << " more seconds"; } continue; } @@ -202,7 +202,7 @@ namespace repl { return; } - sethbmsg( str::stream() << "syncing to: " << + _sethbmsg( str::stream() << "syncing to: " << _currentConfig.getMemberAt(closestIndex).getHostAndPort().toString(), 0); _syncSourceIndex = closestIndex; } @@ -226,21 +226,15 @@ namespace repl { } void TopologyCoordinatorImpl::relinquishPrimary(OperationContext* txn) { - LOG(2) << "replSet attempting to relinquish" << endl; - invariant(txn->lockState()->isWriteLocked()); - if (_memberState != MemberState::RS_PRIMARY) { - // Already relinquished? - log() << "replSet warning attempted to relinquish but not primary"; - return; - } + invariant(_memberState == MemberState::RS_PRIMARY); - log() << "replSet relinquishing primary state" << rsLog; + log() << "replSet relinquishing primary state"; _changeMemberState(MemberState::RS_SECONDARY); // close sockets that were talking to us so they don't blithly send many writes that // will fail with "not master" (of course client could check result code, but in // case they are not) - log() << "replSet closing client sockets after relinquishing primary" << rsLog; + log() << "replSet closing client sockets after relinquishing primary"; //MessagingPort::closeAllSockets(ScopedConn::keepOpen); // XXX Eric: what to do here? } @@ -249,6 +243,7 @@ namespace repl { void TopologyCoordinatorImpl::_electSelf(Date_t now) { verify( !_selfConfig().isArbiter() ); verify( _selfConfig().getSlaveDelay() == Seconds(0) ); +/* try { // XXX Eric // _electSelf(now); @@ -257,16 +252,15 @@ namespace repl { throw; } catch (VoteException& ) { // Eric: XXX - log() << "replSet not trying to elect self as responded yea to someone else recently" - << rsLog; + log() << "replSet not trying to elect self as responded yea to someone else recently"; } catch (const DBException& e) { - log() << "replSet warning caught unexpected exception in electSelf() " << e.toString() - << rsLog; + log() << "replSet warning caught unexpected exception in electSelf() " << e.toString(); } catch (...) { - log() << "replSet warning caught unexpected exception in electSelf()" << rsLog; + log() << "replSet warning caught unexpected exception in electSelf()"; } +*/ } // Produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command @@ -283,7 +277,7 @@ namespace repl { bool weAreFresher = false; if( _currentConfig.getConfigVersion() > cfgver ) { log() << "replSet member " << who << " is not yet aware its cfg version " - << cfgver << " is stale" << rsLog; + << cfgver << " is stale"; result.append("info", "config version stale"); weAreFresher = true; } @@ -371,8 +365,8 @@ namespace repl { BSONObjBuilder& result) { //TODO: after eric's checkin, add executer stuff and error if cancelled - DEV log() << "replSet received elect msg " << cmdObj.toString() << rsLog; - else LOG(2) << "replSet received elect msg " << cmdObj.toString() << rsLog; + DEV log() << "replSet received elect msg " << cmdObj.toString(); + else LOG(2) << "replSet received elect msg " << cmdObj.toString(); std::string setName = cmdObj["setName"].String(); int whoid = cmdObj["whoid"].Int(); @@ -387,31 +381,31 @@ namespace repl { if ( setName != _currentConfig.getReplSetName() ) { log() << "replSet error received an elect request for '" << setName << "' but our setName name is '" << - _currentConfig.getReplSetName() << "'" << rsLog; + _currentConfig.getReplSetName() << "'"; } else if ( myver < cfgver ) { // we are stale. don't vote } else if ( myver > cfgver ) { // they are stale! - log() << "replSet electCmdReceived info got stale version # during election" << rsLog; + log() << "replSet electCmdReceived info got stale version # during election"; vote = -10000; } else if ( hopefulIndex == -1 ) { - log() << "replSet electCmdReceived couldn't find member with id " << whoid << rsLog; + log() << "replSet electCmdReceived couldn't find member with id " << whoid; vote = -10000; } else if ( _currentPrimaryIndex != -1 && _memberState == MemberState::RS_PRIMARY ) { log() << "I am already primary, " << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() - << " can try again once I've stepped down" << rsLog; + << " can try again once I've stepped down"; vote = -10000; } else if (_currentPrimaryIndex != -1) { log() << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() << " is trying to elect itself but " << _currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort().toString() - << " is already primary" << rsLog; + << " is already primary"; vote = -10000; } else if ((highestPriorityIndex != -1) && @@ -423,26 +417,20 @@ namespace repl { vote = -10000; } else { - try { - if (_lastVote.when + LeaseTime >= now && static_cast<int>(_lastVote.who) != whoid) { - LOG(1) << "replSet not voting yea for " << whoid - << " voted for " << _lastVote.who << ' ' << now-_lastVote.when - << " secs ago" << rsLog; - //TODO: remove exception, and change control flow? - throw VoteException(); - } + if (_lastVote.when + LeaseTime >= now && static_cast<int>(_lastVote.who) != whoid) { + log() << "replSet voting no for " + << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() + << " voted for " << _lastVote.who << ' ' << now-_lastVote.when + << " secs ago"; + } + else { _lastVote.when = now; _lastVote.who = whoid; vote = _selfConfig().getNumVotes(); invariant( _currentConfig.getMemberAt(hopefulIndex).getId() == whoid ); log() << "replSet info voting yea for " << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() - << " (" << whoid << ')' << rsLog; - } - catch(VoteException&) { - log() << "replSet voting no for " - << _currentConfig.getMemberAt(hopefulIndex).getHostAndPort().toString() - << " already voted for another" << rsLog; + << " (" << whoid << ')'; } } @@ -455,6 +443,7 @@ namespace repl { const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, + const std::string& ourSetName, BSONObjBuilder* resultObj, Status* result) { if (data.status == ErrorCodes::CallbackCanceled) { @@ -469,12 +458,10 @@ namespace repl { // Verify that replica set names match std::string rshb = std::string(cmdObj.getStringField("replSetHeartbeat")); - const ReplSettings& replSettings = getGlobalReplicationCoordinator()->getSettings(); - if (replSettings.ourSetName() != rshb) { + if (ourSetName != rshb) { *result = Status(ErrorCodes::BadValue, "repl set names do not match"); - log() << "replSet set names do not match, our cmdline: " << replSettings.replSet - << rsLog; - log() << "replSet rshb: " << rshb << rsLog; + log() << "replSet set names do not match, ours: " << ourSetName << + "; remote node's: " << rshb; resultObj->append("mismatch", true); return; } @@ -613,7 +600,7 @@ namespace repl { << " and " << (latestOp - _hbdata[highestPriorityIndex].getOpTime().getSecs()) << " seconds behind"; - +/* logic -- // Are we primary? if (isSelf(_currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort())) { // replSetStepDown tries to acquire the same lock @@ -625,24 +612,24 @@ namespace repl { else { // We are not primary. Step down the remote node. BSONObj cmd = BSON( "replSetStepDown" << 1 ); -/* ScopedConn conn(primary->fullName()); - BSONObj result; - // XXX Eric: schedule stepdown command - - try { - if (!conn.runCommand("admin", cmd, result, 0)) { - log() << "stepping down " << primary->fullName() - << " failed: " << result << endl; - } - } - catch (DBException &e) { - log() << "stepping down " << primary->fullName() << " threw exception: " - << e.toString() << endl; - } + ScopedConn conn(primary->fullName()); + BSONObj result; + // XXX Eric: schedule stepdown command + + try { + if (!conn.runCommand("admin", cmd, result, 0)) { + log() << "stepping down " << primary->fullName() + << " failed: " << result << endl; + } + } + catch (DBException &e) { + log() << "stepping down " << primary->fullName() << " threw exception: " + << e.toString() << endl; + } */ return StepDown; - } + } } @@ -672,7 +659,7 @@ namespace repl { log() << "replset error could not reach/authenticate against any members"; if (_currentPrimaryIndex == _selfIndex) { - log() << "auth problems, relinquishing primary" << rsLog; + log() << "auth problems, relinquishing primary"; // XXX Eric: schedule relinquish //rs->relinquish(); @@ -712,7 +699,7 @@ namespace repl { if (remotePrimaryIndex != -1) { // two other nodes think they are primary (asynchronously polled) // -- wait for things to settle down. - log() << "replSet info two primaries (transiently)" << rsLog; + log() << "replSet info two primaries (transiently)"; return None; } remotePrimaryIndex = it->getConfigIndex(); @@ -767,7 +754,7 @@ namespace repl { fassert(18505, _currentPrimaryIndex == _selfIndex); if (_shouldRelinquish()) { - log() << "can't see a majority of the set, relinquishing primary" << rsLog; + log() << "can't see a majority of the set, relinquishing primary"; // XXX Eric: schedule a relinquish //rs->relinquish(); return StepDown; @@ -795,7 +782,7 @@ namespace repl { int ll = 0; if( ++n > 5 ) ll++; if( last + 60 > now ) ll++; - LOG(ll) << "replSet can't see a majority, will not try to elect self" << rsLog; + LOG(ll) << "replSet can't see a majority, will not try to elect self"; last = now; return None; } @@ -823,7 +810,7 @@ namespace repl { requeue(); } catch(...) { - log() << "replSet error unexpected assertion in rs manager" << rsLog; + log() << "replSet error unexpected assertion in rs manager"; } } @@ -860,7 +847,7 @@ namespace repl { } if( vTot % 2 == 0 && vTot && complain++ == 0 ) log() << "replSet warning: even number of voting members in replica set config - " - "add an arbiter or set votes to 0 on one of the existing members" << rsLog; + "add an arbiter or set votes to 0 on one of the existing members"; return vTot; } @@ -910,7 +897,7 @@ namespace repl { return; } _memberState = newMemberState; - log() << "replSet " << _memberState.toString() << rsLog; + log() << "replSet " << _memberState.toString(); for (std::vector<StateChangeCallbackFn>::const_iterator it = _stateChangeCallbacks.begin(); it != _stateChangeCallbacks.end(); ++it) { @@ -1035,7 +1022,7 @@ namespace repl { if (secs == 0) { _stepDownUntil = now; - log() << "replSet info 'unfreezing'" << rsLog; + log() << "replSet info 'unfreezing'"; result.append("info","unfreezing"); } else { @@ -1044,10 +1031,10 @@ namespace repl { if (_memberState != MemberState::RS_PRIMARY) { _stepDownUntil = now + secs; - log() << "replSet info 'freezing' for " << secs << " seconds" << rsLog; + log() << "replSet info 'freezing' for " << secs << " seconds"; } else { - log() << "replSet info received freeze command but we are primary" << rsLog; + log() << "replSet info received freeze command but we are primary"; } } } @@ -1106,7 +1093,7 @@ namespace repl { } if (!s.empty()) { lastLogged = _hbmsgTime; - LOG(logLevel) << "replSet " << s << rsLog; + LOG(logLevel) << "replSet " << s; } } diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 411bfc6fa25..63a7bc5eb73 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -48,7 +48,7 @@ namespace repl { class TopologyCoordinatorImpl : public TopologyCoordinator { public: - TopologyCoordinatorImpl(); + explicit TopologyCoordinatorImpl(int maxSyncSourceLagSecs); virtual ~TopologyCoordinatorImpl() {}; virtual void setLastApplied(const OpTime& optime); @@ -59,12 +59,12 @@ namespace repl { // Looks up syncSource's address and returns it, for use by the Applier virtual HostAndPort getSyncSourceAddress() const; // Chooses and sets a new sync source, based on our current knowledge of the world - virtual void chooseNewSyncSource(Date_t now); // this is basically getMemberToSyncTo() - // Do not choose a member as a sync source until time given; + virtual void chooseNewSyncSource(Date_t now); + // Does not choose a member as a sync source until time given; // call this when we have reason to believe it's a bad choice virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); - // Add function pointer to callback list; call function when config changes + // Adds function pointer to callback list; calls function when config changes // Applier needs to know when things like chainingAllowed or slaveDelay change. // ReplCoord needs to know when things like the tag sets change. virtual void registerConfigChangeCallback(const ConfigChangeCallbackFn& fn); @@ -74,44 +74,45 @@ namespace repl { // Applier calls this to notify that it's now safe to transition from SECONDARY to PRIMARY virtual void signalDrainComplete(); - // produce a reply to a RAFT-style RequestVote RPC; this is MongoDB ReplSetFresh command + // produces a reply to a RAFT-style RequestVote RPC virtual void prepareRequestVoteResponse(const Date_t now, const BSONObj& cmdObj, std::string& errmsg, BSONObjBuilder& result); - // produce a reply to a received electCmd + // produces a reply to a received electCmd virtual void prepareElectCmdResponse(const Date_t now, const BSONObj& cmdObj, BSONObjBuilder& result); - // produce a reply to a heartbeat + // produces a reply to a heartbeat virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, + const std::string& ourSetName, BSONObjBuilder* resultObj, Status* result); - // update internal state with heartbeat response + // updates internal state with heartbeat response HeartbeatResultAction updateHeartbeatData(Date_t now, const MemberHeartbeatData& newInfo, int id); - // produce a reply to a status request + // produces a reply to a status request virtual void prepareStatusResponse(Date_t now, const BSONObj& cmdObj, BSONObjBuilder& result, unsigned uptime); - // produce a reply to a freeze request + // produces a reply to a freeze request virtual void prepareFreezeResponse(Date_t now, const BSONObj& cmdObj, BSONObjBuilder& result); - // transition PRIMARY to SECONDARY; caller must already be holding an appropriate dblock + // transitions PRIMARY to SECONDARY; caller must already be holding an appropriate dblock virtual void relinquishPrimary(OperationContext* txn); - // update internal config with new config (already validated) + // updates internal config with new config (already validated) virtual void updateConfig(const ReplicaSetConfig& newConfig, int selfIndex, Date_t now); private: @@ -126,23 +127,23 @@ namespace repl { // Logic to determine if we should step down as primary bool _shouldRelinquish() const; - // See if a majority number of votes are held by members who are currently "up" + // Sees if a majority number of votes are held by members who are currently "up" bool _aMajoritySeemsToBeUp() const; - // Return the total number of votes in the current config + // Returns the total number of votes in the current config int _totalVotes() const; - // Scan through all members that are 'up' and return the latest known optime + // Scans through all members that are 'up' and return the latest known optime OpTime _latestKnownOpTime() const; - // Begin election proceedings + // Begins election proceedings void _electSelf(Date_t now); // Scans the electable set and returns the highest priority member index int _getHighestPriorityElectableIndex() const; - // Change _memberState, if state is different from _memberState. - // Call all registered callbacks for state changes. + // Changes _memberState to newMemberState, then calls all registered callbacks + // for state changes. void _changeMemberState(const MemberState& newMemberState); OpTime _lastApplied; // the last op that the applier has actually written to the data @@ -172,6 +173,8 @@ namespace repl { std::map<HostAndPort, Date_t> _syncSourceBlacklist; // The next sync source to be chosen, requested via a replSetSyncFrom command int _forceSyncSourceIndex; + // How far this node must fall behind before considering switching sync sources + int _maxSyncSourceLagSecs; // insanity follows diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp new file mode 100644 index 00000000000..4912c88c56d --- /dev/null +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <limits> + +#include "mongo/db/repl/topology_coordinator_impl.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/assert_util.h" +#include "mongo/util/net/hostandport.h" +#include "mongo/util/time_support.h" + +namespace mongo { +namespace repl { + +namespace { + + ReplicaSetConfig startConfig; + + TEST(TopologyCoordinator, ForceSyncSource) { + TopologyCoordinatorImpl topocoord(std::numeric_limits<int>::max()); + Date_t now = 0; + topocoord.updateConfig(startConfig, 0, now++); + topocoord.chooseNewSyncSource(now++); + // ASSERT(getSyncSourceAddress() == HostAndPort(expected one); + topocoord.setForceSyncSourceIndex(2); + topocoord.chooseNewSyncSource(now++); + // ASSERT(getSyncSourceAddress() == HostAndPort(expected one / id #2); + } + + TEST(TopologyCoordinator, BlacklistSyncSource) { + TopologyCoordinatorImpl topocoord(std::numeric_limits<int>::max()); + Date_t now = 0; + topocoord.updateConfig(startConfig, 0, now++); + topocoord.chooseNewSyncSource(now++); + // ASSERT(getSyncSourceAddress() == HostAndPort(expected one); + + Date_t expireTime = 100; + topocoord.blacklistSyncSource(HostAndPort() /* the current one */, expireTime); + topocoord.chooseNewSyncSource(now++); + // ASSERT(getSyncSourceAddress() == HostAndPort(expected one); + topocoord.chooseNewSyncSource(expireTime); + // ASSERT(getSyncSourceAddress() == HostAndPort(blacklisted one); + } + +} // namespace +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/topology_coordinator_mock.cpp b/src/mongo/db/repl/topology_coordinator_mock.cpp index ad786edf24e..f74751dba03 100644 --- a/src/mongo/db/repl/topology_coordinator_mock.cpp +++ b/src/mongo/db/repl/topology_coordinator_mock.cpp @@ -69,6 +69,7 @@ namespace repl { void TopologyCoordinatorMock::prepareHeartbeatResponse(const ReplicationExecutor::CallbackData&, Date_t now, const BSONObj& cmdObj, + const std::string& ourSetName, BSONObjBuilder* resultObj, Status* result) { } diff --git a/src/mongo/db/repl/topology_coordinator_mock.h b/src/mongo/db/repl/topology_coordinator_mock.h index 7fd7c17bd5f..7e27eeb8261 100644 --- a/src/mongo/db/repl/topology_coordinator_mock.h +++ b/src/mongo/db/repl/topology_coordinator_mock.h @@ -70,6 +70,7 @@ namespace repl { virtual void prepareHeartbeatResponse(const ReplicationExecutor::CallbackData& data, Date_t now, const BSONObj& cmdObj, + const std::string& ourSetName, BSONObjBuilder* resultObj, Status* result); |