author     Judah Schvimer <judah@mongodb.com>  2017-02-28 11:11:30 -0500
committer  Judah Schvimer <judah@mongodb.com>  2017-03-24 09:58:46 -0400
commit     45cc6d20a413d88fc49f6dac257f800fda926be6 (patch)
tree       36e85beb9686237f31f6167158e6a719590bcda9
parent     1ec124dffdf665b428715549740dd63ee8a7407c (diff)
download   mongo-45cc6d20a413d88fc49f6dac257f800fda926be6.tar.gz
SERVER-27403 SERVER-28278 Ensure sync source is ahead and not rolled back after first fetcher batch
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_legacy.yml |   2
-rw-r--r--  jstests/replsets/chaining_removal.js                       |  23
-rw-r--r--  jstests/replsets/do_not_sync_from_stale_sync_source.js    |  44
-rw-r--r--  jstests/replsets/maxSyncSourceLagSecs.js                   |  24
-rw-r--r--  jstests/replsets/no_chaining.js                            |  10
-rw-r--r--  jstests/replsets/rollback_after_sync_source_selection.js  | 124
-rw-r--r--  jstests/replsets/rollback_too_new.js                       |  21
-rw-r--r--  jstests/replsets/rslib.js                                  |  25
-rw-r--r--  jstests/replsets/server8070.js                             |   7
-rw-r--r--  jstests/replsets/slavedelay3.js                            |   7
-rw-r--r--  src/mongo/db/repl/bgsync.cpp                               |  91
-rw-r--r--  src/mongo/db/repl/bgsync.h                                 |  10
12 files changed, 327 insertions(+), 61 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
index 70ef1cada5e..11b160e4925 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
@@ -21,6 +21,8 @@ selector:
- jstests/replsets/write_concern_after_stepdown_and_stepup.js
# This test expects the server to log a PV1-only vote-not-granted reason
- jstests/replsets/no_flapping_during_network_partition.js
+ # This test requires terms.
+ - jstests/replsets/rollback_too_new.js
executor:
js_test:
diff --git a/jstests/replsets/chaining_removal.js b/jstests/replsets/chaining_removal.js
index dbc80148745..027de0215d6 100644
--- a/jstests/replsets/chaining_removal.js
+++ b/jstests/replsets/chaining_removal.js
@@ -2,6 +2,8 @@
(function() {
"use strict";
+ load("jstests/replsets/rslib.js");
+
var numNodes = 5;
var host = getHostName();
var name = "chaining_removal";
@@ -25,27 +27,10 @@
replTest.awaitReplication();
// Force node 1 to sync directly from node 0.
- assert.commandWorked(nodes[1].getDB("admin").runCommand({"replSetSyncFrom": nodes[0].host}));
- var res;
- assert.soon(
- function() {
- res = nodes[1].getDB("admin").runCommand({"replSetGetStatus": 1});
- return res.syncingTo === nodes[0].host;
- },
- function() {
- return "node 1 failed to start syncing from node 0: " + tojson(res);
- });
+ syncFrom(nodes[1], nodes[0], replTest);
// Force node 4 to sync through node 1.
- assert.commandWorked(nodes[4].getDB("admin").runCommand({"replSetSyncFrom": nodes[1].host}));
- assert.soon(
- function() {
- res = nodes[4].getDB("admin").runCommand({"replSetGetStatus": 1});
- return res.syncingTo === nodes[1].host;
- },
- function() {
- return "node 4 failed to start chaining through node 1: " + tojson(res);
- });
+ syncFrom(nodes[4], nodes[1], replTest);
// write that should reach all nodes
var timeout = 60 * 1000;
diff --git a/jstests/replsets/do_not_sync_from_stale_sync_source.js b/jstests/replsets/do_not_sync_from_stale_sync_source.js
new file mode 100644
index 00000000000..bf6957a0e06
--- /dev/null
+++ b/jstests/replsets/do_not_sync_from_stale_sync_source.js
@@ -0,0 +1,44 @@
+/*
+ * This tests that nodes do not sync from other nodes that are behind them. The test sets up a
+ * 3-node replica set and then stops replication at one node so that it starts to lag. The test
+ * then uses 'replSetSyncFrom' to force the up-to-date node to sync from the lagging node. After
+ * it receives its first batch, it errors out because it cannot sync from a node behind it.
+ */
+
+(function() {
+ 'use strict';
+
+ load("jstests/libs/check_log.js");
+ load("jstests/replsets/rslib.js");
+
+ var name = "do_not_sync_from_stale_sync_source";
+ var collName = "test.coll";
+
+ var rst = new ReplSetTest({
+ name: name,
+ nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}],
+ useBridge: true,
+ settings: {chainingAllowed: false}
+ });
+ var nodes = rst.startSet();
+ rst.initiate();
+
+ jsTestLog("Make sure node 0 is primary.");
+ assert.eq(nodes[0], rst.getPrimary());
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getCollection(collName).insert(
+ {a: 0}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTestLog("Stop node 2 from syncing so it starts lagging.");
+ assert.commandWorked(nodes[2].getDB('admin').runCommand(
+ {configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'}));
+ checkLog.contains(nodes[2], 'stopReplProducer fail point enabled');
+
+ jsTestLog("Do a write that replicates to [0,1].");
+ assert.writeOK(nodes[0].getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTestLog("Tell node 1 to sync from node 0 which is now behind.");
+ assert.commandWorked(nodes[1].adminCommand({"replSetSyncFrom": nodes[0].host}));
+ checkLog.contains(nodes[1], "is not greater than our last fetched OpTime");
+}());
diff --git a/jstests/replsets/maxSyncSourceLagSecs.js b/jstests/replsets/maxSyncSourceLagSecs.js
index 6bcec4a3f71..a9aaa104c59 100644
--- a/jstests/replsets/maxSyncSourceLagSecs.js
+++ b/jstests/replsets/maxSyncSourceLagSecs.js
@@ -4,6 +4,8 @@
// @tags: [requires_fsync]
(function() {
"use strict";
+ load("jstests/replsets/rslib.js");
+
var name = "maxSyncSourceLagSecs";
var replTest = new ReplSetTest({
name: name,
@@ -25,8 +27,8 @@
var master = replTest.getPrimary();
var slaves = replTest.liveNodes.slaves;
- assert.commandWorked(slaves[0].getDB("admin").runCommand({replSetSyncFrom: master.name}));
- assert.commandWorked(slaves[1].getDB("admin").runCommand({replSetSyncFrom: master.name}));
+ syncFrom(slaves[0], master, replTest);
+ syncFrom(slaves[1], master, replTest);
master.getDB("foo").bar.save({a: 1});
replTest.awaitReplication();
@@ -35,23 +37,7 @@
sleep(4000);
jsTestLog("Setting sync target of slave 2 to slave 1");
- assert.soon(function() {
- // We do a write each time and have this in a try...catch block due to the fallout of
- // SERVER-24114. If that timeout occurs, then we search for another sync source, however we
- // will not find one unless more writes have come in. Additionally, it is possible that
- // slaves[1] will switch to sync from slaves[0] after slaves[1] replicates a write from
- // the primary but before slaves[0] replicates it. slaves[1] will then have to roll back
- // which would cause a network error.
- try {
- slaves[1].getDB("admin").runCommand({replSetSyncFrom: slaves[0].name});
- var res = slaves[1].getDB("admin").runCommand({"replSetGetStatus": 1});
- master.getDB("foo").bar.insert({a: 1});
- return res.syncingTo === slaves[0].name;
- } catch (e) {
- print("Exception in assert.soon, retrying: " + e);
- return false;
- }
- }, "sync target not changed to other slave", 100 * 1000, 2 * 1000);
+ syncFrom(slaves[1], slaves[0], replTest);
printjson(replTest.status());
jsTestLog("Lock slave 1 and add some docs. Force sync target for slave 2 to change to primary");
diff --git a/jstests/replsets/no_chaining.js b/jstests/replsets/no_chaining.js
index ad086c72f9a..07a67c3ea2b 100644
--- a/jstests/replsets/no_chaining.js
+++ b/jstests/replsets/no_chaining.js
@@ -1,3 +1,4 @@
+load("jstests/replsets/rslib.js");
function myprint(x) {
print("chaining output: " + x);
@@ -38,14 +39,7 @@ var checkNoChaining = function() {
};
var forceSync = function() {
- var config;
- try {
- config = nodes[2].getDB("local").system.replset.findOne();
- } catch (e) {
- config = nodes[2].getDB("local").system.replset.findOne();
- }
- var targetHost = config.members[1].host;
- printjson(nodes[2].getDB("admin").runCommand({replSetSyncFrom: targetHost}));
+ syncFrom(nodes[2], nodes[1], replTest);
assert.soon(function() {
return nodes[2].getDB("test").foo.findOne() != null;
}, 'Check for data after force sync');
diff --git a/jstests/replsets/rollback_after_sync_source_selection.js b/jstests/replsets/rollback_after_sync_source_selection.js
new file mode 100644
index 00000000000..aecf09dd39b
--- /dev/null
+++ b/jstests/replsets/rollback_after_sync_source_selection.js
@@ -0,0 +1,124 @@
+/*
+ * This tests that nodes get a new sync source if their sync source rolls back between when it is
+ * chosen as a sync source and when it is first used. The test sets up a five-node replica set
+ * and creates a simple rollback scenario. Before node 0 goes into rollback, however, we pause
+ * the oplog fetcher on node 2, which is syncing from node 0. After the rollback occurs we let
+ * the oplog fetcher continue; once it receives its first batch from the sync source, it detects
+ * that its sync source has rolled back and errors out before choosing a new sync source.
+ */
+
+(function() {
+ 'use strict';
+
+ load("jstests/libs/check_log.js");
+ load("jstests/replsets/rslib.js");
+
+ var name = "rollback_after_sync_source_selection";
+ var collName = "test.coll";
+
+ var rst = new ReplSetTest({
+ name: name,
+ nodes: [
+ {},
+ {},
+ {rsConfig: {priority: 0}},
+ {rsConfig: {arbiterOnly: true}},
+ {rsConfig: {arbiterOnly: true}}
+ ],
+ useBridge: true
+ });
+ var nodes = rst.startSet();
+ rst.initiate();
+
+ function stepUp(rst, node) {
+ var primary = rst.getPrimary();
+ if (primary !== node) {
+ try {
+ assert.commandWorked(primary.adminCommand({replSetStepDown: 60, force: true}));
+ } catch (ex) {
+ print("Caught exception while stepping down from node '" + tojson(primary.host) +
+ "': " + tojson(ex));
+ }
+ waitForState(node, ReplSetTest.State.PRIMARY);
+ assert.commandWorked(primary.adminCommand({replSetFreeze: 0}));
+ }
+ }
+
+ jsTestLog("Make sure node 0 is primary.");
+ stepUp(rst, nodes[0]);
+ assert.eq(nodes[0], rst.getPrimary());
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getCollection(collName).insert(
+ {a: 0}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTestLog("Create two partitions: [1] and [0,2,3,4].");
+ nodes[1].disconnect(nodes[0]);
+ nodes[1].disconnect(nodes[2]);
+ nodes[1].disconnect(nodes[3]);
+ nodes[1].disconnect(nodes[4]);
+
+ jsTestLog("Do a write that replicates to [0,2,3,4].");
+ assert.writeOK(nodes[0].getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTestLog("Pausing node 2's oplog fetcher before first fetch.");
+ assert.commandWorked(nodes[2].getDB('admin').runCommand(
+ {configureFailPoint: 'fetcherHangBeforeStart', mode: 'alwaysOn'}));
+ syncFrom(nodes[2], nodes[0], rst);
+ checkLog.contains(nodes[2], 'fetcherHangBeforeStart fail point enabled');
+
+ jsTestLog("Do a write on partition [0,2,3,4]; it won't replicate due to the failpoint.");
+ assert.writeOK(nodes[0].getCollection(collName).insert({a: 2}));
+
+ jsTestLog("Repartition to: [0,2] and [1,3,4].");
+ nodes[1].reconnect(nodes[3]);
+ nodes[1].reconnect(nodes[4]);
+ nodes[3].disconnect(nodes[0]);
+ nodes[3].disconnect(nodes[2]);
+ nodes[4].disconnect(nodes[0]);
+ nodes[4].disconnect(nodes[2]);
+
+ jsTestLog("Ensure that 0 steps down and that 1 becomes primary.");
+ waitForState(nodes[0], ReplSetTest.State.SECONDARY);
+ waitForState(nodes[1], ReplSetTest.State.PRIMARY);
+ assert.eq(nodes[1], rst.getPrimary());
+
+ jsTestLog("Do a write to node 1 on the [1,3,4] side of the partition.");
+ assert.writeOK(nodes[1].getCollection(collName).insert({a: 3}));
+
+ jsTestLog("Remove the partition but maintain that node 2 only talks to node 0.");
+ var node0RBID = nodes[0].adminCommand('replSetGetRBID').rbid;
+ nodes[0].reconnect(nodes[1]);
+ nodes[0].reconnect(nodes[3]);
+ nodes[0].reconnect(nodes[4]);
+
+ jsTestLog("Wait for node 0 to go into ROLLBACK");
+ // Wait for a rollback to happen.
+ assert.soonNoExcept(function() {
+ var node0RBIDNew = nodes[0].adminCommand('replSetGetRBID').rbid;
+ return node0RBIDNew !== node0RBID;
+ });
+ waitForState(nodes[0], ReplSetTest.State.SECONDARY);
+
+ // At this point nodes 0 and 1 should have the same data.
+ assert.neq(null,
+ nodes[0].getCollection(collName).findOne({a: 0}),
+ "Node " + nodes[0].host +
+ " did not contain initial op that should be present on all nodes");
+ assert.eq(null,
+ nodes[0].getCollection(collName).findOne({a: 1}),
+ "Node " + nodes[0].host + " contained op that should have been rolled back");
+ assert.eq(null,
+ nodes[0].getCollection(collName).findOne({a: 2}),
+ "Node " + nodes[0].host + " contained op that should have been rolled back");
+ assert.neq(null,
+ nodes[0].getCollection(collName).findOne({a: 3}),
+ "Node " + nodes[0].host + " did not contain op from after rollback");
+
+ jsTestLog("Let oplog fetcher continue and error that the sync source rolled back.");
+ // Turn off failpoint on node 2 to allow it to continue fetching.
+ assert.commandWorked(nodes[2].getDB('admin').runCommand(
+ {configureFailPoint: 'fetcherHangBeforeStart', mode: 'off'}));
+ checkLog.contains(nodes[2],
+ "Upstream node rolled back after verifying that it had our MinValid point.");
+}());
\ No newline at end of file
diff --git a/jstests/replsets/rollback_too_new.js b/jstests/replsets/rollback_too_new.js
index 5619df42e18..c7fe4fa2320 100644
--- a/jstests/replsets/rollback_too_new.js
+++ b/jstests/replsets/rollback_too_new.js
@@ -9,6 +9,8 @@
(function() {
"use strict";
+ load("jstests/replsets/rslib.js");
+
// set up a set and grab things for later
var name = "rollback_too_new";
var replTest = new ReplSetTest({name: name, nodes: 3});
@@ -43,9 +45,22 @@
replTest.stop(CID);
- // do one write to master
- // in order to trigger a rollback on C
- assert.writeOK(master.getDB(name).foo.insert({x: 2}, options));
+ // We bump the term to make sure node 0's oplog is ahead of node 2's.
+ var term = getLatestOp(conns[0]).t;
+ try {
+ assert.commandWorked(conns[0].adminCommand({replSetStepDown: 1, force: true}));
+ } catch (e) {
+ if (e.message.indexOf("error doing query: failed") < 0) {
+ throw e;
+ }
+ }
+
+ // After stepping down, node 0 will eventually get reelected, which bumps the term.
+ replTest.waitForState(conns[0], ReplSetTest.State.PRIMARY);
+ // Wait for the node to increase its term.
+ assert.soon(function() {
+ return getLatestOp(conns[0]).t > term;
+ });
// Node C should connect to new master as a sync source because chaining is disallowed.
// C is ahead of master but it will still connect to it.
diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js
index 267397b87a3..3ca7e66d85b 100644
--- a/jstests/replsets/rslib.js
+++ b/jstests/replsets/rslib.js
@@ -1,3 +1,4 @@
+var syncFrom;
var wait, occasionally, reconnect, getLatestOp, waitForAllMembers, reconfig, awaitOpTime;
var waitUntilAllNodesCaughtUp;
var waitForState;
@@ -7,9 +8,33 @@ var startSetIfSupportsReadMajority;
(function() {
"use strict";
+ load("jstests/libs/write_concern_util.js");
+
var count = 0;
var w = 0;
+ /**
+ * A wrapper around `replSetSyncFrom` to ensure that the desired sync source is ahead of the
+ * syncing node so that the syncing node can choose to sync from the desired sync source.
+ * It first stops replication on the syncing node so that it can do a write on the desired
+ * sync source and make sure it's ahead. When replication is restarted, the desired sync
+ * source will be a valid sync source for the syncing node.
+ */
+ syncFrom = function(syncingNode, desiredSyncSource, rst) {
+ jsTestLog("Forcing " + syncingNode.name + " to sync from " + desiredSyncSource.name);
+ stopServerReplication(syncingNode);
+ var dummyName = "dummyForSyncFrom";
+ assert.writeOK(rst.getPrimary().getDB(dummyName).getCollection(dummyName).insert({a: 1}));
+ // Wait for 'desiredSyncSource' to get the dummy write we just did so we know it's
+ // definitely ahead of 'syncingNode' before we call replSetSyncFrom.
+ assert.soonNoExcept(function() {
+ return desiredSyncSource.getDB(dummyName).getCollection(dummyName).findOne({a: 1});
+ });
+ assert.commandWorked(syncingNode.adminCommand({replSetSyncFrom: desiredSyncSource.name}));
+ restartServerReplication(syncingNode);
+ rst.awaitSyncSource(syncingNode, desiredSyncSource);
+ };
+
wait = function(f, msg) {
w++;
var n = 0;
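
For context, a minimal usage sketch of the syncFrom helper documented above, assuming a freshly initiated three-node ReplSetTest; the variable names here are illustrative and not part of this commit:

    load("jstests/replsets/rslib.js");

    var rst = new ReplSetTest({nodes: 3});
    var nodes = rst.startSet();
    rst.initiate();

    // Force node 1 to sync from the primary (node 0). syncFrom() stops replication on
    // node 1, does a dummy write so node 0 is known to be ahead, issues replSetSyncFrom,
    // restarts replication, and waits for the sync source to actually change.
    syncFrom(nodes[1], nodes[0], rst);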
diff --git a/jstests/replsets/server8070.js b/jstests/replsets/server8070.js
index f9d9a3feb16..5d8435a656e 100644
--- a/jstests/replsets/server8070.js
+++ b/jstests/replsets/server8070.js
@@ -1,3 +1,5 @@
+load("jstests/replsets/rslib.js");
+
// Test for SERVER-8070: Flush buffer before changing sync targets to prevent unnecessary rollbacks
// This test writes 50 ops to one secondary's data (member2) and 25 ops to the other secondary's
// data (member3), then puts 50 more ops in member3's buffer and makes sure that member3 doesn't try
@@ -40,8 +42,9 @@ master.getDB("foo").bar.insert({x: 1});
replSet.awaitReplication();
jsTest.log("Make sure 2 & 3 are syncing from the primary");
-member2.adminCommand({replSetSyncFrom: getHostName() + ":" + replSet.ports[0]});
-member3.adminCommand({replSetSyncFrom: getHostName() + ":" + replSet.ports[0]});
+assert.eq(master, replSet.nodes[0]);
+syncFrom(replSet.nodes[1], master, replSet);
+syncFrom(replSet.nodes[2], master, replSet);
jsTest.log("Stop 2's replication");
member2.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'});
diff --git a/jstests/replsets/slavedelay3.js b/jstests/replsets/slavedelay3.js
index 2ce6e9b2a80..dc3bb0786d9 100644
--- a/jstests/replsets/slavedelay3.js
+++ b/jstests/replsets/slavedelay3.js
@@ -29,12 +29,7 @@ nodes[0].disconnect(nodes[2]);
master.foo.insert({x: 1});
-assert.commandWorked(nodes[1].getDB("admin").runCommand({"replSetSyncFrom": nodes[0].host}));
-var res;
-assert.soon(function() {
- res = nodes[1].getDB("admin").runCommand({"replSetGetStatus": 1});
- return res.syncingTo === nodes[0].host;
-}, "node 4 failed to start chaining: " + tojson(res));
+syncFrom(nodes[1], nodes[0], replTest);
// make sure the record still appears in the remote slave
assert.soon(function() {
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 671d56ed68d..98d86f0c7c0 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -52,11 +52,12 @@
#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rs_sync.h"
+#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/executor/network_interface_factory.h"
-#include "mongo/db/repl/storage_interface.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata/repl_set_metadata.h"
+#include "mongo/rpc/metadata/server_selection_metadata.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/exit.h"
@@ -136,6 +137,9 @@ static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes",
// Failpoint which causes rollback to hang before starting.
MONGO_FP_DECLARE(rollbackHangBeforeStart);
+// Failpoint which causes the oplog fetcher to hang before the first fetch.
+MONGO_FP_DECLARE(fetcherHangBeforeStart);
+
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
@@ -159,6 +163,8 @@ size_t getSize(const BSONObj& o) {
}
} // namespace
+const NamespaceString BackgroundSync::kLocalOplogNss("local.oplog.rs");
+
BackgroundSync::BackgroundSync()
: _buffer(bufferMaxSizeGauge, &getSize),
_threadPoolTaskExecutor(makeThreadPool(),
@@ -298,12 +304,13 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
-
+ HostAndPort oldSyncSource;
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
{
stdx::unique_lock<stdx::mutex> lock(_mutex);
lastOpTimeFetched = _lastOpTimeFetched;
+ oldSyncSource = _syncSourceHost;
_syncSourceHost = HostAndPort();
}
OplogReader syncSourceReader;
@@ -325,6 +332,17 @@ void BackgroundSync::_produce(OperationContext* txn) {
return;
}
+ // If our sync source has not changed, it is likely caused by our heartbeat data map being
+ // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting
+ // for our map to update.
+ if (syncSourceReader.getHost() == oldSyncSource) {
+ log() << "Chose same sync source candidate as last time, " << oldSyncSource
+ << ". Sleeping for 1 second to avoid immediately choosing a new sync source for "
+ "the same reason as last time.";
+
+ sleepsecs(1);
+ }
+
long long lastHashFetched;
{
stdx::lock_guard<stdx::mutex> lock(_mutex);
@@ -386,6 +404,20 @@ void BackgroundSync::_produce(OperationContext* txn) {
metadataBob.append(rpc::kReplSetMetadataFieldName, 1);
}
+ if (MONGO_FAIL_POINT(fetcherHangBeforeStart)) {
+ // This log output is used in js tests so please leave it.
+ log() << "BackgroundSync - fetcherHangBeforeStart fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(fetcherHangBeforeStart) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+
+ // If the sync source candidate rolls back while in this fail point, it will close all
+ // connections and the next request will fail.
+ // We manually drop all connections here so that the following Fetcher request succeeds.
+ _threadPoolTaskExecutor.dropConnections(source);
+ }
+
auto dbName = nsToDatabase(rsOplogName);
auto cmdObj = cmdBob.obj();
auto metadataObj = metadataBob.obj();
@@ -467,6 +499,39 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
+void BackgroundSync::_lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
+ OpTime lastOpTimeFetched,
+ Status* returnStatus) {
+ if (!result.isOK()) {
+ *returnStatus = result.getStatus();
+ return;
+ }
+
+ const auto& queryResponse = result.getValue();
+ if (queryResponse.documents.empty()) {
+ *returnStatus = Status(ErrorCodes::InvalidSyncSource, "Upstream node had an empty oplog.");
+ return;
+ }
+
+ const auto& remoteLastAppliedDocument = queryResponse.documents.front();
+ const auto remoteLastAppliedOpTime = OpTime::parseFromOplogEntry(remoteLastAppliedDocument);
+ if (!remoteLastAppliedOpTime.isOK()) {
+ *returnStatus = Status(ErrorCodes::InvalidBSON,
+ str::stream() << "Received invalid oplog entry from upstream node: "
+ << remoteLastAppliedDocument.toString() << ". Error: "
+ << remoteLastAppliedOpTime.getStatus().toString());
+ return;
+ }
+ if (remoteLastAppliedOpTime.getValue() <= lastOpTimeFetched) {
+ *returnStatus = Status(ErrorCodes::InvalidSyncSource,
+ str::stream() << "Upstream node's last applied OpTime "
+ << remoteLastAppliedOpTime.getValue().toString()
+ << " is not greater than our last fetched OpTime "
+ << lastOpTimeFetched.toString());
+ return;
+ }
+}
+
void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
BSONObjBuilder* bob,
const HostAndPort& source,
@@ -528,13 +593,13 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
const auto rbidElem = rbidReplyObj["rbid"];
if (rbidElem.type() != NumberInt) {
*returnStatus =
- Status(ErrorCodes::BadValue,
+ Status(ErrorCodes::InvalidSyncSource,
str::stream() << "Upstream node returned an "
<< "rbid with invalid type " << rbidElem.type());
return;
}
if (rbidElem.Int() != rbid) {
- *returnStatus = Status(ErrorCodes::BadValue,
+ *returnStatus = Status(ErrorCodes::InvalidSyncSource,
"Upstream node rolled back after verifying "
"that it had our MinValid point. Retrying.");
}
@@ -548,6 +613,24 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
if (!returnStatus->isOK())
return;
+ // Check that the upstream last applied OpTime is newer than our last fetched OpTime.
+ Fetcher lastAppliedFetcher(&_threadPoolTaskExecutor,
+ source,
+ kLocalOplogNss.db().toString(),
+ BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort"
+ << BSON("$natural" << -1)),
+ stdx::bind(&BackgroundSync::_lastAppliedFetcherCallback,
+ this,
+ stdx::placeholders::_1,
+ lastOpTimeFetched,
+ returnStatus),
+ rpc::ServerSelectionMetadata(true, boost::none).toBSON(),
+ Seconds(30));
+ lastAppliedFetcher.schedule();
+ lastAppliedFetcher.wait();
+ if (!returnStatus->isOK())
+ return;
+
auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> {
if (firstDocToApply == lastDocToApply) {
return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing");
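
The new _lastAppliedFetcherCallback above rejects a sync source whose newest oplog entry is not ahead of our last fetched OpTime. A rough shell-level sketch of the same query and comparison, assuming a connection 'source' to the candidate sync source and a 'lastFetchedTs' timestamp (simplified to timestamps only, whereas the server compares full OpTimes):

    // Same query shape as the Fetcher above: find on local.oplog.rs, limit 1, sort {$natural: -1}.
    var lastApplied = source.getDB("local").oplog.rs.find().sort({$natural: -1}).limit(1).next();
    // The candidate is only a valid sync source if it is strictly ahead of what we already
    // fetched; otherwise bgsync returns InvalidSyncSource and a new sync source is chosen.
    var sourceIsAhead = bsonWoCompare({ts: lastApplied.ts}, {ts: lastFetchedTs}) > 0;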
diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h
index 0c1a1ea99f5..20ea6629ba0 100644
--- a/src/mongo/db/repl/bgsync.h
+++ b/src/mongo/db/repl/bgsync.h
@@ -77,6 +77,8 @@ public:
*/
class BackgroundSync : public BackgroundSyncInterface {
public:
+ static const NamespaceString kLocalOplogNss;
+
// Allow index prefetching to be turned on/off
enum IndexPrefetchConfig {
UNINITIALIZED = 0,
@@ -191,6 +193,14 @@ private:
int rbid);
/**
+ * A callback to a Fetcher that checks that the remote last applied OpTime is newer than the
+ * local last fetched OpTime.
+ */
+ void _lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result,
+ OpTime lastOpTimeFetched,
+ Status* returnStatus);
+
+ /**
* Executes a rollback.
* 'getConnection' returns a connection to the sync source.
*/