summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2014-09-08 11:37:42 -0400
committerEric Milkie <milkie@10gen.com>2014-09-12 10:39:11 -0400
commitff1ee391747092e2d03765402c6ab25ba7e1d538 (patch)
treed0650ad040b4b63ed75de9a0d5a349558dd8658f /src/mongo/db
parentce737ebed71bc4485180b86832e907d820858664 (diff)
downloadmongo-ff1ee391747092e2d03765402c6ab25ba7e1d538.tar.gz
SERVER-15089 chooseNewSyncSource hooked up in replication Applier
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/bgsync.cpp155
-rw-r--r--src/mongo/db/repl/bgsync.h24
-rw-r--r--src/mongo/db/repl/repl_coordinator.h12
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.cpp15
-rw-r--r--src/mongo/db/repl/repl_coordinator_hybrid.h4
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.cpp58
-rw-r--r--src/mongo/db/repl/repl_coordinator_impl.h40
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.cpp8
-rw-r--r--src/mongo/db/repl/repl_coordinator_legacy.h4
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.cpp6
-rw-r--r--src/mongo/db/repl/repl_coordinator_mock.h4
-rw-r--r--src/mongo/db/repl/topology_coordinator.h2
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.cpp63
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl.h9
-rw-r--r--src/mongo/db/repl/topology_coordinator_impl_test.cpp6
15 files changed, 336 insertions, 74 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 0e3ad68352b..1a941483984 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -30,20 +30,23 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/repl/bgsync.h"
+
+#include "mongo/base/counter.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
-#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_coordinator_global.h"
-#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/repl_coordinator_impl.h"
#include "mongo/db/repl/rs.h"
+#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/repl/rslog.h"
+#include "mongo/db/stats/timer_stats.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
-#include "mongo/base/counter.h"
-#include "mongo/db/stats/timer_stats.h"
namespace mongo {
@@ -98,7 +101,8 @@ namespace repl {
_pause(true),
_appliedBuffer(true),
_assumingPrimary(false),
- _currentSyncTarget(NULL) {
+ _currentSyncTarget(NULL),
+ _replCoord(getGlobalReplicationCoordinator()) {
}
BackgroundSync* BackgroundSync::get() {
@@ -188,14 +192,37 @@ namespace repl {
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
OplogReader r;
- OpTime lastOpTimeFetched;
- // find a target to sync from the last op time written
- getOplogReader(txn, r);
- // no server found
{
boost::unique_lock<boost::mutex> lock(_mutex);
+ if (_lastOpTimeFetched.isNull()) {
+ // then we're initial syncing and we're still waiting for this to be set
+ _currentSyncTarget = NULL;
+ lock.unlock();
+ sleepsecs(1);
+ // if there is no one to sync from
+ return;
+ }
+ // Wait until we've applied the ops we have before we choose a sync target
+ while (!_appliedBuffer) {
+ _condvar.wait(lock);
+ }
+ }
+
+ while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
+ sleepmillis(0);
+ }
+
+
+ // find a target to sync from the last op time written
+ _replCoord->connectOplogReader(txn, this, &r);
+
+ OpTime lastOpTimeFetched;
+
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ // no server found
if (_currentSyncTarget == NULL) {
lock.unlock();
sleepsecs(1);
@@ -364,43 +391,25 @@ namespace repl {
return true;
}
- void BackgroundSync::getOplogReader(OperationContext* txn, OplogReader& r) {
+ void BackgroundSync::getOplogReaderLegacy(OperationContext* txn, OplogReader* r) {
const Member *target = NULL, *stale = NULL;
BSONObj oldest;
- {
- boost::unique_lock<boost::mutex> lock(_mutex);
- if (_lastOpTimeFetched.isNull()) {
- // then we're initial syncing and we're still waiting for this to be set
- _currentSyncTarget = NULL;
- return;
- }
-
- // Wait until we've applied the ops we have before we choose a sync target
- while (!_appliedBuffer) {
- _condvar.wait(lock);
- }
- }
-
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
- }
-
- verify(r.conn() == NULL);
+ verify(r->conn() == NULL);
while ((target = theReplSet->getMemberToSyncTo()) != NULL) {
string current = target->fullName();
- if (!r.connect(target->h())) {
+ if (!r->connect(target->h())) {
LOG(2) << "replSet can't connect to " << current << " to read operations" << rsLog;
- r.resetConnection();
+ r->resetConnection();
theReplSet->veto(current);
sleepsecs(1);
continue;
}
- if (isStale(r, oldest)) {
- r.resetConnection();
+ if (isStale(*r, oldest)) {
+ r->resetConnection();
theReplSet->veto(current, 600);
stale = target;
continue;
@@ -426,6 +435,86 @@ namespace repl {
boost::unique_lock<boost::mutex> lock(_mutex);
_currentSyncTarget = NULL;
}
+
+ }
+
+ void BackgroundSync::connectOplogReader(OperationContext* txn,
+ ReplicationCoordinatorImpl* replCoordImpl,
+ OplogReader* reader) {
+ OpTime lastOpTimeFetched;
+ {
+ boost::unique_lock<boost::mutex> lock(_mutex);
+ lastOpTimeFetched = _lastOpTimeFetched;
+ }
+ Date_t now(curTimeMillis64());
+ OpTime oldestOpTimeSeen(now,0);
+
+ while (true) {
+ HostAndPort candidate = replCoordImpl->chooseNewSyncSource();
+
+ if (candidate.empty()) {
+ if (oldestOpTimeSeen == OpTime(now,0)) {
+ // If, in this invocation of connectOplogReader(), we did not successfully
+ // connect to any node ahead of us,
+ // we apparently have no sync sources to connect to.
+ // This situation is common; e.g. if there are no writes to the primary at
+ // the moment.
+ return;
+ }
+
+ // Connected to at least one member, but in all cases we were too stale to use them
+ // as a sync source.
+ log() << "replSet error RS102 too stale to catch up" << rsLog;
+ log() << "replSet our last optime : " << lastOpTimeFetched.toStringLong() << rsLog;
+ log() << "replSet oldest available is " << oldestOpTimeSeen.toStringLong() <<
+ rsLog;
+ log() << "replSet "
+ "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember"
+ << rsLog;
+ sethbmsg("error RS102 too stale to catch up");
+ theReplSet->setMinValid(txn, oldestOpTimeSeen);
+ replCoordImpl->setFollowerMode(MemberState::RS_RECOVERING);
+ return;
+ }
+
+ if (!reader->connect(candidate)) {
+ LOG(2) << "replSet can't connect to " << candidate.toString() <<
+ " to read operations" << rsLog;
+ reader->resetConnection();
+ replCoordImpl->blacklistSyncSource(candidate, Date_t(curTimeMillis64() + 10*1000));
+ continue;
+ }
+ // Read the first (oldest) op and confirm that it's not newer than our last
+ // fetched op. Otherwise, we have fallen off the back of that source's oplog.
+ BSONObj remoteOldestOp(reader->findOne(rsoplog, Query()));
+ BSONElement tsElem(remoteOldestOp["ts"]);
+ if (tsElem.type() != Timestamp) {
+ // This member's got a bad op in its oplog.
+ warning() << "oplog invalid format on node " << candidate.toString();
+ reader->resetConnection();
+ replCoordImpl->blacklistSyncSource(candidate,
+ Date_t(curTimeMillis64() + 600*1000));
+ continue;
+ }
+ OpTime remoteOldOpTime = tsElem._opTime();
+
+ if (lastOpTimeFetched < remoteOldOpTime) {
+ // We're too stale to use this sync source.
+ reader->resetConnection();
+ replCoordImpl->blacklistSyncSource(candidate,
+ Date_t(curTimeMillis64() + 600*1000));
+ if (oldestOpTimeSeen > remoteOldOpTime) {
+ warning() << "we are too stale to use " << candidate.toString() <<
+ " as a sync source";
+ oldestOpTimeSeen = remoteOldOpTime;
+ }
+ continue;
+ }
+
+ // Got a valid sync source.
+ return;
+ } // while (true)
+
}
bool BackgroundSync::isRollbackRequired(OperationContext* txn, OplogReader& r) {
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index e8886f1b722..4a8a14a7bc3 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -32,12 +32,15 @@
#include "mongo/util/queue.h"
#include "mongo/db/repl/oplogreader.h"
-#include "mongo/db/repl/rs.h"
#include "mongo/db/jsobj.h"
namespace mongo {
namespace repl {
+ class Member;
+ class ReplicationCoordinator;
+ class ReplicationCoordinatorImpl;
+
// This interface exists to facilitate easier testing;
// the test infrastructure implements these functions with stubs.
class BackgroundSyncInterface {
@@ -71,6 +74,7 @@ namespace repl {
* 3. BackgroundSync::_mutex
*/
class BackgroundSync : public BackgroundSyncInterface {
+ private:
static BackgroundSync *s_instance;
// protects creation of s_instance
static boost::mutex s_mutex;
@@ -101,7 +105,7 @@ namespace repl {
void produce(OperationContext* txn);
// Check if rollback is necessary
bool isRollbackRequired(OperationContext* txn, OplogReader& r);
- void getOplogReader(OperationContext* txn, OplogReader& r);
+
// Evaluate if the current sync target is still good
bool shouldChangeSyncTarget();
// check lastOpTimeWritten against the remote's earliest op, filling in remoteOldestOp.
@@ -111,6 +115,9 @@ namespace repl {
// restart syncing
void start();
+ // A pointer to the replication coordinator running the show.
+ ReplicationCoordinator* _replCoord;
+
public:
bool isAssumingPrimary();
@@ -138,6 +145,19 @@ namespace repl {
// Wait for replication to finish and buffer to be applied so that the member can become
// primary.
void stopReplicationAndFlushBuffer();
+
+ /**
+ * Connects an oplog reader to a viable sync source. Legacy uses getOplogReaderLegacy(),
+ * which sets _currentSyncTarget as a side effect.
+ * connectOplogReader() is used in new replication.
+ * Both functions can affect the TopoCoord's blacklist of sync sources, and may set
+ * our minValid value, durably, if we detect we haven fallen off the back of all sync
+ * sources' oplogs.
+ **/
+ void getOplogReaderLegacy(OperationContext* txn, OplogReader* reader);
+ void connectOplogReader(OperationContext* txn,
+ ReplicationCoordinatorImpl* replCoordImpl,
+ OplogReader* reader);
};
diff --git a/src/mongo/db/repl/repl_coordinator.h b/src/mongo/db/repl/repl_coordinator.h
index ae242cbeb03..77e57f4f66d 100644
--- a/src/mongo/db/repl/repl_coordinator.h
+++ b/src/mongo/db/repl/repl_coordinator.h
@@ -49,7 +49,9 @@ namespace mongo {
namespace repl {
+ class BackgroundSync;
class HandshakeArgs;
+ class OplogReader;
class ReplSetHeartbeatArgs;
class ReplSetHeartbeatResponse;
class UpdatePositionArgs;
@@ -460,6 +462,16 @@ namespace repl {
*/
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result) = 0;
+ /**
+ * Connects an oplog reader to a viable sync source, using BackgroundSync object bgsync.
+ * When this function returns, reader is connected to a viable sync source or is left
+ * unconnected if no sync sources are currently available. In legacy, bgsync's
+ * _currentSyncTarget is also set appropriately.
+ **/
+ virtual void connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* reader) = 0;
+
protected:
ReplicationCoordinator();
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.cpp b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
index 7489a0ad43f..2c5cddbf9dc 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.cpp
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/repl/repl_coordinator_hybrid.h"
#include "mongo/db/global_environment_experiment.h"
+#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/network_interface_impl.h"
#include "mongo/db/repl/repl_coordinator_external_state_impl.h"
@@ -407,6 +408,20 @@ namespace repl {
return legacyResponse;
}
+ void HybridReplicationCoordinator::connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r) {
+ _legacy.connectOplogReader(txn, bgsync, r);
+ HostAndPort legacySyncSource = r->getHost();
+ bgsync->connectOplogReader(txn, &_impl, r);
+ HostAndPort implSyncSource = r->getHost();
+ if (legacySyncSource != implSyncSource) {
+ severe() << "sync source mismatch between legacy and impl: " <<
+ legacySyncSource.toString() << " and " << implSyncSource.toString();
+ fassertFailed(18742);
+ }
+ }
+
void HybridReplicationCoordinator::setImplConfigHack(const ReplSetConfig* config) {
int myIndex = -1;
for (size_t i = 0; i < config->members.size(); ++i) { // find my index in the config
diff --git a/src/mongo/db/repl/repl_coordinator_hybrid.h b/src/mongo/db/repl/repl_coordinator_hybrid.h
index 0e15b27505c..cede8b8c9fb 100644
--- a/src/mongo/db/repl/repl_coordinator_hybrid.h
+++ b/src/mongo/db/repl/repl_coordinator_hybrid.h
@@ -161,6 +161,10 @@ namespace repl {
virtual bool isReplEnabled() const;
+ virtual void connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r);
+
/**
* This is a temporary hack to force _impl to set its replset config to the one loaded by
* _legacy.
diff --git a/src/mongo/db/repl/repl_coordinator_impl.cpp b/src/mongo/db/repl/repl_coordinator_impl.cpp
index 5921b878833..8d12a1d5c2c 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.cpp
+++ b/src/mongo/db/repl/repl_coordinator_impl.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/check_quorum_for_config_change.h"
#include "mongo/db/repl/handshake_args.h"
#include "mongo/db/repl/master_slave.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_response.h"
#include "mongo/db/repl/repl_settings.h"
@@ -1110,6 +1111,7 @@ namespace {
if (_rsConfig.isInitialized()) {
cancelHeartbeats();
}
+ OpTime lastOpApplied(_getLastOpApplied_inlock());
_setConfigState_inlock(kConfigSteady);
_rsConfig = newConfig;
_thisMembersConfigIndex = myIndex;
@@ -1117,7 +1119,7 @@ namespace {
newConfig,
myIndex,
_replExecutor.now(),
- _getLastOpApplied_inlock());
+ lastOpApplied);
_currentState = _topCoord->getMemberState();
// Ensure that there's an entry in the _slaveInfoMap for ourself
@@ -1263,5 +1265,59 @@ namespace {
return _settings.usingReplSets() || _settings.master || _settings.slave;
}
+ void ReplicationCoordinatorImpl::connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r) {
+ invariant(false);
+ }
+
+ void ReplicationCoordinatorImpl::_chooseNewSyncSource(
+ const ReplicationExecutor::CallbackData& cbData,
+ HostAndPort* newSyncSource) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), _getLastOpApplied());
+ }
+
+ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() {
+ HostAndPort newSyncSource;
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource,
+ this,
+ stdx::placeholders::_1,
+ &newSyncSource));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return newSyncSource; // empty
+ }
+ fassert(18740, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ return newSyncSource;
+ }
+
+ void ReplicationCoordinatorImpl::_blacklistSyncSource(
+ const ReplicationExecutor::CallbackData& cbData,
+ const HostAndPort& host,
+ Date_t until) {
+ if (cbData.status == ErrorCodes::CallbackCanceled) {
+ return;
+ }
+ _topCoord->blacklistSyncSource(host, until);
+ }
+
+ void ReplicationCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
+ CBHStatus cbh = _replExecutor.scheduleWork(
+ stdx::bind(&ReplicationCoordinatorImpl::_blacklistSyncSource,
+ this,
+ stdx::placeholders::_1,
+ host,
+ until));
+ if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
+ return;
+ }
+ fassert(18741, cbh.getStatus());
+ _replExecutor.wait(cbh.getValue());
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_impl.h b/src/mongo/db/repl/repl_coordinator_impl.h
index d90738508be..7ecb7022f62 100644
--- a/src/mongo/db/repl/repl_coordinator_impl.h
+++ b/src/mongo/db/repl/repl_coordinator_impl.h
@@ -53,6 +53,7 @@ namespace mongo {
namespace repl {
+ class OplogReader;
class SyncSourceFeedback;
class TopologyCoordinator;
@@ -194,6 +195,11 @@ namespace repl {
virtual bool isReplEnabled() const;
+ virtual void connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r);
+
+
// ================== Members of replication code internal API ===================
// This is a temporary hack to set the replset config to the config detected by the
@@ -217,6 +223,21 @@ namespace repl {
*/
void cancelHeartbeats();
+ /**
+ * Chooses a sync source.
+ * A wrapper that schedules _chooseNewSyncSource() through the Replication Executor and
+ * waits for its completion.
+ */
+ HostAndPort chooseNewSyncSource();
+
+ /**
+ * Blacklists 'host' until 'until'.
+ * A wrapper that schedules _blacklistSyncSource() through the Replication Executor and
+ * waits for its completion.
+ */
+ void blacklistSyncSource(const HostAndPort& host, Date_t until);
+
+
// ================== Test support API ===================
/**
@@ -459,6 +480,25 @@ namespace repl {
void _onElectCmdRunnerComplete(const ReplicationExecutor::CallbackData& cbData,
const ReplicationExecutor::EventHandle& finishEvh);
+ /**
+ * Chooses a new sync source. Must be scheduled as a callback.
+ *
+ * Calls into the Topology Coordinator, which uses its current view of the set to choose
+ * the most appropriate sync source.
+ */
+ void _chooseNewSyncSource(const ReplicationExecutor::CallbackData& cbData,
+ HostAndPort* newSyncSource);
+
+ /**
+ * Adds 'host' to the sync source blacklist until 'until'. A blacklisted source cannot
+ * be chosen as a sync source.
+ *
+ * Must be scheduled as a callback.
+ */
+ void _blacklistSyncSource(const ReplicationExecutor::CallbackData& cbData,
+ const HostAndPort& host,
+ Date_t until);
+
//
// All member variables are labeled with one of the following codes indicating the
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.cpp b/src/mongo/db/repl/repl_coordinator_legacy.cpp
index 38ffa126e94..aa5eb6a4251 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.cpp
+++ b/src/mongo/db/repl/repl_coordinator_legacy.cpp
@@ -981,5 +981,13 @@ namespace {
return _settings.usingReplSets() || _settings.slave || _settings.master;
}
+ void LegacyReplicationCoordinator::connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r) {
+ bgsync->getOplogReaderLegacy(txn, r);
+ }
+
+
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_legacy.h b/src/mongo/db/repl/repl_coordinator_legacy.h
index 250e0fa1c29..415a8cfe175 100644
--- a/src/mongo/db/repl/repl_coordinator_legacy.h
+++ b/src/mongo/db/repl/repl_coordinator_legacy.h
@@ -157,6 +157,10 @@ namespace repl {
virtual bool isReplEnabled() const;
+ virtual void connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r);
+
private:
// Mutex that protects the _slaveOpTimeMap
diff --git a/src/mongo/db/repl/repl_coordinator_mock.cpp b/src/mongo/db/repl/repl_coordinator_mock.cpp
index 972568c46ab..6c4e5828595 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.cpp
+++ b/src/mongo/db/repl/repl_coordinator_mock.cpp
@@ -250,5 +250,11 @@ namespace repl {
return Status::OK();
}
+ void ReplicationCoordinatorMock::connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r) {
+ invariant(false);
+ }
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_coordinator_mock.h b/src/mongo/db/repl/repl_coordinator_mock.h
index 2f9036df0a4..53f045c4443 100644
--- a/src/mongo/db/repl/repl_coordinator_mock.h
+++ b/src/mongo/db/repl/repl_coordinator_mock.h
@@ -156,6 +156,10 @@ namespace repl {
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
+ virtual void connectOplogReader(OperationContext* txn,
+ BackgroundSync* bgsync,
+ OplogReader* r);
+
private:
ReplSettings _settings;
diff --git a/src/mongo/db/repl/topology_coordinator.h b/src/mongo/db/repl/topology_coordinator.h
index 4aa76f8e107..83ed1c80b2a 100644
--- a/src/mongo/db/repl/topology_coordinator.h
+++ b/src/mongo/db/repl/topology_coordinator.h
@@ -112,7 +112,7 @@ namespace repl {
/**
* Chooses and sets a new sync source, based on our current knowledge of the world.
*/
- virtual void chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) = 0;
+ virtual HostAndPort chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) = 0;
/**
* Suppresses selecting "host" as sync source until "until".
diff --git a/src/mongo/db/repl/topology_coordinator_impl.cpp b/src/mongo/db/repl/topology_coordinator_impl.cpp
index f531ee2c241..55d72db622e 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl.cpp
@@ -88,7 +88,6 @@ namespace {
TopologyCoordinatorImpl::TopologyCoordinatorImpl(Seconds maxSyncSourceLagSecs) :
_role(Role::follower),
_currentPrimaryIndex(-1),
- _syncSourceIndex(-1),
_forceSyncSourceIndex(-1),
_maxSyncSourceLagSecs(maxSyncSourceLagSecs),
_selfIndex(-1),
@@ -109,22 +108,20 @@ namespace {
}
HostAndPort TopologyCoordinatorImpl::getSyncSourceAddress() const {
- if (_syncSourceIndex == -1) {
- return HostAndPort();
- }
- return _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort();
+ return _syncSource;
}
- void TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied) {
+ HostAndPort TopologyCoordinatorImpl::chooseNewSyncSource(Date_t now,
+ const OpTime& lastOpApplied) {
+
// if we have a target we've requested to sync from, use it
if (_forceSyncSourceIndex != -1) {
invariant(_forceSyncSourceIndex < _currentConfig.getNumMembers());
- _syncSourceIndex = _forceSyncSourceIndex;
+ _syncSource = _currentConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
_forceSyncSourceIndex = -1;
- _sethbmsg( str::stream() << "syncing from: "
- << _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString()
- << " by request", 0);
- return;
+ _sethbmsg(str::stream() << "syncing from: " << _syncSource.toString() << " by request",
+ 0);
+ return _syncSource;
}
// wait for 2N pings (not counting ourselves) before choosing a sync target
@@ -133,14 +130,20 @@ namespace {
if (needMorePings > 0) {
OCCASIONALLY log() << "waiting for " << needMorePings
<< " pings from other members before syncing";
- return;
+ _syncSource = HostAndPort();
+ return _syncSource;
}
// If we are only allowed to sync from the primary, set that
if (!_currentConfig.isChainingAllowed()) {
- // Sets -1 if there is no current primary
- _syncSourceIndex = _currentPrimaryIndex;
- return;
+ if (_currentPrimaryIndex == -1) {
+ _syncSource = HostAndPort();
+ return _syncSource;
+ }
+ else {
+ _syncSource = _currentConfig.getMemberAt(_currentPrimaryIndex).getHostAndPort();
+ return _syncSource;
+ }
}
// find the member with the lowest ping time that is ahead of me
@@ -148,11 +151,13 @@ namespace {
// Find primary's oplog time. Reject sync candidates that are more than
// maxSyncSourceLagSecs seconds behind.
OpTime primaryOpTime;
- if (_currentPrimaryIndex != -1)
+ if (_currentPrimaryIndex != -1) {
primaryOpTime = _hbdata[_currentPrimaryIndex].getOpTime();
- else
+ }
+ else {
// choose a time that will exclude no candidates, since we don't see a primary
primaryOpTime = OpTime(_maxSyncSourceLagSecs.total_seconds(), 0);
+ }
if (primaryOpTime.getSecs() <
static_cast<unsigned int>(_maxSyncSourceLagSecs.total_seconds())) {
@@ -243,14 +248,14 @@ namespace {
if (closestIndex == -1) {
// Did not find any members to sync from
- _syncSourceIndex = -1;
- return;
+ _syncSource = HostAndPort();
+ return _syncSource;
}
- std::string msg(str::stream() << "syncing to: " <<
- _currentConfig.getMemberAt(closestIndex).getHostAndPort().toString(), 0);
+ _syncSource = _currentConfig.getMemberAt(closestIndex).getHostAndPort();
+ std::string msg(str::stream() << "syncing to: " << _syncSource.toString(), 0);
_sethbmsg(msg);
log() << msg;
- _syncSourceIndex = closestIndex;
+ return _syncSource;
}
void TopologyCoordinatorImpl::blacklistSyncSource(const HostAndPort& host, Date_t until) {
@@ -585,9 +590,8 @@ namespace {
response->setTime(Seconds(Milliseconds(now.asInt64()).total_seconds()));
response->setOpTime(lastOpApplied.asDate());
- if (_syncSourceIndex != -1) {
- response->setSyncingTo(
- _currentConfig.getMemberAt(_syncSourceIndex).getHostAndPort().toString());
+ if (!_syncSource.empty()) {
+ response->setSyncingTo(_syncSource.toString());
}
long long v = _currentConfig.getConfigVersion();
@@ -1196,9 +1200,8 @@ namespace {
response->append("myState", myState.s);
// Add sync source info
- if ((_syncSourceIndex != -1) && !myState.primary() && !myState.removed()) {
- response->append("syncingTo", _currentConfig.getMemberAt(_syncSourceIndex)
- .getHostAndPort().toString());
+ if (!_syncSource.empty() && !myState.primary() && !myState.removed()) {
+ response->append("syncingTo", _syncSource.toString());
}
response->append("members", membersOut);
@@ -1260,7 +1263,6 @@ namespace {
_hbdata.clear();
_role = Role::follower;
_currentPrimaryIndex = -1;
- _syncSourceIndex = -1;
_forceSyncSourceIndex = -1;
_selfIndex = selfIndex;
@@ -1294,7 +1296,7 @@ namespace {
// we're electable, we must be the leader.
_role = Role::leader;
}
- chooseNewSyncSource(now, lastOpApplied);
+
}
// TODO(emilkie): Better story for heartbeat message handling.
@@ -1472,7 +1474,6 @@ namespace {
OpTime myLastOpApplied,
OpTime electionOpTime) {
invariant(_role == Role::candidate);
- _syncSourceIndex = -1;
_electionTime = electionOpTime;
_electionId = electionId;
_role = Role::leader;
diff --git a/src/mongo/db/repl/topology_coordinator_impl.h b/src/mongo/db/repl/topology_coordinator_impl.h
index 1f5a167b278..4ea27133a4d 100644
--- a/src/mongo/db/repl/topology_coordinator_impl.h
+++ b/src/mongo/db/repl/topology_coordinator_impl.h
@@ -127,7 +127,8 @@ namespace repl {
virtual std::vector<HostAndPort> getMaybeUpHostAndPorts() const;
virtual int getMaintenanceCount() const;
virtual void setForceSyncSourceIndex(int index);
- virtual void chooseNewSyncSource(Date_t now, const OpTime& lastOpApplied);
+ virtual HostAndPort chooseNewSyncSource(Date_t now,
+ const OpTime& lastOpApplied);
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
virtual void setStepDownTime(Date_t newTime);
virtual void setFollowerMode(MemberState::MS newMode);
@@ -293,9 +294,9 @@ namespace repl {
// the member we currently believe is primary, if one exists
int _currentPrimaryIndex;
- // the member we are currently syncing from
- // -1 if no sync source (we are primary, or we cannot connect to anyone yet)
- int _syncSourceIndex;
+ // the hostandport we are currently syncing from
+ // empty if no sync source (we are primary, or we cannot connect to anyone yet)
+ HostAndPort _syncSource;
// These members are not chosen as sync sources for a period of time, due to connection
// issues with them
std::map<HostAndPort, Date_t> _syncSourceBlacklist;
diff --git a/src/mongo/db/repl/topology_coordinator_impl_test.cpp b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
index 132a0f0889f..1eafe5206e0 100644
--- a/src/mongo/db/repl/topology_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/topology_coordinator_impl_test.cpp
@@ -164,7 +164,8 @@ namespace {
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Fail due to insufficient number of pings
- getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0));
+ HostAndPort newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0));
+ ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT(getTopoCoord().getSyncSourceAddress().empty());
// Record 2nd round of pings to allow choosing a new sync source; all members equidistant
@@ -172,7 +173,8 @@ namespace {
heartbeatFromMember(HostAndPort("h3"), "rs0", MemberState::RS_SECONDARY, OpTime(0,0));
// Should choose h2, since it is furthest ahead
- getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0));
+ newSyncSource = getTopoCoord().chooseNewSyncSource(now()++, OpTime(0,0));
+ ASSERT_EQUALS(getTopoCoord().getSyncSourceAddress(), newSyncSource);
ASSERT_EQUALS(HostAndPort("h2"), getTopoCoord().getSyncSourceAddress());
// h3 becomes further ahead, so it should be chosen