diff options
author | Eric Milkie <milkie@10gen.com> | 2014-09-08 11:37:42 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2014-09-12 10:39:11 -0400 |
commit | ff1ee391747092e2d03765402c6ab25ba7e1d538 (patch) | |
tree | d0650ad040b4b63ed75de9a0d5a349558dd8658f /src/mongo | |
parent | ce737ebed71bc4485180b86832e907d820858664 (diff) | |
download | mongo-ff1ee391747092e2d03765402c6ab25ba7e1d538.tar.gz |
SERVER-15089 chooseNewSyncSource hooked up in replication Applier
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 155 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator.h | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_hybrid.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.cpp | 58 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_impl.h | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_legacy.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_mock.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/repl_coordinator_mock.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.cpp | 63 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl.h | 9 | ||||
-rw-r--r-- | src/mongo/db/repl/topology_coordinator_impl_test.cpp | 6 |
15 files changed, 336 insertions, 74 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 0e3ad68352b..1a941483984 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -30,20 +30,23 @@ #include "mongo/platform/basic.h" +#include "mongo/db/repl/bgsync.h" + +#include "mongo/base/counter.h" #include "mongo/db/client.h" #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" -#include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" -#include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/repl_coordinator_impl.h" #include "mongo/db/repl/rs.h" +#include "mongo/db/repl/rs_sync.h" #include "mongo/db/repl/rslog.h" +#include "mongo/db/stats/timer_stats.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" -#include "mongo/base/counter.h" -#include "mongo/db/stats/timer_stats.h" namespace mongo { @@ -98,7 +101,8 @@ namespace repl { _pause(true), _appliedBuffer(true), _assumingPrimary(false), - _currentSyncTarget(NULL) { + _currentSyncTarget(NULL), + _replCoord(getGlobalReplicationCoordinator()) { } BackgroundSync* BackgroundSync::get() { @@ -188,14 +192,37 @@ namespace repl { // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced OplogReader r; - OpTime lastOpTimeFetched; - // find a target to sync from the last op time written - getOplogReader(txn, r); - // no server found { boost::unique_lock<boost::mutex> lock(_mutex); + if (_lastOpTimeFetched.isNull()) { + // then we're initial syncing and we're still waiting for this to be set + _currentSyncTarget = NULL; + lock.unlock(); + sleepsecs(1); + // if there is no one to sync from + return; + } + // Wait until we've applied the ops we have before we choose a sync target + while (!_appliedBuffer) { + _condvar.wait(lock); + } + } + + while 
(MONGO_FAIL_POINT(rsBgSyncProduce)) { + sleepmillis(0); + } + + + // find a target to sync from the last op time written + _replCoord->connectOplogReader(txn, this, &r); + + OpTime lastOpTimeFetched; + + { + boost::unique_lock<boost::mutex> lock(_mutex); + // no server found if (_currentSyncTarget == NULL) { lock.unlock(); sleepsecs(1); @@ -364,43 +391,25 @@ namespace repl { return true; } - void BackgroundSync::getOplogReader(OperationContext* txn, OplogReader& r) { + void BackgroundSync::getOplogReaderLegacy(OperationContext* txn, OplogReader* r) { const Member *target = NULL, *stale = NULL; BSONObj oldest; - { - boost::unique_lock<boost::mutex> lock(_mutex); - if (_lastOpTimeFetched.isNull()) { - // then we're initial syncing and we're still waiting for this to be set - _currentSyncTarget = NULL; - return; - } - - // Wait until we've applied the ops we have before we choose a sync target - while (!_appliedBuffer) { - _condvar.wait(lock); - } - } - - while (MONGO_FAIL_POINT(rsBgSyncProduce)) { - sleepmillis(0); - } - - verify(r.conn() == NULL); + verify(r->conn() == NULL); while ((target = theReplSet->getMemberToSyncTo()) != NULL) { string current = target->fullName(); - if (!r.connect(target->h())) { + if (!r->connect(target->h())) { LOG(2) << "replSet can't connect to " << current << " to read operations" << rsLog; - r.resetConnection(); + r->resetConnection(); theReplSet->veto(current); sleepsecs(1); continue; } - if (isStale(r, oldest)) { - r.resetConnection(); + if (isStale(*r, oldest)) { + r->resetConnection(); theReplSet->veto(current, 600); stale = target; continue; @@ -426,6 +435,86 @@ namespace repl { boost::unique_lock<boost::mutex> lock(_mutex); _currentSyncTarget = NULL; } + + } + + void BackgroundSync::connectOplogReader(OperationContext* txn, + ReplicationCoordinatorImpl* replCoordImpl, + OplogReader* reader) { + OpTime lastOpTimeFetched; + { + boost::unique_lock<boost::mutex> lock(_mutex); + lastOpTimeFetched = _lastOpTimeFetched; + } + Date_t 
now(curTimeMillis64()); + OpTime oldestOpTimeSeen(now,0); + + while (true) { + HostAndPort candidate = replCoordImpl->chooseNewSyncSource(); + + if (candidate.empty()) { + if (oldestOpTimeSeen == OpTime(now,0)) { + // If, in this invocation of connectOplogReader(), we did not successfully + // connect to any node ahead of us, + // we apparently have no sync sources to connect to. + // This situation is common; e.g. if there are no writes to the primary at + // the moment. + return; + } + + // Connected to at least one member, but in all cases we were too stale to use them + // as a sync source. + log() << "replSet error RS102 too stale to catch up" << rsLog; + log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog; + log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() << + rsLog; + log() << "replSet " + "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember" + << rsLog; + sethbmsg("error RS102 too stale to catch up"); + theReplSet->setMinValid(txn, oldestOpTimeSeen); + replCoordImpl->setFollowerMode(MemberState::RS_RECOVERING); + return; + } + + if (!reader->connect(candidate)) { + LOG(2) << "replSet can't connect to " << candidate.toString() << + " to read operations" << rsLog; + reader->resetConnection(); + replCoordImpl->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000)); + continue; + } + // Read the first (oldest) op and confirm that it's not newer than our last + // fetched op. Otherwise, we have fallen off the back of that source's oplog. + BSONObj remoteOldestOp(reader->findOne(rsoplog, Query())); + BSONElement tsElem(remoteOldestOp["ts"]); + if (tsElem.type() != Timestamp) { + // This member's got a bad op in its oplog. 
+ warning() << "oplog invalid format on node " << candidate.toString(); + reader->resetConnection(); + replCoordImpl->blacklistSyncSource(candidate, + Date_t(curTimeMillis64() + 600*1000)); + continue; + } + OpTime remoteOldOpTime = tsElem._opTime(); + + if (lastOpTimeFetched < remoteOldOpTime) { + // We're too stale to use this sync source. + reader->resetConnection(); + replCoordImpl->blacklistSyncSource(candidate, + Date_t(curTimeMillis64() + 600*1000)); + if (oldestOpTimeSeen > remoteOldOpTime) { + warning() << "we are too stale to use " << candidate.toString() << + " as a sync source"; + oldestOpTimeSeen = remoteOldOpTime; + } + continue; + } + + // Got a valid sync source. + return; + } // while (true) + } bool BackgroundSync::isRollbackRequired(OperationContext* txn, OplogReader& r) { diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index e8886f1b722..4a8a14a7bc3 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -32,12 +32,15 @@ #include "mongo/util/queue.h" #include "mongo/db/repl/oplogreader.h" -#include "mongo/db/repl/rs.h" #include "mongo/db/jsobj.h" namespace mongo { namespace repl { + class Member; + class ReplicationCoordinator; + class ReplicationCoordinatorImpl; + // This interface exists to facilitate easier testing; // the test infrastructure implements these functions with stubs. class BackgroundSyncInterface { @@ -71,6 +74,7 @@ namespace repl { * 3. 
BackgroundSync::_mutex */ class BackgroundSync : public BackgroundSyncInterface { + private: static BackgroundSync *s_instance; // protects creation of s_instance static boost::mutex s_mutex; @@ -101,7 +105,7 @@ namespace repl { void produce(OperationContext* txn); // Check if rollback is necessary bool isRollbackRequired(OperationContext* txn, OplogReader& r); - void getOplogReader(OperationContext* txn, OplogReader& r); + // Evaluate if the current sync target is still good bool shouldChangeSyncTarget(); // check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp. @@ -111,6 +115,9 @@ namespace repl { // restart syncing void start(); + // A pointer to the replication coordinator running the show. + ReplicationCoordinator* _replCoord; + public: bool isAssumingPrimary(); @@ -138,6 +145,19 @@ namespace repl { // Wait for replication to finish and buffer to be applied so that the member can become // primary. void stopReplicationAndFlushBuffer(); + + /** + * Connects an oplog reader to a viable sync source. Legacy uses getOplogReaderLegacy(), + * which sets _currentSyncTarget as a side effect. + * connectOplogReader() is used in new replication. + * Both functions can affect the TopoCoord's blacklist of sync sources, and may set + * our minValid value, durably, if we detect we have fallen off the back of all sync + * sources' oplogs. 
+ **/ + void getOplogReaderLegacy(OperationContext* txn, OplogReader* reader); + void connectOplogReader(OperationContext* txn, + ReplicationCoordinatorImpl* replCoordImpl, + OplogReader* reader); }; diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h index ae242cbeb03..77e57f4f66d 100644 --- a/src/mongo/db/repl/repl_coordinator.h +++ b/src/mongo/db/repl/repl_coordinator.h @@ -49,7 +49,9 @@ namespace mongo { namespace repl { + class BackgroundSync; class HandshakeArgs; + class OplogReader; class ReplSetHeartbeatArgs; class ReplSetHeartbeatResponse; class UpdatePositionArgs; @@ -460,6 +462,16 @@ namespace repl { */ virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) = 0; + /** + * Connects an oplog reader to a viable sync source, using BackgroundSync object bgsync. + * When this function returns, reader is connected to a viable sync source or is left + * unconnected if no sync sources are currently available. In legacy, bgsync's + * _currentSyncTarget is also set appropriately. 
+ **/ + virtual void connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* reader) = 0; + protected: ReplicationCoordinator(); diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp index 7489a0ad43f..2c5cddbf9dc 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp +++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp @@ -33,6 +33,7 @@ #include "mongo/db/repl/repl_coordinator_hybrid.h" #include "mongo/db/global_environment_experiment.h" +#include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/isself.h" #include "mongo/db/repl/network_interface_impl.h" #include "mongo/db/repl/repl_coordinator_external_state_impl.h" @@ -407,6 +408,20 @@ namespace repl { return legacyResponse; } + void HybridReplicationCoordinator::connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r) { + _legacy.connectOplogReader(txn, bgsync, r); + HostAndPort legacySyncSource = r->getHost(); + bgsync->connectOplogReader(txn, &_impl, r); + HostAndPort implSyncSource = r->getHost(); + if (legacySyncSource != implSyncSource) { + severe() << "sync source mismatch between legacy and impl: " << + legacySyncSource.toString() << " and " << implSyncSource.toString(); + fassertFailed(18742); + } + } + void HybridReplicationCoordinator::setImplConfigHack(const ReplSetConfig* config) { int myIndex = -1; for (size_t i = 0; i < config->members.size(); ++i) { // find my index in the config diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h index 0e15b27505c..cede8b8c9fb 100644 --- a/src/mongo/db/repl/repl_coordinator_hybrid.h +++ b/src/mongo/db/repl/repl_coordinator_hybrid.h @@ -161,6 +161,10 @@ namespace repl { virtual bool isReplEnabled() const; + virtual void connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r); + /** * This is a temporary hack to force _impl to set its replset config to the one loaded by * 
_legacy. diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp index 5921b878833..8d12a1d5c2c 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.cpp +++ b/src/mongo/db/repl/repl_coordinator_impl.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/check_quorum_for_config_change.h" #include "mongo/db/repl/handshake_args.h" #include "mongo/db/repl/master_slave.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_response.h" #include "mongo/db/repl/repl_settings.h" @@ -1110,6 +1111,7 @@ namespace { if (_rsConfig.isInitialized()) { cancelHeartbeats(); } + OpTime lastOpApplied(_getLastOpApplied_inlock()); _setConfigState_inlock(kConfigSteady); _rsConfig = newConfig; _thisMembersConfigIndex = myIndex; @@ -1117,7 +1119,7 @@ namespace { newConfig, myIndex, _replExecutor.now(), - _getLastOpApplied_inlock()); + lastOpApplied); _currentState = _topCoord->getMemberState(); // Ensure that there's an entry in the _slaveInfoMap for ourself @@ -1263,5 +1265,59 @@ namespace { return _settings.usingReplSets() || _settings.master || _settings.slave; } + void ReplicationCoordinatorImpl::connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r) { + invariant(false); + } + + void ReplicationCoordinatorImpl::_chooseNewSyncSource( + const ReplicationExecutor::CallbackData& cbData, + HostAndPort* newSyncSource) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), _getLastOpApplied()); + } + + HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() { + HostAndPort newSyncSource; + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource, + this, + stdx::placeholders::_1, + &newSyncSource)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return newSyncSource; // empty + } + fassert(18740, 
cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + return newSyncSource; + } + + void ReplicationCoordinatorImpl::_blacklistSyncSource( + const ReplicationExecutor::CallbackData& cbData, + const HostAndPort& host, + Date_t until) { + if (cbData.status == ErrorCodes::CallbackCanceled) { + return; + } + _topCoord->blacklistSyncSource(host, until); + } + + void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { + CBHStatus cbh = _replExecutor.scheduleWork( + stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource, + this, + stdx::placeholders::_1, + host, + until)); + if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) { + return; + } + fassert(18741, cbh.getStatus()); + _replExecutor.wait(cbh.getValue()); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h index d90738508be..7ecb7022f62 100644 --- a/src/mongo/db/repl/repl_coordinator_impl.h +++ b/src/mongo/db/repl/repl_coordinator_impl.h @@ -53,6 +53,7 @@ namespace mongo { namespace repl { + class OplogReader; class SyncSourceFeedback; class TopologyCoordinator; @@ -194,6 +195,11 @@ namespace repl { virtual bool isReplEnabled() const; + virtual void connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r); + + // ================== Members of replication code internal API =================== // This is a temporary hack to set the replset config to the config detected by the @@ -217,6 +223,21 @@ namespace repl { */ void cancelHeartbeats(); + /** + * Chooses a sync source. + * A wrapper that schedules _chooseNewSyncSource() through the Replication Executor and + * waits for its completion. + */ + HostAndPort chooseNewSyncSource(); + + /** + * Blacklists 'host' until 'until'. + * A wrapper that schedules _blacklistSyncSource() through the Replication Executor and + * waits for its completion. 
+ */ + void blacklistSyncSource(const HostAndPort& host, Date_t until); + + // ================== Test support API =================== /** @@ -459,6 +480,25 @@ namespace repl { void _onElectCmdRunnerComplete(const ReplicationExecutor::CallbackData& cbData, const ReplicationExecutor::EventHandle& finishEvh); + /** + * Chooses a new sync source. Must be scheduled as a callback. + * + * Calls into the Topology Coordinator, which uses its current view of the set to choose + * the most appropriate sync source. + */ + void _chooseNewSyncSource(const ReplicationExecutor::CallbackData& cbData, + HostAndPort* newSyncSource); + + /** + * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot + * be chosen as a sync source. + * + * Must be scheduled as a callback. + */ + void _blacklistSyncSource(const ReplicationExecutor::CallbackData& cbData, + const HostAndPort& host, + Date_t until); + // // All member variables are labeled with one of the following codes indicating the diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp index 38ffa126e94..aa5eb6a4251 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.cpp +++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp @@ -981,5 +981,13 @@ namespace { return _settings.usingReplSets() || _settings.slave || _settings.master; } + void LegacyReplicationCoordinator::connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r) { + bgsync->getOplogReaderLegacy(txn, r); + } + + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h index 250e0fa1c29..415a8cfe175 100644 --- a/src/mongo/db/repl/repl_coordinator_legacy.h +++ b/src/mongo/db/repl/repl_coordinator_legacy.h @@ -157,6 +157,10 @@ namespace repl { virtual bool isReplEnabled() const; + virtual void connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r); + 
private: // Mutex that protects the _slaveOpTimeMap diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp index 972568c46ab..6c4e5828595 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.cpp +++ b/src/mongo/db/repl/repl_coordinator_mock.cpp @@ -250,5 +250,11 @@ namespace repl { return Status::OK(); } + void ReplicationCoordinatorMock::connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r) { + invariant(false); + } + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h index 2f9036df0a4..53f045c4443 100644 --- a/src/mongo/db/repl/repl_coordinator_mock.h +++ b/src/mongo/db/repl/repl_coordinator_mock.h @@ -156,6 +156,10 @@ namespace repl { virtual Status checkReplEnabledForCommand(BSONObjBuilder* result); + virtual void connectOplogReader(OperationContext* txn, + BackgroundSync* bgsync, + OplogReader* r); + private: ReplSettings _settings; diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h index 4aa76f8e107..83ed1c80b2a 100644 --- a/src/mongo/db/repl/topology_coordinator.h +++ b/src/mongo/db/repl/topology_coordinator.h @@ -112,7 +112,7 @@ namespace repl { /** * Chooses and sets a new sync source, based on our current knowledge of the world. */ - virtual void chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) = 0; + virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) = 0; /** * Suppresses selecting "host" as sync source until "until". 
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp index f531ee2c241..55d72db622e 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl.cpp @@ -88,7 +88,6 @@ namespace { TopologyCoordinatorImpl::TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs) : _role(Role::follower), _currentPrimaryIndex(-1), - _syncSourceIndex(-1), _forceSyncSourceIndex(-1), _maxSyncSourceLagSecs(maxSyncSourceLagSecs), _selfIndex(-1), @@ -109,22 +108,20 @@ namespace { } HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const { - if (_syncSourceIndex == -1) { - return HostAndPort(); - } - return _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort(); + return _syncSource; } - void TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) { + HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, + const OpTime& lastOpApplied) { + // if we have a target we've requested to sync from, use it if (_forceSyncSourceIndex != -1) { invariant(_forceSyncSourceIndex < _currentConfig.getNumMembers()); - _syncSourceIndex = _forceSyncSourceIndex; + _syncSource = _currentConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort(); _forceSyncSourceIndex = -1; - _sethbmsg( str::stream() << "syncing from: " - << _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString() - << " by request", 0); - return; + _sethbmsg(str::stream() << "syncing from: " << _syncSource.toString() << " by request", + 0); + return _syncSource; } // wait for 2N pings (not counting ourselves) before choosing a sync target @@ -133,14 +130,20 @@ namespace { if (needMorePings > 0) { OCCASIONALLY log() << "waiting for " << needMorePings << " pings from other members before syncing"; - return; + _syncSource = HostAndPort(); + return _syncSource; } // If we are only allowed to sync from the primary, set that if (!_currentConfig.isChainingAllowed()) { - // 
Sets -1 if there is no current primary - _syncSourceIndex = _currentPrimaryIndex; - return; + if (_currentPrimaryIndex == -1) { + _syncSource = HostAndPort(); + return _syncSource; + } + else { + _syncSource = _currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort(); + return _syncSource; + } } // find the member with the lowest ping time that is ahead of me @@ -148,11 +151,13 @@ namespace { // Find primary's oplog time. Reject sync candidates that are more than // maxSyncSourceLagSecs seconds behind. OpTime primaryOpTime; - if (_currentPrimaryIndex != -1) + if (_currentPrimaryIndex != -1) { primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime(); - else + } + else { // choose a time that will exclude no candidates, since we don't see a primary primaryOpTime = OpTime(_maxSyncSourceLagSecs.total_seconds(), 0); + } if (primaryOpTime.getSecs() < static_cast<unsigned int>(_maxSyncSourceLagSecs.total_seconds())) { @@ -243,14 +248,14 @@ namespace { if (closestIndex == -1) { // Did not find any members to sync from - _syncSourceIndex = -1; - return; + _syncSource = HostAndPort(); + return _syncSource; } - std::string msg(str::stream() << "syncing to: " << - _currentConfig.getMemberAt(closestIndex).getHostAndPort().toString(), 0); + _syncSource = _currentConfig.getMemberAt(closestIndex).getHostAndPort(); + std::string msg(str::stream() << "syncing to: " << _syncSource.toString(), 0); _sethbmsg(msg); log() << msg; - _syncSourceIndex = closestIndex; + return _syncSource; } void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) { @@ -585,9 +590,8 @@ namespace { response->setTime(Seconds(Milliseconds(now.asInt64()).total_seconds())); response->setOpTime(lastOpApplied.asDate()); - if (_syncSourceIndex != -1) { - response->setSyncingTo( - _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString()); + if (!_syncSource.empty()) { + response->setSyncingTo(_syncSource.toString()); } long long v = 
_currentConfig.getConfigVersion(); @@ -1196,9 +1200,8 @@ namespace { response->append("myState", myState.s); // Add sync source info - if ((_syncSourceIndex != -1) && !myState.primary() && !myState.removed()) { - response->append("syncingTo", _currentConfig.getMemberAt(_syncSourceIndex) - .getHostAndPort().toString()); + if (!_syncSource.empty() && !myState.primary() && !myState.removed()) { + response->append("syncingTo", _syncSource.toString()); } response->append("members", membersOut); @@ -1260,7 +1263,6 @@ namespace { _hbdata.clear(); _role = Role::follower; _currentPrimaryIndex = -1; - _syncSourceIndex = -1; _forceSyncSourceIndex = -1; _selfIndex = selfIndex; @@ -1294,7 +1296,7 @@ namespace { // we're electable, we must be the leader. _role = Role::leader; } - chooseNewSyncSource(now, lastOpApplied); + } // TODO(emilkie): Better story for heartbeat message handling. @@ -1472,7 +1474,6 @@ namespace { OpTime myLastOpApplied, OpTime electionOpTime) { invariant(_role == Role::candidate); - _syncSourceIndex = -1; _electionTime = electionOpTime; _electionId = electionId; _role = Role::leader; diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h index 1f5a167b278..4ea27133a4d 100644 --- a/src/mongo/db/repl/topology_coordinator_impl.h +++ b/src/mongo/db/repl/topology_coordinator_impl.h @@ -127,7 +127,8 @@ namespace repl { virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const; virtual int getMaintenanceCount() const; virtual void setForceSyncSourceIndex(int index); - virtual void chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied); + virtual HostAndPort chooseNewSyncSource(Date_t now, + const OpTime& lastOpApplied); virtual void blacklistSyncSource(const HostAndPort& host, Date_t until); virtual void setStepDownTime(Date_t newTime); virtual void setFollowerMode(MemberState::MS newMode); @@ -293,9 +294,9 @@ namespace repl { // the member we currently believe is primary, if one exists int 
_currentPrimaryIndex; - // the member we are currently syncing from - // -1 if no sync source (we are primary, or we cannot connect to anyone yet) - int _syncSourceIndex; + // the hostandport we are currently syncing from + // empty if no sync source (we are primary, or we cannot connect to anyone yet) + HostAndPort _syncSource; // These members are not chosen as sync sources for a period of time, due to connection // issues with them std::map<HostAndPort, Date_t> _syncSourceBlacklist; diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp index 132a0f0889f..1eafe5206e0 100644 --- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp @@ -164,7 +164,8 @@ namespace { ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Fail due to insufficient number of pings - getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0)); + HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0)); + ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT(getTopoCoord().getSyncSourceAddress().empty()); // Record 2nd round of pings to allow choosing a new sync source; all members equidistant @@ -172,7 +173,8 @@ namespace { heartbeatFromMember(HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(0,0)); // Should choose h2, since it is furthest ahead - getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0)); + newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0)); + ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource); ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress()); // h3 becomes further ahead, so it should be chosen |