-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_pv0.yml |   2
-rw-r--r--  jstests/replsets/chaining_removal.js                    |  24
-rw-r--r--  jstests/replsets/maxSyncSourceLagSecs.js                |  24
-rw-r--r--  jstests/replsets/no_chaining.js                         |  10
-rw-r--r--  jstests/replsets/rollback_too_new.js                    |  22
-rw-r--r--  jstests/replsets/rslib.js                               |  19
-rw-r--r--  jstests/replsets/server8070.js                          |   6
-rw-r--r--  jstests/replsets/slavedelay3.js                         |   7
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                            |  65
-rw-r--r--  src/mongo/db/repl/bgsync.h                              |   3
-rw-r--r--  src/mongo/db/repl/data_replicator.cpp                   |   2
-rw-r--r--  src/mongo/db/repl/data_replicator_test.cpp              |  23
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.cpp                     |  84
-rw-r--r--  src/mongo/db/repl/oplog_fetcher.h                       |  11
-rw-r--r--  src/mongo/db/repl/oplog_fetcher_test.cpp                | 291
-rw-r--r--  src/mongo/db/repl/rollback_checker.cpp                  |   2
-rw-r--r--  src/mongo/db/repl/rollback_checker.h                    |   6
-rw-r--r--  src/mongo/db/repl/rollback_checker_test.cpp             |  10
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp                       |  13
-rw-r--r--  src/mongo/db/repl/rs_rollback.h                         |   5
-rw-r--r--  src/mongo/db/repl/rs_rollback_test.cpp                  |   4
-rw-r--r--  src/mongo/db/repl/sync_source_resolver.cpp              |  21
-rw-r--r--  src/mongo/db/repl/sync_source_resolver.h                |   6
-rw-r--r--  src/mongo/db/repl/sync_source_resolver_test.cpp         |  35
24 files changed, 477 insertions(+), 218 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml b/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
index 3a313af2f10..ed4cc1ad65c 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_pv0.yml
@@ -25,6 +25,8 @@ selector:
- jstests/replsets/last_vote.js
# This test expects the server to log a PV1-only vote-not-granted reason
- jstests/replsets/no_flapping_during_network_partition.js
+ # This test requires terms.
+ - jstests/replsets/rollback_too_new.js
executor:
js_test:
diff --git a/jstests/replsets/chaining_removal.js b/jstests/replsets/chaining_removal.js
index ee70a00289e..bab3373bffa 100644
--- a/jstests/replsets/chaining_removal.js
+++ b/jstests/replsets/chaining_removal.js
@@ -2,6 +2,8 @@
(function() {
"use strict";
+ load("jstests/replsets/rslib.js");
+
var numNodes = 5;
var host = getHostName();
var name = "chaining_removal";
@@ -35,27 +37,9 @@
{configureFailPoint: 'disableMaxSyncSourceLagSecs', mode: 'alwaysOn'}));
// Force node 1 to sync directly from node 0.
- assert.commandWorked(nodes[1].getDB("admin").runCommand({"replSetSyncFrom": nodes[0].host}));
- var res;
- assert.soon(
- function() {
- res = nodes[1].getDB("admin").runCommand({"replSetGetStatus": 1});
- return res.syncingTo === nodes[0].host;
- },
- function() {
- return "node 1 failed to start syncing from node 0: " + tojson(res);
- });
-
+ syncFrom(nodes[1], nodes[0], replTest);
// Force node 4 to sync through node 1.
- assert.commandWorked(nodes[4].getDB("admin").runCommand({"replSetSyncFrom": nodes[1].host}));
- assert.soon(
- function() {
- res = nodes[4].getDB("admin").runCommand({"replSetGetStatus": 1});
- return res.syncingTo === nodes[1].host;
- },
- function() {
- return "node 4 failed to start chaining through node 1: " + tojson(res);
- });
+ syncFrom(nodes[4], nodes[1], replTest);
// write that should reach all nodes
var timeout = 60 * 1000;
diff --git a/jstests/replsets/maxSyncSourceLagSecs.js b/jstests/replsets/maxSyncSourceLagSecs.js
index f616f1fff16..96d6a530aee 100644
--- a/jstests/replsets/maxSyncSourceLagSecs.js
+++ b/jstests/replsets/maxSyncSourceLagSecs.js
@@ -4,6 +4,8 @@
// @tags: [requires_fsync]
(function() {
"use strict";
+ load("jstests/replsets/rslib.js");
+
var name = "maxSyncSourceLagSecs";
var replTest = new ReplSetTest({
name: name,
@@ -21,8 +23,8 @@
var master = replTest.getPrimary();
var slaves = replTest.liveNodes.slaves;
- assert.commandWorked(slaves[0].getDB("admin").runCommand({replSetSyncFrom: master.name}));
- assert.commandWorked(slaves[1].getDB("admin").runCommand({replSetSyncFrom: master.name}));
+ syncFrom(slaves[0], master, replTest);
+ syncFrom(slaves[1], master, replTest);
master.getDB("foo").bar.save({a: 1});
replTest.awaitReplication();
@@ -31,23 +33,7 @@
sleep(4000);
jsTestLog("Setting sync target of slave 2 to slave 1");
- assert.soon(function() {
- // We do a write each time and have this in a try...catch block due to the fallout of
- // SERVER-24114. If that timeout occurs, then we search for another sync source, however we
- // will not find one unless more writes have come in. Additionally, it is possible that
- // slaves[1] will switch to sync from slaves[0] after slaves[1] replicates a write from
- // the primary but before slaves[0] replicates it. slaves[1] will then have to roll back
- // which would cause a network error.
- try {
- slaves[1].getDB("admin").runCommand({replSetSyncFrom: slaves[0].name});
- var res = slaves[1].getDB("admin").runCommand({"replSetGetStatus": 1});
- master.getDB("foo").bar.insert({a: 1});
- return res.syncingTo === slaves[0].name;
- } catch (e) {
- print("Exception in assert.soon, retrying: " + e);
- return false;
- }
- }, "sync target not changed to other slave", 100 * 1000, 2 * 1000);
+ syncFrom(slaves[1], slaves[0], replTest);
printjson(replTest.status());
jsTestLog("Lock slave 1 and add some docs. Force sync target for slave 2 to change to primary");
diff --git a/jstests/replsets/no_chaining.js b/jstests/replsets/no_chaining.js
index ad086c72f9a..07a67c3ea2b 100644
--- a/jstests/replsets/no_chaining.js
+++ b/jstests/replsets/no_chaining.js
@@ -1,3 +1,4 @@
+load("jstests/replsets/rslib.js");
function myprint(x) {
print("chaining output: " + x);
@@ -38,14 +39,7 @@ var checkNoChaining = function() {
};
var forceSync = function() {
- var config;
- try {
- config = nodes[2].getDB("local").system.replset.findOne();
- } catch (e) {
- config = nodes[2].getDB("local").system.replset.findOne();
- }
- var targetHost = config.members[1].host;
- printjson(nodes[2].getDB("admin").runCommand({replSetSyncFrom: targetHost}));
+ syncFrom(nodes[2], nodes[1], replTest);
assert.soon(function() {
return nodes[2].getDB("test").foo.findOne() != null;
}, 'Check for data after force sync');
diff --git a/jstests/replsets/rollback_too_new.js b/jstests/replsets/rollback_too_new.js
index 2e8e4d3693d..4e216a6028d 100644
--- a/jstests/replsets/rollback_too_new.js
+++ b/jstests/replsets/rollback_too_new.js
@@ -9,6 +9,8 @@
(function() {
"use strict";
+ load("jstests/replsets/rslib.js"); // For getLatestOp()
+
// set up a set and grab things for later
var name = "rollback_too_new";
var replTest = new ReplSetTest({name: name, nodes: 3});
@@ -39,14 +41,28 @@
replTest.stop(CID);
- // do one write to master
- // in order to trigger a rollback on C
- assert.writeOK(master.getDB(name).foo.insert({x: 2}, options));
+ // We bump the term to make sure node 0's oplog is ahead of node 2's.
+ var term = getLatestOp(conns[0]).t;
+ try {
+ assert.commandWorked(conns[0].adminCommand({replSetStepDown: 1, force: true}));
+ } catch (e) {
+ if (!isNetworkError(e)) {
+ throw e;
+ }
+ }
+
+ // After stepping down, the node will eventually get reelected in a higher term.
+ replTest.waitForState(conns[0], ReplSetTest.State.PRIMARY);
+ // Wait for the node to increase its term.
+ assert.soon(function() {
+ return getLatestOp(conns[0]).t > term;
+ });
// Node C should connect to new master as a sync source because chaining is disallowed.
// C is ahead of master but it will still connect to it.
clearRawMongoProgramOutput();
replTest.restart(CID);
+
assert.soon(function() {
try {
return rawMongoProgramOutput().match(
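
Aside: the stepdown/reelection/term-check sequence above is a general way to bump the term in a test. A condensed sketch of the same pattern as a standalone helper; bumpTerm is a hypothetical name, while getLatestOp, isNetworkError, and ReplSetTest.State are the same helpers the test above already uses:

// Hypothetical helper distilled from the test above: force an election so
// that the primary's next oplog entries carry a higher term.
function bumpTerm(rst, primary) {
    var term = getLatestOp(primary).t;
    try {
        assert.commandWorked(primary.adminCommand({replSetStepDown: 1, force: true}));
    } catch (e) {
        // replSetStepDown closes connections, so a network error is expected here.
        if (!isNetworkError(e)) {
            throw e;
        }
    }
    // With no fresher candidate available, the stepped-down node is reelected
    // in a new, higher term.
    rst.waitForState(primary, ReplSetTest.State.PRIMARY);
    assert.soon(function() {
        return getLatestOp(primary).t > term;
    });
}
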
diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js
index d310f46cdbb..14c244e7977 100644
--- a/jstests/replsets/rslib.js
+++ b/jstests/replsets/rslib.js
@@ -1,3 +1,4 @@
+var syncFrom;
var wait;
var occasionally;
var reconnect;
@@ -14,9 +15,27 @@ var getLastOpTime;
(function() {
"use strict";
+ load("jstests/libs/write_concern_util.js");
+
var count = 0;
var w = 0;
+ /**
+ * A wrapper around `replSetSyncFrom` to ensure that the desired sync source is ahead of the
+ * syncing node so that the syncing node can choose to sync from the desired sync source.
+ * It first stops replication on the syncing node so that a write made on the primary can
+ * reach the desired sync source but not the syncing node, guaranteeing that the desired
+ * sync source is ahead. When replication is restarted, the desired sync source will be a
+ * valid sync source for the syncing node.
+ */
+ syncFrom = function(syncingNode, desiredSyncSource, rst) {
+ jsTestLog("Forcing " + syncingNode.name + " to sync from " + desiredSyncSource.name);
+ stopServerReplication(syncingNode);
+ assert.writeOK(rst.getPrimary().getDB("dummy").foo.insert({a: 1}));
+ assert.commandWorked(syncingNode.adminCommand({replSetSyncFrom: desiredSyncSource.name}));
+ restartServerReplication(syncingNode);
+ rst.awaitSyncSource(syncingNode, desiredSyncSource);
+ };
+
wait = function(f, msg) {
w++;
var n = 0;
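
Aside: a minimal self-contained sketch of calling the new helper from a test; the three-node setup here is illustrative and not part of this patch:

load("jstests/replsets/rslib.js");

var rst = new ReplSetTest({nodes: 3});
rst.startSet();
rst.initiate();

// Make node 2 sync from node 1. syncFrom() pauses replication on node 2,
// commits a write on the primary so that node 1 gets ahead of node 2, issues
// replSetSyncFrom, resumes replication, and waits until node 1 actually
// becomes node 2's sync source.
syncFrom(rst.nodes[2], rst.nodes[1], rst);

rst.stopSet();
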
diff --git a/jstests/replsets/server8070.js b/jstests/replsets/server8070.js
index 6be51507d4d..500def42a51 100644
--- a/jstests/replsets/server8070.js
+++ b/jstests/replsets/server8070.js
@@ -7,6 +7,7 @@
"use strict";
load('jstests/libs/write_concern_util.js');
+ load("jstests/replsets/rslib.js");
// helper to ensure two nodes are at the same place in the oplog
var waitForSameOplogPosition = function(db1, db2, errmsg) {
@@ -47,8 +48,9 @@
replSet.awaitReplication();
jsTest.log("Make sure 2 & 3 are syncing from the primary");
- member2.adminCommand({replSetSyncFrom: getHostName() + ":" + replSet.ports[0]});
- member3.adminCommand({replSetSyncFrom: getHostName() + ":" + replSet.ports[0]});
+ assert.eq(master, replSet.nodes[0]);
+ syncFrom(replSet.nodes[1], master, replSet);
+ syncFrom(replSet.nodes[2], master, replSet);
jsTest.log("Stop 2's replication");
member2.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'});
diff --git a/jstests/replsets/slavedelay3.js b/jstests/replsets/slavedelay3.js
index 2e2675557ab..f07bccfee47 100644
--- a/jstests/replsets/slavedelay3.js
+++ b/jstests/replsets/slavedelay3.js
@@ -27,12 +27,7 @@ nodes[0].disconnect(nodes[2]);
master.foo.insert({x: 1});
-assert.commandWorked(nodes[1].getDB("admin").runCommand({"replSetSyncFrom": nodes[0].host}));
-var res;
-assert.soon(function() {
- res = nodes[1].getDB("admin").runCommand({"replSetGetStatus": 1});
- return res.syncingTo === nodes[0].host;
-}, "node 4 failed to start chaining: " + tojson(res));
+syncFrom(nodes[1], nodes[0], replTest);
// make sure the record still appears in the remote slave
assert.soon(function() {
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 56e5ab0e0c0..c0211a21022 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -270,6 +270,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
HostAndPort source;
+ HostAndPort oldSource = _syncSourceHost;
SyncSourceResolverResponse syncSourceResp;
{
const OpTime minValidSaved = storageInterface->getMinValid(opCtx);
@@ -332,6 +333,16 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_syncSourceHost = syncSourceResp.getSyncSource();
source = _syncSourceHost;
+
+ // If our sync source has not changed, it is likely because our heartbeat data map is out
+ // of date. In that case we sleep for 1 second to reduce the amount of time we spin while
+ // waiting for the map to update.
+ if (oldSource == source) {
+ log() << "Chose same sync source candidate as last time, " << source
+ << ". Sleeping for 1 second to avoid immediately choosing a new sync source for "
+ "the same reason as last time.";
+ sleepsecs(1);
+ }
} else {
if (!syncSourceResp.isOK()) {
log() << "failed to find sync source, received error "
@@ -367,7 +378,6 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
_replCoord, _replicationCoordinatorExternalState, this);
- auto rbidCopyForFetcher = syncSourceResp.rbid; // OplogFetcher's callback modifies this.
OplogFetcher* oplogFetcher;
try {
auto executor = _replicationCoordinatorExternalState->getTaskExecutor();
@@ -385,13 +395,14 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
NamespaceString(rsOplogName),
config,
_replicationCoordinatorExternalState->getOplogFetcherMaxFetcherRestarts(),
+ syncSourceResp.rbid,
+ true /* requireFresherSyncSource */,
&dataReplicatorExternalState,
stdx::bind(&BackgroundSync::_enqueueDocuments,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
- stdx::placeholders::_3,
- &rbidCopyForFetcher),
+ stdx::placeholders::_3),
onOplogFetcherShutdownCallbackFn);
oplogFetcher = _oplogFetcher.get();
} catch (const mongo::DBException& ex) {
@@ -428,8 +439,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
// Do not blacklist the server here; it will be blacklisted when we try to reuse it,
// if it can't return a matching oplog start from the last fetched oplog ts field.
return;
- } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing ||
- fetcherReturnStatus.code() == ErrorCodes::RemoteOplogStale) {
+ } else if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
if (_replCoord->getMemberState().primary()) {
// TODO: Abort catchup mode early if rollback detected.
warning() << "Rollback situation detected in catch-up mode; catch-up mode will end.";
@@ -503,50 +513,7 @@ void BackgroundSync::_produce(OperationContext* opCtx) {
Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info,
- boost::optional<int>* requiredRBID) {
- // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
- // since that could cause it to not have our required minValid point. The cursor will be killed
- // if the upstream node rolls back so we don't need to keep checking. This must be blocking
- // since the Fetcher doesn't give us a way to defer sending the getmores after we return.
- if (*requiredRBID) {
- auto rbidStatus = Status(ErrorCodes::InternalError, "");
- auto handle =
- _replicationCoordinatorExternalState->getTaskExecutor()->scheduleRemoteCommand(
- {getSyncTarget(), "admin", BSON("replSetGetRBID" << 1), nullptr},
- [&](const executor::TaskExecutor::RemoteCommandCallbackArgs& rbidReply) {
- rbidStatus = rbidReply.response.status;
- if (!rbidStatus.isOK())
- return;
-
- rbidStatus = getStatusFromCommandResult(rbidReply.response.data);
- if (!rbidStatus.isOK())
- return;
-
- const auto rbidElem = rbidReply.response.data["rbid"];
- if (rbidElem.type() != NumberInt) {
- rbidStatus = Status(ErrorCodes::BadValue,
- str::stream() << "Upstream node returned an "
- << "rbid with invalid type "
- << rbidElem.type());
- return;
- }
- if (rbidElem.Int() != **requiredRBID) {
- rbidStatus = Status(ErrorCodes::BadValue,
- "Upstream node rolled back after verifying "
- "that it had our MinValid point. Retrying.");
- }
- });
- if (!handle.isOK())
- return handle.getStatus();
-
- _replicationCoordinatorExternalState->getTaskExecutor()->wait(handle.getValue());
- if (!rbidStatus.isOK())
- return rbidStatus;
-
- requiredRBID->reset(); // Don't come back to this block while on this cursor.
- }
-
+ const OplogFetcher::DocumentsInfo& info) {
// If this is the first batch of operations returned from the query, "toApplyDocumentCount" will
// be one fewer than "networkDocumentCount" because the first document (which was applied
// previously) is skipped.
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index e46d7842936..f0c50f870e9 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -158,8 +158,7 @@ private:
*/
Status _enqueueDocuments(Fetcher::Documents::const_iterator begin,
Fetcher::Documents::const_iterator end,
- const OplogFetcher::DocumentsInfo& info,
- boost::optional<int>* requiredRBID);
+ const OplogFetcher::DocumentsInfo& info);
// restart syncing
void start(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 92e64cbb993..81d63055d72 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -682,6 +682,8 @@ void DataReplicator::_lastOplogEntryFetcherCallbackForBeginTimestamp(
_opts.remoteOplogNS,
config,
_opts.oplogFetcherMaxFetcherRestarts,
+ _rollbackChecker->getBaseRBID(),
+ false /* requireFresherSyncSource */,
_dataReplicatorExternalState.get(),
stdx::bind(&DataReplicator::_enqueueDocuments,
this,
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 61a5dcb8f2d..f6eaae1fa99 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -476,10 +476,17 @@ BSONObj makeRollbackCheckerResponse(int rollbackId) {
/**
* Generates a cursor response for a Fetcher to consume.
*/
-BSONObj makeCursorResponse(CursorId cursorId,
- const NamespaceString& nss,
- std::vector<BSONObj> docs,
- bool isFirstBatch = true) {
+RemoteCommandResponse makeCursorResponse(CursorId cursorId,
+ const NamespaceString& nss,
+ std::vector<BSONObj> docs,
+ bool isFirstBatch = true,
+ int rbid = 1) {
+ OpTime futureOpTime(Timestamp(1000, 1000), 1000);
+ rpc::OplogQueryMetadata oqMetadata(futureOpTime, futureOpTime, rbid, 0, 0);
+ BSONObjBuilder metadataBob;
+ ASSERT_OK(oqMetadata.writeToMetadata(&metadataBob));
+ auto metadataObj = metadataBob.obj();
+
BSONObjBuilder bob;
{
BSONObjBuilder cursorBob(bob.subobjStart("cursor"));
@@ -494,7 +501,7 @@ BSONObj makeCursorResponse(CursorId cursorId,
}
}
bob.append("ok", 1);
- return bob.obj();
+ return {bob.obj(), metadataObj, Milliseconds(0)};
}
/**
@@ -2889,11 +2896,7 @@ TEST_F(DataReplicatorTest, DataReplicatorPassesThroughMultiApplierScheduleError)
// MultiApplier next time _getNextApplierBatchCallback() runs.
net->scheduleSuccessfulResponse(
oplogFetcherNoi,
- executor::RemoteCommandResponse(
- makeCursorResponse(
- 1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}),
- BSONObj(),
- Milliseconds(0)));
+ makeCursorResponse(1LL, _options.localOplogNS, {makeOplogEntry(1), makeOplogEntry(2)}));
net->runReadyNetworkOperations();
// Ignore OplogFetcher's getMore request.
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index d8aacca5a04..052de5cba7f 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -144,21 +144,75 @@ StatusWith<BSONObj> makeMetadataObject(bool isV1ElectionProtocol) {
* Checks the first batch of results from query.
* 'documents' are the first batch of results returned from tailing the remote oplog.
* 'lastFetched' optime and hash should be consistent with the predicate in the query.
- * Returns RemoteOplogStale if the oplog query has no results.
+ * 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for
+ * compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'requiredRBID' is a RollbackID received when we chose the sync source that we use here to
+ * guarantee we have not rolled back since we confirmed the sync source had our minValid.
+ * 'remoteRBID' is a RollbackId for the sync source returned in this oplog query. This is optional
+ * for compatibility with 3.4 servers that do not send OplogQueryMetadata.
+ * 'requireFresherSyncSource' is a boolean indicating whether we should require the sync source's
+ * oplog to be ahead of ours. If false, the sync source's oplog is allowed to be at the same point
+ * as ours, but still cannot be behind ours.
+ *
+ * TODO (SERVER-27668): Make remoteLastOpApplied and remoteRBID non-optional in MongoDB 3.8.
+ *
* Returns OplogStartMissing if we cannot find the optime of the last fetched operation in
* the remote oplog.
*/
-Status checkRemoteOplogStart(const Fetcher::Documents& documents, OpTimeWithHash lastFetched) {
+Status checkRemoteOplogStart(const Fetcher::Documents& documents,
+ OpTimeWithHash lastFetched,
+ boost::optional<OpTime> remoteLastOpApplied,
+ int requiredRBID,
+ boost::optional<int> remoteRBID,
+ bool requireFresherSyncSource) {
+ // Once we establish our cursor, we need to ensure that our upstream node hasn't rolled back
+ // since that could cause it to not have our required minValid point. The cursor will be
+ // killed if the upstream node rolls back so we don't need to keep checking once the cursor
+ // is established.
+ if (remoteRBID && (*remoteRBID != requiredRBID)) {
+ return Status(ErrorCodes::InvalidSyncSource,
+ "Upstream node rolled back after choosing it as a sync source. Choosing "
+ "new sync source.");
+ }
+
+ // The SyncSourceResolver never checks that the sync source candidate is actually ahead of
+ // us. Rather than have it check there with an extra network roundtrip, we check here.
+ if (requireFresherSyncSource && remoteLastOpApplied &&
+ (*remoteLastOpApplied <= lastFetched.opTime)) {
+ return Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "Sync source's last applied OpTime "
+ << remoteLastOpApplied->toString()
+ << " is not greater than our last fetched OpTime "
+ << lastFetched.opTime.toString()
+ << ". Choosing new sync source.");
+ } else if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) {
+ // In initial sync, the lastFetched OpTime will almost always equal the remoteLastOpApplied
+ // since we fetch the sync source's last applied OpTime to determine where to start our
+ // OplogFetcher. This is fine since no other node can sync off of an initial syncing node
+ // and thus cannot form a sync source cycle. To account for this, we must relax the
+ // constraint on our sync source being fresher.
+ return Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "Sync source's last applied OpTime "
+ << remoteLastOpApplied->toString()
+ << " is older than our last fetched OpTime "
+ << lastFetched.opTime.toString()
+ << ". Choosing new sync source.");
+ }
+
+ // At this point we know that our sync source has our minValid and is ahead of us, so if our
+ // history diverges from our sync source's we should prefer its history and roll back ours.
+
+ // Since we checked for rollback and our sync source is ahead of us, an empty batch means that
+ // we have a higher timestamp on our last fetched OpTime than our sync source's last applied
+ // OpTime, but a lower term. For example, if our last fetched entry is (term: 1, ts: 100) and
+ // the sync source's last applied is (term: 2, ts: 90), the sync source is ahead because terms
+ // compare first, yet its query starting at ts: 100 returns nothing. When this occurs, we must
+ // roll back our inconsistent oplog entry.
if (documents.empty()) {
- // The GTE query from upstream returns nothing, so we're ahead of the upstream.
- return Status(ErrorCodes::RemoteOplogStale,
- str::stream() << "We are ahead of the sync source. Our last op time fetched: "
- << lastFetched.opTime.toString());
+ return Status(ErrorCodes::OplogStartMissing, "Received an empty batch from sync source.");
}
+
const auto& o = documents.front();
auto opTimeResult = OpTime::parseFromOplogEntry(o);
if (!opTimeResult.isOK()) {
- return Status(ErrorCodes::OplogStartMissing,
+ return Status(ErrorCodes::InvalidBSON,
str::stream() << "our last op time fetched: " << lastFetched.opTime.toString()
<< " (hash: "
<< lastFetched.value
@@ -277,6 +331,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
NamespaceString nss,
ReplSetConfig config,
std::size_t maxFetcherRestarts,
+ int requiredRBID,
+ bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn)
@@ -285,6 +341,8 @@ OplogFetcher::OplogFetcher(executor::TaskExecutor* executor,
_nss(nss),
_metadataObject(uassertStatusOK(makeMetadataObject(config.getProtocolVersion() == 1LL))),
_maxFetcherRestarts(maxFetcherRestarts),
+ _requiredRBID(requiredRBID),
+ _requireFresherSyncSource(requireFresherSyncSource),
_dataReplicatorExternalState(dataReplicatorExternalState),
_enqueueDocumentsFn(enqueueDocumentsFn),
_awaitDataTimeout(calculateAwaitDataTimeout(config)),
@@ -480,9 +538,17 @@ void OplogFetcher::_callback(const Fetcher::QueryResponseStatus& result,
// Check start of remote oplog and, if necessary, stop fetcher to execute rollback.
if (queryResponse.first) {
- auto status = checkRemoteOplogStart(documents, opTimeWithHash);
+ auto remoteRBID = oqMetadata ? boost::make_optional(oqMetadata->getRBID()) : boost::none;
+ auto remoteLastApplied =
+ oqMetadata ? boost::make_optional(oqMetadata->getLastOpApplied()) : boost::none;
+ auto status = checkRemoteOplogStart(documents,
+ opTimeWithHash,
+ remoteLastApplied,
+ _requiredRBID,
+ remoteRBID,
+ _requireFresherSyncSource);
if (!status.isOK()) {
- // Stop oplog fetcher and execute rollback.
+ // Stop oplog fetcher and execute rollback if necessary.
_finishCallback(status, opTimeWithHash);
return;
}
diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h
index 9ed1fdbe63a..5d739bfae0c 100644
--- a/src/mongo/db/repl/oplog_fetcher.h
+++ b/src/mongo/db/repl/oplog_fetcher.h
@@ -136,6 +136,8 @@ public:
NamespaceString nss,
ReplSetConfig config,
std::size_t maxFetcherRestarts,
+ int requiredRBID,
+ bool requireFresherSyncSource,
DataReplicatorExternalState* dataReplicatorExternalState,
EnqueueDocumentsFn enqueueDocumentsFn,
OnShutdownCallbackFn onShutdownCallbackFn);
@@ -256,6 +258,15 @@ private:
// Maximum number of times to consecutively restart the fetcher on non-cancellation errors.
const std::size_t _maxFetcherRestarts;
+ // Rollback ID that the sync source is required to have after the first batch.
+ int _requiredRBID;
+
+ // A boolean indicating whether we should error if the sync source is not ahead of our initial
+ // last fetched OpTime on the first batch. Most of the time this should be set to true,
+ // but there are certain special cases, namely during initial sync, where it's acceptable for
+ // our sync source to have no ops newer than _lastFetched.
+ bool _requireFresherSyncSource;
+
DataReplicatorExternalState* const _dataReplicatorExternalState;
const EnqueueDocumentsFn _enqueueDocumentsFn;
const Milliseconds _awaitDataTimeout;
diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp
index 5419e7d7f13..827cbd86e26 100644
--- a/src/mongo/db/repl/oplog_fetcher_test.cpp
+++ b/src/mongo/db/repl/oplog_fetcher_test.cpp
@@ -91,8 +91,18 @@ protected:
* the oplog query and shuts down.
* Returns shutdown state.
*/
- std::unique_ptr<ShutdownState> processSingleBatch(RemoteCommandResponse response);
- std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj);
+ std::unique_ptr<ShutdownState> processSingleBatch(RemoteCommandResponse response,
+ bool requireFresherSyncSource = true);
+ std::unique_ptr<ShutdownState> processSingleBatch(BSONObj obj,
+ bool requireFresherSyncSource = true);
+
+ /**
+ * Makes an OplogQueryMetadata object with the given fields and a stale committed OpTime.
+ */
+ BSONObj makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
+ int rbid,
+ int primaryIndex,
+ int syncSourceIndex);
/**
* Tests checkSyncSource result handling.
@@ -107,6 +117,9 @@ protected:
RemoteCommandRequest testTwoBatchHandling(bool isV1ElectionProtocol);
OpTimeWithHash lastFetched;
+ OpTime remoteNewerOpTime;
+ OpTime staleOpTime;
+ int rbid;
std::unique_ptr<DataReplicatorExternalStateMock> dataReplicatorExternalState;
@@ -135,6 +148,9 @@ void OplogFetcherTest::setUp() {
launchExecutorThread();
lastFetched = {456LL, {{123, 0}, 1}};
+ remoteNewerOpTime = {{124, 1}, 2};
+ staleOpTime = {{1, 1}, 0};
+ rbid = 2;
dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>();
dataReplicatorExternalState->currentTerm = lastFetched.opTime.getTerm();
@@ -174,6 +190,17 @@ RemoteCommandRequest OplogFetcherTest::processNetworkResponse(
expectReadyRequestsAfterProcessing);
}
+BSONObj OplogFetcherTest::makeOplogQueryMetadataObject(OpTime lastAppliedOpTime,
+ int rbid,
+ int primaryIndex,
+ int syncSourceIndex) {
+ rpc::OplogQueryMetadata oqMetadata(
+ staleOpTime, lastAppliedOpTime, rbid, primaryIndex, syncSourceIndex);
+ BSONObjBuilder bob;
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ return bob.obj();
+}
+
HostAndPort source("localhost:12345");
NamespaceString nss("local.oplog.rs");
@@ -200,8 +227,8 @@ ReplSetConfig _createConfig(bool isV1ElectionProtocol) {
return config;
}
-std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(
- RemoteCommandResponse response) {
+std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(RemoteCommandResponse response,
+ bool requireFresherSyncSource) {
auto shutdownState = stdx::make_unique<ShutdownState>();
OplogFetcher oplogFetcher(&getExecutor(),
@@ -210,6 +237,8 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(
nss,
_createConfig(true),
0,
+ rbid,
+ requireFresherSyncSource,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
@@ -229,8 +258,10 @@ std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(
return shutdownState;
}
-std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj) {
- return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)});
+std::unique_ptr<ShutdownState> OplogFetcherTest::processSingleBatch(BSONObj obj,
+ bool requireFresherSyncSource) {
+ return processSingleBatch({obj, rpc::makeEmptyMetadata(), Milliseconds(0)},
+ requireFresherSyncSource);
}
TEST_F(OplogFetcherTest, InvalidConstruction) {
@@ -241,6 +272,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {}),
@@ -255,6 +288,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
OplogFetcher::EnqueueDocumentsFn(),
[](Status, OpTimeWithHash) {}),
@@ -269,6 +304,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
nss,
ReplSetConfig(),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {}),
@@ -283,6 +320,8 @@ TEST_F(OplogFetcherTest, InvalidConstruction) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
OplogFetcher::OnShutdownCallbackFn()),
@@ -298,6 +337,8 @@ TEST_F(OplogFetcherTest, StartupWhenActiveReturnsIllegalOperation) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {});
@@ -316,6 +357,8 @@ TEST_F(OplogFetcherTest, ShutdownAfterStartupTransitionsToShuttingDownState) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {});
@@ -333,6 +376,8 @@ TEST_F(OplogFetcherTest, StartupWhenShuttingDownReturnsShutdownInProgress) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {});
@@ -358,6 +403,8 @@ TEST_F(
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -379,6 +426,8 @@ TEST_F(
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -397,6 +446,8 @@ TEST_F(OplogFetcherTest, MetadataObjectContainsMetadataFieldsUnderProtocolVersio
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -413,6 +464,8 @@ TEST_F(OplogFetcherTest, MetadataObjectIsEmptyUnderProtocolVersion0) {
nss,
_createConfig(false),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -430,6 +483,8 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldEqualHalfElectionTimeoutUnderProt
nss,
config,
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -444,6 +499,8 @@ TEST_F(OplogFetcherTest, AwaitDataTimeoutShouldBeAConstantUnderProtocolVersion0)
nss,
_createConfig(false),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {})
@@ -461,6 +518,8 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFromStarti
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[](Status, OpTimeWithHash) {});
@@ -485,6 +544,8 @@ TEST_F(OplogFetcherTest, ShuttingExecutorDownAfterStartupStopsTheOplogFetcher) {
nss,
_createConfig(true),
0,
+ -1,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(shutdownState));
@@ -562,6 +623,7 @@ TEST_F(OplogFetcherTest,
BSONObjBuilder bob;
ASSERT_OK(metadata.writeToMetadata(&bob));
auto metadataObj = bob.obj();
+
ASSERT_OK(processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
metadataObj,
Milliseconds(0)})
@@ -574,7 +636,7 @@ TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMetadataFn) {
rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -590,6 +652,95 @@ TEST_F(OplogFetcherTest, ValidMetadataWithInResponseShouldBeForwardedToProcessMe
dataReplicatorExternalState->oqMetadataProcessed.getPrimaryIndex());
}
+TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceRollsBack) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid + 1, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ metadataObj,
+ Milliseconds(0)})
+ ->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ metadataObj,
+ Milliseconds(0)})
+ ->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched.opTime, rbid, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ metadataObj,
+ Milliseconds(0)})
+ ->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(OplogFetcherTest,
+ MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehindWithoutRequiringFresherSyncSource) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, staleOpTime, rbid, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ ASSERT_EQUALS(ErrorCodes::InvalidSyncSource,
+ processSingleBatch({makeCursorResponse(0, {makeNoopOplogEntry(lastFetched)}),
+ metadataObj,
+ Milliseconds(0)},
+ false)
+ ->getStatus());
+ ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT(lastEnqueuedDocuments.empty());
+}
+
+TEST_F(OplogFetcherTest,
+ MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) {
+ rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched.opTime, rbid, 2, 2);
+ BSONObjBuilder bob;
+ ASSERT_OK(replMetadata.writeToMetadata(&bob));
+ ASSERT_OK(oqMetadata.writeToMetadata(&bob));
+ auto metadataObj = bob.obj();
+
+ auto entry = makeNoopOplogEntry(lastFetched);
+ auto shutdownState =
+ processSingleBatch({makeCursorResponse(0, {entry}), metadataObj, Milliseconds(0)}, false);
+ ASSERT_OK(shutdownState->getStatus());
+ ASSERT(dataReplicatorExternalState->metadataWasProcessed);
+ ASSERT_EQUALS(OpTimeWithHash(entry["h"].numberLong(),
+ unittest::assertGet(OpTime::parseFromOplogEntry(entry))),
+ shutdownState->getLastFetched());
+}
+
TEST_F(OplogFetcherTest,
MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) {
rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2);
@@ -607,7 +758,7 @@ TEST_F(OplogFetcherTest,
TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) {
rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata(lastFetched.opTime, lastFetched.opTime, 1, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata(staleOpTime, remoteNewerOpTime, rbid, 2, 2);
BSONObjBuilder bob;
ASSERT_OK(replMetadata.writeToMetadata(&bob));
ASSERT_OK(oqMetadata.writeToMetadata(&bob));
@@ -629,64 +780,80 @@ TEST_F(OplogFetcherTest, EmptyMetadataIsNotProcessed) {
ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed);
}
-TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithRemoteOplogStaleError) {
- ASSERT_EQUALS(ErrorCodes::RemoteOplogStale,
+TEST_F(OplogFetcherTest, EmptyFirstBatchStopsOplogFetcherWithOplogStartMissingError) {
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(makeCursorResponse(0, {}))->getStatus());
}
-TEST_F(OplogFetcherTest,
- MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
- ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
- processSingleBatch(makeCursorResponse(0, {BSONObj()}))->getStatus());
+TEST_F(OplogFetcherTest, MissingOpTimeInFirstDocumentCausesOplogFetcherToStopWithInvalidBSONError) {
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidBSON,
+ processSingleBatch({makeCursorResponse(0, {BSONObj()}), metadataObj, Milliseconds(0)})
+ ->getStatus());
}
TEST_F(
OplogFetcherTest,
LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
ASSERT_EQUALS(ErrorCodes::OplogStartMissing,
processSingleBatch(
- makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}))
+ {makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}),
+ metadataObj,
+ Milliseconds(0)})
->getStatus());
}
TEST_F(OplogFetcherTest,
LastHashFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) {
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
ASSERT_EQUALS(
ErrorCodes::OplogStartMissing,
processSingleBatch(
- makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)}))
+ {makeCursorResponse(0, {makeNoopOplogEntry(lastFetched.opTime, lastFetched.value + 1)}),
+ metadataObj,
+ Milliseconds(0)})
->getStatus());
}
TEST_F(OplogFetcherTest,
MissingOpTimeInSecondDocumentOfFirstBatchCausesOplogFetcherToStopWithNoSuchKey) {
- ASSERT_EQUALS(
- ErrorCodes::NoSuchKey,
- processSingleBatch(makeCursorResponse(0,
- {makeNoopOplogEntry(lastFetched),
- BSON("o" << BSON("msg"
- << "oplog entry without optime"))}))
- ->getStatus());
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey,
+ processSingleBatch(
+ {makeCursorResponse(0,
+ {makeNoopOplogEntry(lastFetched),
+ BSON("o" << BSON("msg"
+ << "oplog entry without optime"))}),
+ metadataObj,
+ Milliseconds(0)})
+ ->getStatus());
}
TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWithOplogOutOfOrder) {
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder,
- processSingleBatch(makeCursorResponse(0,
- {makeNoopOplogEntry(lastFetched),
- makeNoopOplogEntry(Seconds(1000), 1),
- makeNoopOplogEntry(Seconds(2000), 1),
- makeNoopOplogEntry(Seconds(1500), 1)}))
+ processSingleBatch({makeCursorResponse(0,
+ {makeNoopOplogEntry(lastFetched),
+ makeNoopOplogEntry(Seconds(1000), 1),
+ makeNoopOplogEntry(Seconds(2000), 1),
+ makeNoopOplogEntry(Seconds(1500), 1)}),
+ metadataObj,
+ Milliseconds(0)})
->getStatus());
}
TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenEnqueuingDocuments) {
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry};
- auto shutdownState = processSingleBatch(
- {makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), Milliseconds(0)});
+ auto shutdownState =
+ processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
ASSERT_EQUALS(2U, lastEnqueuedDocuments.size());
ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
@@ -713,6 +880,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE
}
TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300);
@@ -724,8 +893,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) {
return Status(ErrorCodes::InternalError, "my custom error");
};
- auto shutdownState = processSingleBatch(
- {makeCursorResponse(0, documents), rpc::makeEmptyMetadata(), Milliseconds(0)});
+ auto shutdownState =
+ processSingleBatch({makeCursorResponse(0, documents), metadataObj, Milliseconds(0)});
ASSERT_EQ(shutdownState->getStatus(), Status(ErrorCodes::InternalError, "my custom error"));
}
@@ -790,7 +959,8 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithReplSetMetadataStopsTheOplogFe
TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) {
rpc::ReplSetMetadata replMetadata(
lastFetched.opTime.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1);
- rpc::OplogQueryMetadata oqMetadata({{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, 2);
+ rpc::OplogQueryMetadata oqMetadata(
+ {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, 2);
testSyncSourceChecking(&replMetadata, &oqMetadata);
@@ -828,7 +998,7 @@ TEST_F(OplogFetcherTest,
2,
2);
rpc::OplogQueryMetadata oqMetadata(
- {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, 2, -1);
+ {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, -1);
testSyncSourceChecking(&replMetadata, &oqMetadata);
@@ -851,6 +1021,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro
nss,
_createConfig(isV1ElectionProtocol),
0,
+ rbid,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(shutdownState));
@@ -862,7 +1034,11 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling(bool isV1ElectionPro
CursorId cursorId = 22LL;
auto firstEntry = makeNoopOplogEntry(lastFetched);
auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200);
- processNetworkResponse(makeCursorResponse(cursorId, {firstEntry, secondEntry}), true);
+
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+ processNetworkResponse(
+ {makeCursorResponse(cursorId, {firstEntry, secondEntry}), metadataObj, Milliseconds(0)},
+ true);
ASSERT_EQUALS(1U, lastEnqueuedDocuments.size());
ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]);
@@ -1084,6 +1260,8 @@ TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMo
nss,
_createConfig(true),
maxFetcherRestarts,
+ rbid,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
@@ -1091,9 +1269,13 @@ TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMo
ASSERT_OK(oplogFetcher.startup());
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
+
// Send first batch from FIND.
_assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+ ops[0],
+ processNetworkResponse(
+ {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true));
// Send error during GETMORE.
processNetworkResponse({ErrorCodes::CursorNotFound, "blah"}, true);
@@ -1101,7 +1283,10 @@ TEST_F(OplogFetcherTest, OplogFetcherCreatesNewFetcherOnCallbackErrorDuringGetMo
// Send first batch from FIND, and check that it started from the end of the last FIND response.
// Check that the optimes match for the query and last oplog entry.
_assertFindCommandTimestampEquals(
- ops[2], processNetworkResponse(makeCursorResponse(0, {ops[2], ops[3], ops[4]}), false));
+ ops[2],
+ processNetworkResponse(
+ {makeCursorResponse(0, {ops[2], ops[3], ops[4]}), metadataObj, Milliseconds(0)},
+ false));
// Done.
oplogFetcher.join();
@@ -1119,6 +1304,8 @@ TEST_F(OplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReach
nss,
_createConfig(true),
maxFetcherRestarts,
+ rbid,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
@@ -1127,8 +1314,13 @@ TEST_F(OplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimitIsReach
ASSERT_OK(oplogFetcher.startup());
unittest::log() << "processing find request from first fetcher";
+
+
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
_assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+ ops[0],
+ processNetworkResponse(
+ {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true));
unittest::log() << "sending error response to getMore request from first fetcher";
assertRemoteCommandNameEquals(
@@ -1157,6 +1349,8 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResp
nss,
_createConfig(true),
maxFetcherRestarts,
+ rbid,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
@@ -1165,8 +1359,13 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResp
ASSERT_OK(oplogFetcher.startup());
unittest::log() << "processing find request from first fetcher";
+
+
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
_assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+ ops[0],
+ processNetworkResponse(
+ {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true));
unittest::log() << "sending error response to getMore request from first fetcher";
assertRemoteCommandNameEquals(
@@ -1174,7 +1373,9 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFetcherResp
unittest::log() << "processing find request from second fetcher";
_assertFindCommandTimestampEquals(
- ops[2], processNetworkResponse(makeCursorResponse(1, {ops[2], ops[3], ops[4]}), true));
+ ops[2],
+ processNetworkResponse(
+ {makeCursorResponse(1, {ops[2], ops[3], ops[4]}), metadataObj, Milliseconds(0)}, true));
unittest::log() << "sending error response to getMore request from second fetcher";
assertRemoteCommandNameEquals(
@@ -1228,6 +1429,8 @@ TEST_F(OplogFetcherTest, OplogFetcherAbortsWithOriginalResponseErrorOnFailureToS
nss,
_createConfig(true),
maxFetcherRestarts,
+ rbid,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
stdx::ref(*shutdownState));
@@ -1237,8 +1440,12 @@ TEST_F(OplogFetcherTest, OplogFetcherAbortsWithOriginalResponseErrorOnFailureToS
ASSERT_TRUE(oplogFetcher.isActive());
unittest::log() << "processing find request from first fetcher";
+
+ auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2);
_assertFindCommandTimestampEquals(
- ops[0], processNetworkResponse(makeCursorResponse(1, {ops[0], ops[1], ops[2]}), true));
+ ops[0],
+ processNetworkResponse(
+ {makeCursorResponse(1, {ops[0], ops[1], ops[2]}), metadataObj, Milliseconds(0)}, true));
unittest::log() << "sending error response to getMore request from first fetcher";
shouldFailSchedule = true;
@@ -1274,6 +1481,8 @@ TEST_F(OplogFetcherTest, OplogFetcherResetsOnShutdownCallbackFunctionOnCompletio
nss,
_createConfig(true),
0,
+ rbid,
+ true,
dataReplicatorExternalState.get(),
enqueueDocumentsFn,
[&callbackInvoked, sharedCallbackData, &status](
diff --git a/src/mongo/db/repl/rollback_checker.cpp b/src/mongo/db/repl/rollback_checker.cpp
index 4c2966acb96..589ba0e6444 100644
--- a/src/mongo/db/repl/rollback_checker.cpp
+++ b/src/mongo/db/repl/rollback_checker.cpp
@@ -119,7 +119,7 @@ Status RollbackChecker::reset_sync() {
return resetStatus;
}
-int RollbackChecker::getBaseRBID_forTest() {
+int RollbackChecker::getBaseRBID() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _baseRBID;
}
diff --git a/src/mongo/db/repl/rollback_checker.h b/src/mongo/db/repl/rollback_checker.h
index d1c6ea5bc5c..17d8d8d0d08 100644
--- a/src/mongo/db/repl/rollback_checker.h
+++ b/src/mongo/db/repl/rollback_checker.h
@@ -92,10 +92,10 @@ public:
// Synchronously calls reset and returns the Status of the command.
Status reset_sync();
- // ================== Test Support API ===================
-
// Returns the current baseline rbid.
- int getBaseRBID_forTest();
+ int getBaseRBID();
+
+ // ================== Test Support API ===================
// Returns the last rbid seen.
int getLastRBID_forTest();
diff --git a/src/mongo/db/repl/rollback_checker_test.cpp b/src/mongo/db/repl/rollback_checker_test.cpp
index 717eaaf6c7e..9b29fde9874 100644
--- a/src/mongo/db/repl/rollback_checker_test.cpp
+++ b/src/mongo/db/repl/rollback_checker_test.cpp
@@ -108,7 +108,7 @@ TEST_F(RollbackCheckerTest, reset) {
getNet()->exitNetwork();
getReplExecutor().wait(cbh);
- ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
}
TEST_F(RollbackCheckerTest, RollbackRBID) {
@@ -124,7 +124,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) {
getNet()->scheduleSuccessfulResponse(commandResponse);
getNet()->runReadyNetworkOperations();
getReplExecutor().wait(refreshCBH);
- ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
{
LockGuard lk(_mutex);
ASSERT_TRUE(_hasCalledCallback);
@@ -143,7 +143,7 @@ TEST_F(RollbackCheckerTest, RollbackRBID) {
getReplExecutor().wait(rbCBH);
ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 4);
- ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
LockGuard lk(_mutex);
ASSERT_TRUE(_hasCalledCallback);
ASSERT_TRUE(unittest::assertGet(_hasRolledBackResult));
@@ -162,7 +162,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) {
getNet()->scheduleSuccessfulResponse(commandResponse);
getNet()->runReadyNetworkOperations();
getReplExecutor().wait(refreshCBH);
- ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
{
LockGuard lk(_mutex);
ASSERT_TRUE(_hasCalledCallback);
@@ -181,7 +181,7 @@ TEST_F(RollbackCheckerTest, NoRollbackRBID) {
getReplExecutor().wait(rbCBH);
ASSERT_EQUALS(getRollbackChecker()->getLastRBID_forTest(), 3);
- ASSERT_EQUALS(getRollbackChecker()->getBaseRBID_forTest(), 3);
+ ASSERT_EQUALS(getRollbackChecker()->getBaseRBID(), 3);
LockGuard lk(_mutex);
ASSERT_TRUE(_hasCalledCallback);
ASSERT_FALSE(unittest::assertGet(_hasRolledBackResult));
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 2653c08812e..88fbf63f036 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -790,7 +790,7 @@ void syncFixUp(OperationContext* opCtx,
Status _syncRollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- boost::optional<int> requiredRBID,
+ int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface) {
invariant(!opCtx->lockState()->isLocked());
@@ -798,11 +798,8 @@ Status _syncRollback(OperationContext* opCtx,
FixUpInfo how;
log() << "rollback 1";
how.rbid = rollbackSource.getRollbackId();
- if (requiredRBID) {
- uassert(40362,
- "Upstream node rolled back. Need to retry our rollback.",
- how.rbid == *requiredRBID);
- }
+ uassert(
+ 40362, "Upstream node rolled back. Need to retry our rollback.", how.rbid == requiredRBID);
log() << "rollback 2 FindCommonPoint";
try {
@@ -861,7 +858,7 @@ Status _syncRollback(OperationContext* opCtx,
Status syncRollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- boost::optional<int> requiredRBID,
+ int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface) {
invariant(opCtx);
@@ -881,7 +878,7 @@ Status syncRollback(OperationContext* opCtx,
void rollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- boost::optional<int> requiredRBID,
+ int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface,
stdx::function<void(int)> sleepSecsFn) {
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index 9234cb4c063..e305c076e44 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -61,7 +61,7 @@ class StorageInterface;
void rollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- boost::optional<int> requiredRBID,
+ int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface,
stdx::function<void(int)> sleepSecsFn = [](int secs) { sleepsecs(secs); });
@@ -85,6 +85,7 @@ void rollback(OperationContext* opCtx,
* @param rollbackSource interface for sync source:
* provides oplog; and
* supports fetching documents and copying collections.
+ * @param requiredRBID Rollback ID the sync source is required to keep throughout our rollback.
* @param replCoord Used to track the rollback ID and to change the follower state
* @param storageInterface Used to update minValid.
*
@@ -98,7 +99,7 @@ void rollback(OperationContext* opCtx,
Status syncRollback(OperationContext* opCtx,
const OplogInterface& localOplog,
const RollbackSource& rollbackSource,
- boost::optional<int> requiredRBID,
+ int requiredRBID,
ReplicationCoordinator* replCoord,
StorageInterface* storageInterface);
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 2e8c199c277..8d8919a1ae1 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -247,7 +247,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) {
OplogInterfaceMock({operation}),
RollbackSourceLocal(std::unique_ptr<OplogInterface>(
new OplogInterfaceMock(kEmptyMockOperations))),
- {1},
+ 1,
_coordinator,
&_storageInterface),
UserException,
@@ -796,7 +796,7 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommandFailsIfRBIDChangesWhileSynci
ASSERT_THROWS_CODE(syncRollback(_opCtx.get(),
OplogInterfaceMock({dropCollectionOperation, commonOperation}),
rollbackSource,
- {0},
+ 0,
_coordinator,
&_storageInterface),
DBException,
diff --git a/src/mongo/db/repl/sync_source_resolver.cpp b/src/mongo/db/repl/sync_source_resolver.cpp
index 33818e14c50..948a471b245 100644
--- a/src/mongo/db/repl/sync_source_resolver.cpp
+++ b/src/mongo/db/repl/sync_source_resolver.cpp
@@ -311,12 +311,7 @@ void SyncSourceResolver::_firstOplogEntryFetcherCallback(
return;
}
- if (!_requiredOpTime.isNull()) {
- _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
- return;
- }
-
- _finishCallback(candidate);
+ _scheduleRBIDRequest(candidate, earliestOpTimeSeen);
}
void SyncSourceResolver::_scheduleRBIDRequest(HostAndPort candidate, OpTime earliestOpTimeSeen) {
@@ -362,12 +357,16 @@ void SyncSourceResolver::_rbidRequestCallback(
return;
}
- // Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
- // Unittest requires that this kind of failure be handled specially.
- auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
- if (!status.isOK()) {
- _finishCallback(status);
+ if (!_requiredOpTime.isNull()) {
+ // Schedule fetcher to look for '_requiredOpTime' in the remote oplog.
+ // Unittest requires that this kind of failure be handled specially.
+ auto status = _scheduleFetcher(_makeRequiredOpTimeFetcher(candidate, earliestOpTimeSeen));
+ if (!status.isOK()) {
+ _finishCallback(status);
+ }
+ return;
}
+ _finishCallback(candidate);
}
Status SyncSourceResolver::_compareRequiredOpTimeWithQueryResponse(
diff --git a/src/mongo/db/repl/sync_source_resolver.h b/src/mongo/db/repl/sync_source_resolver.h
index 48f7e968a8d..201658caacf 100644
--- a/src/mongo/db/repl/sync_source_resolver.h
+++ b/src/mongo/db/repl/sync_source_resolver.h
@@ -70,10 +70,10 @@ struct SyncSourceResolverResponse {
// Contains the new MinValid boundary if syncSourceStatus is ErrorCodes::OplogStartMissing.
OpTime earliestOpTimeSeen;
- // Rollback ID of the selected sync source. Only filled in when there is a required optime.
+ // Rollback ID of the selected sync source.
// The rbid is fetched before the required optime so callers can be sure that as long as the
// rbid is the same, the required optime is still present.
- boost::optional<int> rbid;
+ int rbid;
bool isOK() {
return syncSourceStatus.isOK();
@@ -230,7 +230,7 @@ private:
const OnCompletionFn _onCompletion;
// The rbid we will return to our caller.
- boost::optional<int> _rbid;
+ int _rbid;
// Protects members of this sync source resolver defined below.
mutable stdx::mutex _mutex;
diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp
index fb27a2cc681..bab9d57bb89 100644
--- a/src/mongo/db/repl/sync_source_resolver_test.cpp
+++ b/src/mongo/db/repl/sync_source_resolver_test.cpp
@@ -303,6 +303,19 @@ void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net
net, selector, currentSyncSource, nextSyncSource, {BSON("ts" << ts << "t" << 0)});
}
+void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net,
+ HostAndPort currentSyncSource,
+ const BSONObj& reply = BSON("ok" << 1 << "rbid" << 1)) {
+ executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto request = net->scheduleSuccessfulResponse(reply);
+ ASSERT_EQUALS(currentSyncSource, request.target);
+ ASSERT_EQUALS("admin", request.dbname);
+ ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
+ ASSERT_BSONOBJ_EQ(BSON("replSetGetRBID" << 1), request.cmdObj);
+ net->runReadyNetworkOperations();
+}
+
TEST_F(SyncSourceResolverTest,
SyncSourceResolverReturnsStatusOkAndTheFoundHostWhenAnEligibleSyncSourceExists) {
HostAndPort candidate1("node1", 12345);
@@ -312,6 +325,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate1);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -332,6 +346,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate1, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate1);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -382,6 +397,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -447,6 +463,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -512,6 +529,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -536,6 +554,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -561,6 +580,7 @@ TEST_F(SyncSourceResolverTest,
_scheduleFirstOplogEntryFetcherResponse(
getNet(), _selector.get(), candidate2, HostAndPort(), Timestamp(10, 2));
+ _scheduleRBIDResponse(getNet(), candidate2);
_resolver->join();
ASSERT_FALSE(_resolver->isActive());
@@ -614,19 +634,6 @@ void _scheduleRequiredOpTimeFetcherResponse(executor::NetworkInterfaceMock* net,
{BSON("ts" << requiredOpTime.getTimestamp() << "t" << requiredOpTime.getTerm())});
}
-void _scheduleRBIDResponse(executor::NetworkInterfaceMock* net,
- HostAndPort currentSyncSource,
- const BSONObj& reply = BSON("ok" << 1 << "rbid" << 1)) {
- executor::NetworkInterfaceMock::InNetworkGuard networkGuard(net);
- ASSERT_TRUE(net->hasReadyRequests());
- auto request = net->scheduleSuccessfulResponse(reply);
- ASSERT_EQUALS(currentSyncSource, request.target);
- ASSERT_EQUALS("admin", request.dbname);
- ASSERT_EQUALS(SyncSourceResolver::kFetcherTimeout, request.timeout);
- ASSERT_BSONOBJ_EQ(BSON("replSetGetRBID" << 1), request.cmdObj);
- net->runReadyNetworkOperations();
-}
-
const OpTime requiredOpTime(Timestamp(200, 1U), 1LL);
TEST_F(
@@ -651,7 +658,7 @@ TEST_F(
ASSERT_FALSE(_resolver->isActive());
ASSERT_EQUALS(candidate1, unittest::assertGet(_response.syncSourceStatus));
ASSERT(_response.rbid);
- ASSERT_EQ(*_response.rbid, 7);
+ ASSERT_EQ(_response.rbid, 7);
}
TEST_F(SyncSourceResolverTest,