diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-02-28 11:11:30 -0500 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-03-24 09:58:46 -0400 |
commit | 45cc6d20a413d88fc49f6dac257f800fda926be6 (patch) | |
tree | 36e85beb9686237f31f6167158e6a719590bcda9 | |
parent | 1ec124dffdf665b428715549740dd63ee8a7407c (diff) | |
download | mongo-45cc6d20a413d88fc49f6dac257f800fda926be6.tar.gz |
SERVER-27403 SERVER-28278 Ensure sync source is ahead and not rolled back after first fetcher batch
-rw-r--r-- | buildscripts/resmokeconfig/suites/replica_sets_legacy.yml | 2 | ||||
-rw-r--r-- | jstests/replsets/chaining_removal.js | 23 | ||||
-rw-r--r-- | jstests/replsets/do_not_sync_from_stale_sync_source.js | 44 | ||||
-rw-r--r-- | jstests/replsets/maxSyncSourceLagSecs.js | 24 | ||||
-rw-r--r-- | jstests/replsets/no_chaining.js | 10 | ||||
-rw-r--r-- | jstests/replsets/rollback_after_sync_source_selection.js | 124 | ||||
-rw-r--r-- | jstests/replsets/rollback_too_new.js | 21 | ||||
-rw-r--r-- | jstests/replsets/rslib.js | 25 | ||||
-rw-r--r-- | jstests/replsets/server8070.js | 7 | ||||
-rw-r--r-- | jstests/replsets/slavedelay3.js | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 91 | ||||
-rw-r--r-- | src/mongo/db/repl/bgsync.h | 10 |
12 files changed, 327 insertions, 61 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml index 70ef1cada5e..11b160e4925 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml @@ -21,6 +21,8 @@ selector: - jstests/replsets/write_concern_after_stepdown_and_stepup.js # This test expects the server to log a PV1-only vote-not-granted reason - jstests/replsets/no_flapping_during_network_partition.js + # This test requires terms. + - jstests/replsets/rollback_too_new.js executor: js_test: diff --git a/jstests/replsets/chaining_removal.js b/jstests/replsets/chaining_removal.js index dbc80148745..027de0215d6 100644 --- a/jstests/replsets/chaining_removal.js +++ b/jstests/replsets/chaining_removal.js @@ -2,6 +2,8 @@ (function() { "use strict"; + load("jstests/replsets/rslib.js"); + var numNodes = 5; var host = getHostName(); var name = "chaining_removal"; @@ -25,27 +27,10 @@ replTest.awaitReplication(); // Force node 1 to sync directly from node 0. - assert.commandWorked(nodes[1].getDB("admin").runCommand({"replSetSyncFrom": nodes[0].host})); - var res; - assert.soon( - function() { - res = nodes[1].getDB("admin").runCommand({"replSetGetStatus": 1}); - return res.syncingTo === nodes[0].host; - }, - function() { - return "node 1 failed to start syncing from node 0: " + tojson(res); - }); + syncFrom(nodes[1], nodes[0], replTest); // Force node 4 to sync through node 1. - assert.commandWorked(nodes[4].getDB("admin").runCommand({"replSetSyncFrom": nodes[1].host})); - assert.soon( - function() { - res = nodes[4].getDB("admin").runCommand({"replSetGetStatus": 1}); - return res.syncingTo === nodes[1].host; - }, - function() { - return "node 4 failed to start chaining through node 1: " + tojson(res); - }); + syncFrom(nodes[4], nodes[1], replTest); // write that should reach all nodes var timeout = 60 * 1000; diff --git a/jstests/replsets/do_not_sync_from_stale_sync_source.js b/jstests/replsets/do_not_sync_from_stale_sync_source.js new file mode 100644 index 00000000000..bf6957a0e06 --- /dev/null +++ b/jstests/replsets/do_not_sync_from_stale_sync_source.js @@ -0,0 +1,44 @@ +/* + * This tests that nodes do not sync from other nodes that are behind them. The test sets up a + * 3-node replica set and then stops replication at one node so it starts to lag. The test then + * uses 'replSetSyncFrom' to force the up to date node to sync from the lagging node. After it + * receives its first batch, it errors saying that it cannot sync from a node behind it. + */ + +(function() { + 'use strict'; + + load("jstests/libs/check_log.js"); + load("jstests/replsets/rslib.js"); + + var name = "do_not_sync_from_stale_sync_source"; + var collName = "test.coll"; + + var rst = new ReplSetTest({ + name: name, + nodes: [{}, {rsConfig: {priority: 0}}, {rsConfig: {priority: 0}}], + useBridge: true, + settings: {chainingAllowed: false} + }); + var nodes = rst.startSet(); + rst.initiate(); + + jsTestLog("Make sure node 0 is primary."); + assert.eq(nodes[0], rst.getPrimary()); + // Wait for all data bearing nodes to get up to date. + assert.writeOK(nodes[0].getCollection(collName).insert( + {a: 0}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); + + jsTestLog("Stop node 2 from syncing so it starts lagging."); + assert.commandWorked(nodes[2].getDB('admin').runCommand( + {configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'})); + checkLog.contains(nodes[2], 'stopReplProducer fail point enabled'); + + jsTestLog("Do a write that replicates to [0,1]."); + assert.writeOK(nodes[0].getCollection(collName).insert( + {a: 1}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}})); + + jsTestLog("Tell node 1 to sync from node 0 which is now behind."); + assert.commandWorked(nodes[1].adminCommand({"replSetSyncFrom": nodes[0].host})); + checkLog.contains(nodes[1], "is not greater than our last fetched OpTime"); +}()); diff --git a/jstests/replsets/maxSyncSourceLagSecs.js b/jstests/replsets/maxSyncSourceLagSecs.js index 6bcec4a3f71..a9aaa104c59 100644 --- a/jstests/replsets/maxSyncSourceLagSecs.js +++ b/jstests/replsets/maxSyncSourceLagSecs.js @@ -4,6 +4,8 @@ // @tags: [requires_fsync] (function() { "use strict"; + load("jstests/replsets/rslib.js"); + var name = "maxSyncSourceLagSecs"; var replTest = new ReplSetTest({ name: name, @@ -25,8 +27,8 @@ var master = replTest.getPrimary(); var slaves = replTest.liveNodes.slaves; - assert.commandWorked(slaves[0].getDB("admin").runCommand({replSetSyncFrom: master.name})); - assert.commandWorked(slaves[1].getDB("admin").runCommand({replSetSyncFrom: master.name})); + syncFrom(slaves[0], master, replTest); + syncFrom(slaves[1], master, replTest); master.getDB("foo").bar.save({a: 1}); replTest.awaitReplication(); @@ -35,23 +37,7 @@ sleep(4000); jsTestLog("Setting sync target of slave 2 to slave 1"); - assert.soon(function() { - // We do a write each time and have this in a try...catch block due to the fallout of - // SERVER-24114. If that timeout occurs, then we search for another sync source, however we - // will not find one unless more writes have come in. Additionally, it is possible that - // slaves[1] will switch to sync from slaves[0] after slaves[1] replicates a write from - // the primary but before slaves[0] replicates it. slaves[1] will then have to roll back - // which would cause a network error. - try { - slaves[1].getDB("admin").runCommand({replSetSyncFrom: slaves[0].name}); - var res = slaves[1].getDB("admin").runCommand({"replSetGetStatus": 1}); - master.getDB("foo").bar.insert({a: 1}); - return res.syncingTo === slaves[0].name; - } catch (e) { - print("Exception in assert.soon, retrying: " + e); - return false; - } - }, "sync target not changed to other slave", 100 * 1000, 2 * 1000); + syncFrom(slaves[1], slaves[0], replTest); printjson(replTest.status()); jsTestLog("Lock slave 1 and add some docs. Force sync target for slave 2 to change to primary"); diff --git a/jstests/replsets/no_chaining.js b/jstests/replsets/no_chaining.js index ad086c72f9a..07a67c3ea2b 100644 --- a/jstests/replsets/no_chaining.js +++ b/jstests/replsets/no_chaining.js @@ -1,3 +1,4 @@ +load("jstests/replsets/rslib.js"); function myprint(x) { print("chaining output: " + x); @@ -38,14 +39,7 @@ var checkNoChaining = function() { }; var forceSync = function() { - var config; - try { - config = nodes[2].getDB("local").system.replset.findOne(); - } catch (e) { - config = nodes[2].getDB("local").system.replset.findOne(); - } - var targetHost = config.members[1].host; - printjson(nodes[2].getDB("admin").runCommand({replSetSyncFrom: targetHost})); + syncFrom(nodes[2], nodes[1], replTest); assert.soon(function() { return nodes[2].getDB("test").foo.findOne() != null; }, 'Check for data after force sync'); diff --git a/jstests/replsets/rollback_after_sync_source_selection.js b/jstests/replsets/rollback_after_sync_source_selection.js new file mode 100644 index 00000000000..aecf09dd39b --- /dev/null +++ b/jstests/replsets/rollback_after_sync_source_selection.js @@ -0,0 +1,124 @@ +/* + * This tests that nodes get a new sync source if their sync source rolls back between when it is + * chosen as a sync source and when it is first used. The test sets up a five node replicaset + * and creates a simple rollback scenario. Before node 0 goes into rollback, however, we pause + * the oplog fetcher on node 2, which is syncing from node 0. After the rollback occurs we let + * the oplog fetcher continue and after it gets back its first batch from the sync source it + * realizes that its sync source has rolled back and it errors before getting a new sync source. + */ + +(function() { + 'use strict'; + + load("jstests/libs/check_log.js"); + load("jstests/replsets/rslib.js"); + + var name = "rollback_after_sync_source_selection"; + var collName = "test.coll"; + + var rst = new ReplSetTest({ + name: name, + nodes: [ + {}, + {}, + {rsConfig: {priority: 0}}, + {rsConfig: {arbiterOnly: true}}, + {rsConfig: {arbiterOnly: true}} + ], + useBridge: true + }); + var nodes = rst.startSet(); + rst.initiate(); + + function stepUp(rst, node) { + var primary = rst.getPrimary(); + if (primary !== node) { + try { + assert.commandWorked(primary.adminCommand({replSetStepDown: 60, force: true})); + } catch (ex) { + print("Caught exception while stepping down from node '" + tojson(primary.host) + + "': " + tojson(ex)); + } + waitForState(node, ReplSetTest.State.PRIMARY); + assert.commandWorked(primary.adminCommand({replSetFreeze: 0})); + } + } + + jsTestLog("Make sure node 0 is primary."); + stepUp(rst, nodes[0]); + assert.eq(nodes[0], rst.getPrimary()); + // Wait for all data bearing nodes to get up to date. + assert.writeOK(nodes[0].getCollection(collName).insert( + {a: 0}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); + + jsTestLog("Create two partitions: [1] and [0,2,3,4]."); + nodes[1].disconnect(nodes[0]); + nodes[1].disconnect(nodes[2]); + nodes[1].disconnect(nodes[3]); + nodes[1].disconnect(nodes[4]); + + jsTestLog("Do a write that replicates to [0,2,3,4]."); + assert.writeOK(nodes[0].getCollection(collName).insert( + {a: 1}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}})); + + jsTestLog("Pausing node 2's oplog fetcher before first fetch."); + assert.commandWorked(nodes[2].getDB('admin').runCommand( + {configureFailPoint: 'fetcherHangBeforeStart', mode: 'alwaysOn'})); + syncFrom(nodes[2], nodes[0], rst); + checkLog.contains(nodes[2], 'fetcherHangBeforeStart fail point enabled'); + + jsTestLog("Do a write on partition [0,2,3,4]; it won't replicate due to the failpoint."); + assert.writeOK(nodes[0].getCollection(collName).insert({a: 2})); + + jsTestLog("Repartition to: [0,2] and [1,3,4]."); + nodes[1].reconnect(nodes[3]); + nodes[1].reconnect(nodes[4]); + nodes[3].disconnect(nodes[0]); + nodes[3].disconnect(nodes[2]); + nodes[4].disconnect(nodes[0]); + nodes[4].disconnect(nodes[2]); + + jsTestLog("Ensure that 0 steps down and that 1 becomes primary."); + waitForState(nodes[0], ReplSetTest.State.SECONDARY); + waitForState(nodes[1], ReplSetTest.State.PRIMARY); + assert.eq(nodes[1], rst.getPrimary()); + + jsTestLog("Do a write to node 1 on the [1,3,4] side of the partition."); + assert.writeOK(nodes[1].getCollection(collName).insert({a: 3})); + + jsTestLog("Remove the partition but maintain that node 2 only talks to node 0."); + var node0RBID = nodes[0].adminCommand('replSetGetRBID').rbid; + nodes[0].reconnect(nodes[1]); + nodes[0].reconnect(nodes[3]); + nodes[0].reconnect(nodes[4]); + + jsTestLog("Wait for node 0 to go into ROLLBACK"); + // Wait for a rollback to happen. + assert.soonNoExcept(function() { + var node0RBIDNew = nodes[0].adminCommand('replSetGetRBID').rbid; + return node0RBIDNew !== node0RBID; + }); + waitForState(nodes[0], ReplSetTest.State.SECONDARY); + + // At this point nodes 0 and 1 should have the same data. + assert.neq(null, + nodes[0].getCollection(collName).findOne({a: 0}), + "Node " + nodes[0].host + + " did not contain initial op that should be present on all nodes"); + assert.eq(null, + nodes[0].getCollection(collName).findOne({a: 1}), + "Node " + nodes[0].host + " contained op that should have been rolled back"); + assert.eq(null, + nodes[0].getCollection(collName).findOne({a: 2}), + "Node " + nodes[0].host + " contained op that should have been rolled back"); + assert.neq(null, + nodes[0].getCollection(collName).findOne({a: 3}), + "Node " + nodes[0].host + " did not contain op from after rollback"); + + jsTestLog("Let oplog fetcher continue and error that the sync source rolled back."); + // Turn off failpoint on node 2 to allow it to continue fetching. + assert.commandWorked(nodes[2].getDB('admin').runCommand( + {configureFailPoint: 'fetcherHangBeforeStart', mode: 'off'})); + checkLog.contains(nodes[2], + "Upstream node rolled back after verifying that it had our MinValid point."); +}());
\ No newline at end of file diff --git a/jstests/replsets/rollback_too_new.js b/jstests/replsets/rollback_too_new.js index 5619df42e18..c7fe4fa2320 100644 --- a/jstests/replsets/rollback_too_new.js +++ b/jstests/replsets/rollback_too_new.js @@ -9,6 +9,8 @@ (function() { "use strict"; + load("jstests/replsets/rslib.js"); + // set up a set and grab things for later var name = "rollback_too_new"; var replTest = new ReplSetTest({name: name, nodes: 3}); @@ -43,9 +45,22 @@ replTest.stop(CID); - // do one write to master - // in order to trigger a rollback on C - assert.writeOK(master.getDB(name).foo.insert({x: 2}, options)); + // We bump the term to make sure node 0's oplog is ahead of node 2's. + var term = getLatestOp(conns[0]).t; + try { + assert.commandWorked(conns[0].adminCommand({replSetStepDown: 1, force: true})); + } catch (e) { + if (e.message.indexOf("error doing query: failed") < 0) { + throw e; + } + } + + // After stepping down due to the higher term, it will eventually get reelected. + replTest.waitForState(conns[0], ReplSetTest.State.PRIMARY); + // Wait for the node to increase its term. + assert.soon(function() { + return getLatestOp(conns[0]).t > term; + }); // Node C should connect to new master as a sync source because chaining is disallowed. // C is ahead of master but it will still connect to it. diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index 267397b87a3..3ca7e66d85b 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -1,3 +1,4 @@ +var syncFrom; var wait, occasionally, reconnect, getLatestOp, waitForAllMembers, reconfig, awaitOpTime; var waitUntilAllNodesCaughtUp; var waitForState; @@ -7,9 +8,33 @@ var startSetIfSupportsReadMajority; (function() { "use strict"; + load("jstests/libs/write_concern_util.js"); + var count = 0; var w = 0; + /** + * A wrapper around `replSetSyncFrom` to ensure that the desired sync source is ahead of the + * syncing node so that the syncing node can choose to sync from the desired sync source. + * It first stops replication on the syncing node so that it can do a write on the desired + * sync source and make sure it's ahead. When replication is restarted, the desired sync + * source will be a valid sync source for the syncing node. + */ + syncFrom = function(syncingNode, desiredSyncSource, rst) { + jsTestLog("Forcing " + syncingNode.name + " to sync from " + desiredSyncSource.name); + stopServerReplication(syncingNode); + var dummyName = "dummyForSyncFrom"; + assert.writeOK(rst.getPrimary().getDB(dummyName).getCollection(dummyName).insert({a: 1})); + // Wait for 'desiredSyncSource' to get the dummy write we just did so we know it's + // definitely ahead of 'syncingNode' before we call replSetSyncFrom. + assert.soonNoExcept(function() { + return desiredSyncSource.getDB(dummyName).getCollection(dummyName).findOne({a: 1}); + }); + assert.commandWorked(syncingNode.adminCommand({replSetSyncFrom: desiredSyncSource.name})); + restartServerReplication(syncingNode); + rst.awaitSyncSource(syncingNode, desiredSyncSource); + }; + wait = function(f, msg) { w++; var n = 0; diff --git a/jstests/replsets/server8070.js b/jstests/replsets/server8070.js index f9d9a3feb16..5d8435a656e 100644 --- a/jstests/replsets/server8070.js +++ b/jstests/replsets/server8070.js @@ -1,3 +1,5 @@ +load("jstests/replsets/rslib.js"); + // Test for SERVER-8070: Flush buffer before changing sync targets to prevent unnecessary rollbacks // This test writes 50 ops to one secondary's data (member2) and 25 ops to the other secondary's // data (member3), then puts 50 more ops in member3's buffer and makes sure that member3 doesn't try @@ -40,8 +42,9 @@ master.getDB("foo").bar.insert({x: 1}); replSet.awaitReplication(); jsTest.log("Make sure 2 & 3 are syncing from the primary"); -member2.adminCommand({replSetSyncFrom: getHostName() + ":" + replSet.ports[0]}); -member3.adminCommand({replSetSyncFrom: getHostName() + ":" + replSet.ports[0]}); +assert.eq(master, replSet.nodes[0]); +syncFrom(replSet.nodes[1], master, replSet); +syncFrom(replSet.nodes[2], master, replSet); jsTest.log("Stop 2's replication"); member2.runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}); diff --git a/jstests/replsets/slavedelay3.js b/jstests/replsets/slavedelay3.js index 2ce6e9b2a80..dc3bb0786d9 100644 --- a/jstests/replsets/slavedelay3.js +++ b/jstests/replsets/slavedelay3.js @@ -29,12 +29,7 @@ nodes[0].disconnect(nodes[2]); master.foo.insert({x: 1}); -assert.commandWorked(nodes[1].getDB("admin").runCommand({"replSetSyncFrom": nodes[0].host})); -var res; -assert.soon(function() { - res = nodes[1].getDB("admin").runCommand({"replSetGetStatus": 1}); - return res.syncingTo === nodes[0].host; -}, "node 4 failed to start chaining: " + tojson(res)); +syncFrom(nodes[1], nodes[0], replTest); // make sure the record still appears in the remote slave assert.soon(function() { diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 671d56ed68d..98d86f0c7c0 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -52,11 +52,12 @@ #include "mongo/db/repl/rollback_source_impl.h" #include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/rs_sync.h" +#include "mongo/db/repl/storage_interface.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/executor/network_interface_factory.h" -#include "mongo/db/repl/storage_interface.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/metadata/repl_set_metadata.h" +#include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/exit.h" @@ -136,6 +137,9 @@ static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", // Failpoint which causes rollback to hang before starting. MONGO_FP_DECLARE(rollbackHangBeforeStart); +// Failpoint which causes the oplog fetcher to hang before the first fetch. +MONGO_FP_DECLARE(fetcherHangBeforeStart); + // The count of items in the buffer static Counter64 bufferCountGauge; static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count", @@ -159,6 +163,8 @@ size_t getSize(const BSONObj& o) { } } // namespace +const NamespaceString BackgroundSync::kLocalOplogNss("local.oplog.rs"); + BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), _threadPoolTaskExecutor(makeThreadPool(), @@ -298,12 +304,13 @@ void BackgroundSync::_produce(OperationContext* txn) { } } - + HostAndPort oldSyncSource; // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; { stdx::unique_lock<stdx::mutex> lock(_mutex); lastOpTimeFetched = _lastOpTimeFetched; + oldSyncSource = _syncSourceHost; _syncSourceHost = HostAndPort(); } OplogReader syncSourceReader; @@ -325,6 +332,17 @@ void BackgroundSync::_produce(OperationContext* txn) { return; } + // If our sync source has not changed, it is likely caused by our heartbeat data map being + // out of date. In that case we sleep for 1 second to reduce the amount we spin waiting + // for our map to update. + if (syncSourceReader.getHost() == oldSyncSource) { + log() << "Chose same sync source candidate as last time, " << oldSyncSource + << ". Sleeping for 1 second to avoid immediately choosing a new sync source for " + "the same reason as last time."; + + sleepsecs(1); + } + long long lastHashFetched; { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -386,6 +404,20 @@ void BackgroundSync::_produce(OperationContext* txn) { metadataBob.append(rpc::kReplSetMetadataFieldName, 1); } + if (MONGO_FAIL_POINT(fetcherHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "BackgroundSync - fetcherHangBeforeStart fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(fetcherHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + + // If the sync source candidate rolls back while in this fail point, it will close all + // connections and the next request will fail. + // We manually drop all connections here so that the following Fetcher request succeeds. + _threadPoolTaskExecutor.dropConnections(source); + } + auto dbName = nsToDatabase(rsOplogName); auto cmdObj = cmdBob.obj(); auto metadataObj = metadataBob.obj(); @@ -467,6 +499,39 @@ void BackgroundSync::_produce(OperationContext* txn) { } } +void BackgroundSync::_lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, + OpTime lastOpTimeFetched, + Status* returnStatus) { + if (!result.isOK()) { + *returnStatus = result.getStatus(); + return; + } + + const auto& queryResponse = result.getValue(); + if (queryResponse.documents.empty()) { + *returnStatus = Status(ErrorCodes::InvalidSyncSource, "Upstream node had an empty oplog."); + return; + } + + const auto& remoteLastAppliedDocument = queryResponse.documents.front(); + const auto remoteLastAppliedOpTime = OpTime::parseFromOplogEntry(remoteLastAppliedDocument); + if (!remoteLastAppliedOpTime.isOK()) { + *returnStatus = Status(ErrorCodes::InvalidBSON, + str::stream() << "Received invalid oplog entry from upstream node: " + << remoteLastAppliedDocument.toString() << ". Error: " + << remoteLastAppliedOpTime.getStatus().toString()); + return; + } + if (remoteLastAppliedOpTime.getValue() <= lastOpTimeFetched) { + *returnStatus = Status(ErrorCodes::InvalidSyncSource, + str::stream() << "Upstream node's last applied OpTime " + << remoteLastAppliedOpTime.getValue().toString() + << " is not greater than our last fetched OpTime " + << lastOpTimeFetched.toString()); + return; + } +} + void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, BSONObjBuilder* bob, const HostAndPort& source, @@ -528,13 +593,13 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& const auto rbidElem = rbidReplyObj["rbid"]; if (rbidElem.type() != NumberInt) { *returnStatus = - Status(ErrorCodes::BadValue, + Status(ErrorCodes::InvalidSyncSource, str::stream() << "Upstream node returned an " << "rbid with invalid type " << rbidElem.type()); return; } if (rbidElem.Int() != rbid) { - *returnStatus = Status(ErrorCodes::BadValue, + *returnStatus = Status(ErrorCodes::InvalidSyncSource, "Upstream node rolled back after verifying " "that it had our MinValid point. Retrying."); } @@ -548,6 +613,24 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>& if (!returnStatus->isOK()) return; + // Check that the upstream last applied OpTime is newer than our last fetched OpTime. + Fetcher lastAppliedFetcher(&_threadPoolTaskExecutor, + source, + kLocalOplogNss.db().toString(), + BSON("find" << kLocalOplogNss.coll() << "limit" << 1 << "sort" + << BSON("$natural" << -1)), + stdx::bind(&BackgroundSync::_lastAppliedFetcherCallback, + this, + stdx::placeholders::_1, + lastOpTimeFetched, + returnStatus), + rpc::ServerSelectionMetadata(true, boost::none).toBSON(), + Seconds(30)); + lastAppliedFetcher.schedule(); + lastAppliedFetcher.wait(); + if (!returnStatus->isOK()) + return; + auto getNextOperation = [&firstDocToApply, lastDocToApply]() -> StatusWith<BSONObj> { if (firstDocToApply == lastDocToApply) { return Status(ErrorCodes::OplogStartMissing, "remote oplog start missing"); diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 0c1a1ea99f5..20ea6629ba0 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -77,6 +77,8 @@ public: */ class BackgroundSync : public BackgroundSyncInterface { public: + static const NamespaceString kLocalOplogNss; + // Allow index prefetching to be turned on/off enum IndexPrefetchConfig { UNINITIALIZED = 0, @@ -191,6 +193,14 @@ private: int rbid); /** + * A callback to a Fetcher that checks that the remote last applied OpTime is newer than the + * local last fetched OpTime. + */ + void _lastAppliedFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, + OpTime lastOpTimeFetched, + Status* returnStatus); + + /** * Executes a rollback. * 'getConnection' returns a connection to the sync source. */ |