summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2015-07-13 06:33:53 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2015-07-14 05:41:54 -0400
commit1e614047bcb351e89dbb74b3d2630c151180262b (patch)
treeb540f913cbc484fcc88d99311510f80a8be76657
parent8d49054385b3715c8459e3a7bcb14bb1945710db (diff)
downloadmongo-1e614047bcb351e89dbb74b3d2630c151180262b.tar.gz
SERVER-19375 choose new sync source based on last fetched op rather than last applied op
-rw-r--r--src/mongo/db/repl/oplogreader.cpp4
-rw-r--r--src/mongo/db/repl/oplogreader.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator.h2
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.h3
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.h2
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp6
8 files changed, 16 insertions, 11 deletions
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index 7bc4ba3fc0a..0370d2ae098 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -150,7 +150,7 @@ namespace repl {
invariant(conn() == NULL);
while (true) {
- HostAndPort candidate = replCoord->chooseNewSyncSource();
+ HostAndPort candidate = replCoord->chooseNewSyncSource(lastOpTimeFetched);
if (candidate.empty()) {
if (oldestOpTimeSeen == sentinel) {
@@ -200,7 +200,7 @@ namespace repl {
}
OpTime remoteOldOpTime = tsElem._opTime();
- if (lastOpTimeFetched < remoteOldOpTime) {
+ if (!lastOpTimeFetched.isNull() && lastOpTimeFetched < remoteOldOpTime) {
// We're too stale to use this sync source.
resetConnection();
replCoord->blacklistSyncSource(candidate,
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
index 527452fd5fc..322559dba87 100644
--- a/src/mongo/db/repl/oplogreader.h
+++ b/src/mongo/db/repl/oplogreader.h
@@ -144,6 +144,8 @@ namespace repl {
* In the process of connecting, this function may add items to the repl coordinator's
* sync source blacklist.
* This function may throw DB exceptions.
+ * If "lastOpTimeFetched" is (0, 0), we do not check staleness as this indicates an initial
+ * sync.
*/
void connectToSyncSource(OperationContext* txn,
OpTime lastOpTimeFetched,
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index e97a0635af5..f9927f719dd 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -533,7 +533,7 @@ namespace repl {
/**
* Chooses a viable sync source, or, if none available, returns empty HostAndPort.
*/
- virtual HostAndPort chooseNewSyncSource() = 0;
+ virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) = 0;
/**
* Blacklists choosing 'host' as a sync source until time 'until'.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 9e5671743c8..40b28dbb546 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -2287,19 +2287,21 @@ namespace {
void ReplicationCoordinatorImpl::_chooseNewSyncSource(
const ReplicationExecutor::CallbackData& cbData,
+ const OpTime& lastOpTimeFetched,
HostAndPort* newSyncSource) {
if (cbData.status == ErrorCodes::CallbackCanceled) {
return;
}
- *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), getMyLastOptime());
+ *newSyncSource = _topCoord->chooseNewSyncSource(_replExecutor.now(), lastOpTimeFetched);
}
- HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource() {
+ HostAndPort ReplicationCoordinatorImpl::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
HostAndPort newSyncSource;
CBHStatus cbh = _replExecutor.scheduleWork(
stdx::bind(&ReplicationCoordinatorImpl::_chooseNewSyncSource,
this,
stdx::placeholders::_1,
+ lastOpTimeFetched,
&newSyncSource));
if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
return newSyncSource; // empty
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 62fb61b5380..ae08703b628 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -216,7 +216,7 @@ namespace repl {
virtual bool isReplEnabled() const;
- virtual HostAndPort chooseNewSyncSource();
+ virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched);
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
@@ -660,6 +660,7 @@ namespace repl {
* the most appropriate sync source.
*/
void _chooseNewSyncSource(const ReplicationExecutor::CallbackData& cbData,
+ const OpTime& lastOpTimeFetched,
HostAndPort* newSyncSource);
/**
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index da3fb0e71d6..c601a8d4f44 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -276,7 +276,7 @@ namespace repl {
return Status::OK();
}
- HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource() {
+ HostAndPort ReplicationCoordinatorMock::chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
invariant(false);
return HostAndPort();
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index daff00cec54..3f645554c7c 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -172,7 +172,7 @@ namespace repl {
virtual Status checkReplEnabledForCommand(BSONObjBuilder* result);
- virtual HostAndPort chooseNewSyncSource();
+ virtual HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched);
virtual void blacklistSyncSource(const HostAndPort& host, Date_t until);
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 536b53b9f2b..2514a88953d 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -351,12 +351,12 @@ namespace {
truncateAndResetOplog(&txn, replCoord, bgsync);
OplogReader r;
- OpTime now(Milliseconds(curTimeMillis64()).total_seconds(), 0);
+ OpTime nullOpTime(0, 0);
while (r.getHost().empty()) {
// We must prime the sync source selector so that it considers all candidates regardless
- // of oplog position, by passing in "now" as the last op fetched time.
- r.connectToSyncSource(&txn, now, replCoord);
+ // of oplog position, by passing in "nullOpTime" as the last op fetched time.
+ r.connectToSyncSource(&txn, nullOpTime, replCoord);
if (r.getHost().empty()) {
std::string msg =
"no valid sync sources found in current replset to do an initial sync";