From 4a6efad4d422b9a06ff0b7e98bfc9b7cc63b5864 Mon Sep 17 00:00:00 2001 From: Benety Goh Date: Tue, 24 Jan 2017 14:06:46 -0500 Subject: SERVER-27123 Only update the commit point as a secondary from oplog queries against your sync source (cherry picked from commit 87f49488f1b5c872daa71fd2fd9b5d744409a817) SERVER-27680 Merge stopOplogFetcher and pauseRsBgSyncProducer failpoint into single stopReplProducer failpoint (cherry picked from commit 21948042b6da5fb5bf15897f9808a70551f5af09) SERVER-27053 Don't acknowledge writes if the term has changed. (cherry picked from commit 8347e322cd46e8ee847e1730a7e94ea8e3981c53) --- .../resmokeconfig/suites/replica_sets_legacy.yml | 2 + jstests/libs/check_log.js | 67 ++++ jstests/libs/write_concern_util.js | 39 ++- .../replsets/disallow_adding_initialized_node1.js | 15 +- .../replsets/disallow_adding_initialized_node2.js | 17 +- jstests/replsets/double_rollback.js | 22 +- jstests/replsets/read_after_optime.js | 29 +- jstests/replsets/read_committed_after_rollback.js | 6 +- jstests/replsets/read_committed_stale_history.js | 159 +++++++++ jstests/replsets/rslib.js | 141 ++++++++ jstests/replsets/server8070.js | 6 +- jstests/replsets/write_concern_after_stepdown.js | 101 ++++++ .../write_concern_after_stepdown_and_stepup.js | 126 +++++++ src/mongo/db/repl/bgsync.cpp | 79 +++-- src/mongo/db/repl/replication_coordinator.h | 13 +- src/mongo/db/repl/replication_coordinator_impl.cpp | 60 +++- src/mongo/db/repl/replication_coordinator_impl.h | 28 +- .../replication_coordinator_impl_heartbeat.cpp | 49 ++- ...lication_coordinator_impl_heartbeat_v1_test.cpp | 2 +- .../db/repl/replication_coordinator_impl_test.cpp | 367 +++++++++++++-------- src/mongo/db/repl/replication_coordinator_mock.cpp | 3 +- src/mongo/db/repl/replication_coordinator_mock.h | 3 +- src/mongo/db/repl/replset_commands.cpp | 2 +- src/mongo/db/write_concern.cpp | 5 + src/mongo/shell/assert.js | 27 +- 25 files changed, 1093 insertions(+), 275 deletions(-) create mode 100644 jstests/libs/check_log.js create mode 100644 jstests/replsets/read_committed_stale_history.js create mode 100644 jstests/replsets/write_concern_after_stepdown.js create mode 100644 jstests/replsets/write_concern_after_stepdown_and_stepup.js diff --git a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml index 5a75af572a0..0fcf3cdfe7a 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml @@ -17,6 +17,8 @@ selector: - jstests/replsets/sync2.js # PV0 does not persist the last vote - jstests/replsets/last_vote.js + # PV0's w:majority guarantees aren't strong enough for this test to pass. + - jstests/replsets/write_concern_after_stepdown_and_stepup.js executor: js_test: diff --git a/jstests/libs/check_log.js b/jstests/libs/check_log.js new file mode 100644 index 00000000000..81bab58ce2a --- /dev/null +++ b/jstests/libs/check_log.js @@ -0,0 +1,67 @@ +/* + * Helper functions which connect to a server, and check its logs for particular strings. + */ +var checkLog; + +(function() { + "use strict"; + + if (checkLog) { + return; // Protect against this file being double-loaded. + } + + checkLog = (function() { + /* + * Calls the 'getLog' function at regular intervals on the provided connection 'conn' until + * the provided 'msg' is found in the logs, or 5 minutes have elapsed. Throws an exception + * on timeout. 
+ */ + var contains = function(conn, msg) { + assert.soon( + function() { + var logMessages = + assert.commandWorked(conn.adminCommand({getLog: 'global'})).log; + for (var i = 0; i < logMessages.length; i++) { + if (logMessages[i].indexOf(msg) != -1) { + return true; + } + } + return false; + }, + 'Could not find log entries containing the following message: ' + msg, + 5 * 60 * 1000, + 300); + }; + + /* + * Calls the 'getLog' function at regular intervals on the provided connection 'conn' until + * the provided 'msg' is found in the logs exactly 'expectedCount' times, or 5 minutes have + * elapsed. + * Throws an exception on timeout. + */ + var containsWithCount = function(conn, msg, expectedCount) { + var count = 0; + assert.soon( + function() { + var logMessages = + assert.commandWorked(conn.adminCommand({getLog: 'global'})).log; + for (var i = 0; i < logMessages.length; i++) { + if (logMessages[i].indexOf(msg) != -1) { + count++; + } + } + + return expectedCount === count; + }, + 'Expected ' + expectedCount + ', but instead saw ' + count + + ' log entries containing the following message: ' + msg, + 5 * 60 * 1000, + 300); + }; + + return { + contains: contains, + containsWithCount: containsWithCount + }; + })(); +})(); diff --git a/jstests/libs/write_concern_util.js b/jstests/libs/write_concern_util.js index bfbd3b024a0..162a73739c5 100644 --- a/jstests/libs/write_concern_util.js +++ b/jstests/libs/write_concern_util.js @@ -1,18 +1,29 @@ /** * Utilities for testing writeConcern. */ -// Stops replication at a server. + +load("jstests/libs/check_log.js"); + +// Stops replication on the given server(s). function stopServerReplication(conn) { - var errMsg = 'Failed to enable rsSyncApplyStop failpoint.'; + if (conn.length) { + conn.forEach(function(n) { + stopServerReplication(n); + }); + return; + } + var errMsg = 'Failed to enable stopReplProducer failpoint.'; assert.commandWorked( - conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}), + conn.getDB('admin').runCommand({configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'}), errMsg); + + // Wait until the fail point is actually hit. + checkLog.contains(conn, 'bgsync - stopReplProducer fail point enabled'); } // Stops replication at all replicaset secondaries. function stopReplicationOnSecondaries(rs) { - var secondaries = rs.getSecondaries(); - secondaries.forEach(stopServerReplication); + stopServerReplication(rs.getSecondaries()); } // Stops replication at all shard secondaries. @@ -20,23 +31,29 @@ function stopReplicationOnSecondariesOfAllShards(st) { st._rsObjects.forEach(stopReplicationOnSecondaries); } -// Restarts replication at a server. +// Restarts replication on the given server(s). function restartServerReplication(conn) { - var errMsg = 'Failed to disable rsSyncApplyStop failpoint.'; + if (conn.length) { + conn.forEach(function(n) { + restartServerReplication(n); + }); + return; + } + + var errMsg = 'Failed to disable stopReplProducer failpoint.'; assert.commandWorked( - conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}), + conn.getDB('admin').runCommand({configureFailPoint: 'stopReplProducer', mode: 'off'}), errMsg); } // Restarts replication at all nodes in a replicaset. function restartReplSetReplication(rs) { - rs.nodes.forEach(restartServerReplication); + restartServerReplication(rs.nodes); } // Restarts replication at all replicaset secondaries. 
function restartReplicationOnSecondaries(rs) { - var secondaries = rs.getSecondaries(); - secondaries.forEach(restartServerReplication); + restartServerReplication(rs.getSecondaries()); } // Restarts replication at all nodes in a sharded cluster. diff --git a/jstests/replsets/disallow_adding_initialized_node1.js b/jstests/replsets/disallow_adding_initialized_node1.js index 8d4491975b6..ba39e6010ad 100644 --- a/jstests/replsets/disallow_adding_initialized_node1.js +++ b/jstests/replsets/disallow_adding_initialized_node1.js @@ -3,8 +3,10 @@ // Initialize two replica sets A and B with the same name: A_0; B_0 // Add B_0 to the replica set A. This operation should fail on replica set A should fail on // detecting an inconsistent replica set ID in the heartbeat response metadata from B_0. + (function() { 'use strict'; + load("jstests/libs/check_log.js"); var name = 'disallow_adding_initialized_node1'; var replSetA = new ReplSetTest({name: name, nodes: [{rsConfig: {_id: 10}}, ]}); @@ -45,20 +47,9 @@ assert.eq(primaryB, newPrimaryB); // Mismatch replica set IDs in heartbeat responses should be logged. - var checkLog = function(node, msg) { - assert.soon(function() { - var logMessages = assert.commandWorked(node.adminCommand({getLog: 'global'})).log; - for (var i = 0; i < logMessages.length; i++) { - if (logMessages[i].indexOf(msg) != -1) { - return true; - } - } - return false; - }, 'Did not see a log entry containing the following message: ' + msg, 10000, 1000); - }; var msgB = "replica set IDs do not match, ours: " + configB.settings.replicaSetId + "; remote node's: " + configA.settings.replicaSetId; - checkLog(primaryB, msgB); + checkLog.contains(primaryB, msgB); var statusA = assert.commandWorked(primaryA.adminCommand({replSetGetStatus: 1})); var statusB = assert.commandWorked(primaryB.adminCommand({replSetGetStatus: 1})); diff --git a/jstests/replsets/disallow_adding_initialized_node2.js b/jstests/replsets/disallow_adding_initialized_node2.js index c4125f7c069..793e6a3f02e 100644 --- a/jstests/replsets/disallow_adding_initialized_node2.js +++ b/jstests/replsets/disallow_adding_initialized_node2.js @@ -8,8 +8,10 @@ // This test requires users to persist across a restart. // @tags: [requires_persistence] + (function() { 'use strict'; + load("jstests/libs/check_log.js"); var name = 'disallow_adding_initialized_node2'; var replSetA = new ReplSetTest( @@ -52,23 +54,12 @@ assert.eq(primaryB, newPrimaryB); // Mismatch replica set IDs in heartbeat responses should be logged. 
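// Illustrative usage sketch (not from the original patch): the shared helper in
// jstests/libs/check_log.js that replaces the per-test checkLog functions removed below.
// It assumes 'conn' is a live connection to any mongod, e.g. conn = rst.getPrimary().
load("jstests/libs/check_log.js");

// Block until the server has logged the given fragment at least once (5 minute timeout).
checkLog.contains(conn, "replica set IDs do not match");

// Block until the fragment appears in the log exactly twice.
checkLog.containsWithCount(conn, "timed out waiting for read concern", 2);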
- var checkLog = function(node, msg) { - assert.soon(function() { - var logMessages = assert.commandWorked(node.adminCommand({getLog: 'global'})).log; - for (var i = 0; i < logMessages.length; i++) { - if (logMessages[i].indexOf(msg) != -1) { - return true; - } - } - return false; - }, 'Did not see a log entry containing the following message: ' + msg, 10000, 1000); - }; var msgA = "replica set IDs do not match, ours: " + configA.settings.replicaSetId + "; remote node's: " + configB.settings.replicaSetId; var msgB = "replica set IDs do not match, ours: " + configB.settings.replicaSetId + "; remote node's: " + configA.settings.replicaSetId; - checkLog(primaryA, msgA); - checkLog(primaryB, msgB); + checkLog.contains(primaryA, msgA); + checkLog.contains(primaryB, msgB); var statusA = assert.commandWorked(primaryA.adminCommand({replSetGetStatus: 1})); var statusB = assert.commandWorked(primaryB.adminCommand({replSetGetStatus: 1})); diff --git a/jstests/replsets/double_rollback.js b/jstests/replsets/double_rollback.js index 2286b80e315..d3018ec69d6 100644 --- a/jstests/replsets/double_rollback.js +++ b/jstests/replsets/double_rollback.js @@ -9,8 +9,13 @@ * it rolled back, which could then lead to a double-rollback when node 2 was reconnected * to node 1 and tried to apply its oplog despite not being in a consistent state. */ + (function() { 'use strict'; + load("jstests/libs/check_log.js"); + + load("jstests/libs/check_log.js"); + load("jstests/replsets/rslib.js"); var name = "double_rollback"; var dbName = "test"; @@ -100,18 +105,7 @@ jsTestLog("Wait for failpoint on node 2 to pause rollback before it finishes"); // Wait for fail point message to be logged. - var checkLog = function(node, msg) { - assert.soon(function() { - var logMessages = assert.commandWorked(node.adminCommand({getLog: 'global'})).log; - for (var i = 0; i < logMessages.length; i++) { - if (logMessages[i].indexOf(msg) != -1) { - return true; - } - } - return false; - }, 'Did not see a log entry containing the following message: ' + msg, timeout); - }; - checkLog(nodes[2], 'rollback - rollbackHangBeforeFinish fail point enabled'); + checkLog.contains(nodes[2], 'rollback - rollbackHangBeforeFinish fail point enabled'); jsTestLog("Repartition to: [1,3,4] and [0,2]."); nodes[2].disconnect(nodes[1]); @@ -128,8 +122,8 @@ // for a sync source it can use to reach minvalid and get back into SECONDARY state. Node 0 // is the only node it can reach, but since node 0 doesn't contain node 2's minvalid oplog entry // node 2 will refuse to use it as a sync source. 
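// Illustrative sketch (not from the original patch): the failpoint pattern this test relies
// on -- turn a failpoint on, wait for its log line, inspect state, then turn it off again.
// 'node' is assumed to be a connection to the mongod being paused.
load("jstests/libs/check_log.js");

assert.commandWorked(
    node.adminCommand({configureFailPoint: 'rollbackHangBeforeFinish', mode: 'alwaysOn'}));

// The rollback thread logs this line once it has reached the failpoint.
checkLog.contains(node, 'rollback - rollbackHangBeforeFinish fail point enabled');

// ... examine the node while rollback is paused ...

assert.commandWorked(
    node.adminCommand({configureFailPoint: 'rollbackHangBeforeFinish', mode: 'off'}));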
- checkLog(nodes[2], - "remote oplog does not contain entry with optime matching our required optime"); + checkLog.contains( + nodes[2], "remote oplog does not contain entry with optime matching our required optime"); var node0RBID = nodes[0].adminCommand('replSetGetRBID').rbid; var node1RBID = nodes[1].adminCommand('replSetGetRBID').rbid; diff --git a/jstests/replsets/read_after_optime.js b/jstests/replsets/read_after_optime.js index 30cf7782679..7c5217660f6 100644 --- a/jstests/replsets/read_after_optime.js +++ b/jstests/replsets/read_after_optime.js @@ -2,6 +2,7 @@ (function() { "use strict"; + load("jstests/libs/check_log.js"); var replTest = new ReplSetTest({nodes: 2}); replTest.startSet(); @@ -33,30 +34,6 @@ assert.gt(timeoutResult.waitedMS, 500); }; - var countLogMessages = function(msg) { - var total = 0; - var logMessages = assert.commandWorked(testDB.adminCommand({getLog: 'global'})).log; - for (var i = 0; i < logMessages.length; i++) { - if (logMessages[i].indexOf(msg) != -1) { - total++; - } - } - return total; - }; - - var checkLog = function(msg, expectedCount) { - var count; - assert.soon( - function() { - count = countLogMessages(msg); - return expectedCount == count; - }, - 'Expected ' + expectedCount + ', but instead saw ' + count + - ' log entries containing the following message: ' + msg, - 60000, - 300); - }; - // Run the time out test 3 times with replication debug log level increased to 2 // for first and last run. The time out message should be logged twice. testDB.setLogLevel(2, 'command'); @@ -65,7 +42,7 @@ var msg = 'Command on database ' + testDB.getName() + ' timed out waiting for read concern to be satisfied. Command:'; - checkLog(msg, 1); + checkLog.containsWithCount(testDB.getMongo(), msg, 1); // Read concern timed out message should not be logged. runTimeoutTest(); @@ -74,7 +51,7 @@ runTimeoutTest(); testDB.setLogLevel(0, 'command'); - checkLog(msg, 2); + checkLog.containsWithCount(testDB.getMongo(), msg, 2); // Test read on future afterOpTime that will eventually occur. var insertFunc = startParallelShell( diff --git a/jstests/replsets/read_committed_after_rollback.js b/jstests/replsets/read_committed_after_rollback.js index 68ba334ac9c..4101d9a242c 100644 --- a/jstests/replsets/read_committed_after_rollback.js +++ b/jstests/replsets/read_committed_after_rollback.js @@ -149,8 +149,10 @@ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. // now be visible as a committed read to both oldPrimary and newPrimary. assert.commandWorked( pureSecondary.adminCommand({configureFailPoint: "rsSyncApplyStop", mode: "off"})); - assert.commandWorked( - newPrimaryColl.runCommand({getLastError: 1, w: 'majority', wtimeout: 30000})); + // Do a write to the new primary so that the old primary can establish a sync source to learn + // about the new commit. + assert.writeOK(newPrimary.getDB(name).unrelatedCollection.insert( + {a: 1}, {writeConcern: {w: 'majority', wtimeout: replTest.kDefaultTimeoutMS}})); assert.eq(doCommittedRead(newPrimaryColl), 'new'); assert.eq(doCommittedRead(oldPrimaryColl), 'new'); }()); diff --git a/jstests/replsets/read_committed_stale_history.js b/jstests/replsets/read_committed_stale_history.js new file mode 100644 index 00000000000..99eac7ec5c4 --- /dev/null +++ b/jstests/replsets/read_committed_stale_history.js @@ -0,0 +1,159 @@ +/* + * Tests that a node on a stale branch of history won't incorrectly mark its ops as committed even + * when hearing about a commit point with a higher optime from a new primary. 
+ */ +(function() { + 'use strict'; + + load("jstests/libs/check_log.js"); + load("jstests/libs/write_concern_util.js"); + load("jstests/replsets/rslib.js"); + + var name = "readCommittedStaleHistory"; + var dbName = "wMajorityCheck"; + var collName = "stepdown"; + + var rst = new ReplSetTest({ + name: name, + nodes: [{}, {}, {rsConfig: {priority: 0}}, ], + nodeOptions: {enableMajorityReadConcern: ""}, + useBridge: true + }); + + if (!startSetIfSupportsReadMajority(rst)) { + jsTest.log("skipping test since storage engine doesn't support committed reads"); + return; + } + + var nodes = rst.nodes; + rst.initiate(); + + /** + * Waits for the given node to be in state primary *and* have finished drain mode and thus + * be available for writes. + */ + function waitForPrimary(node) { + assert.soon(function() { + return node.adminCommand('ismaster').ismaster; + }); + } + + function stepUp(node) { + var primary = rst.getPrimary(); + if (primary != node) { + assert.throws(function() { + primary.adminCommand({replSetStepDown: 60 * 5}); + }); + } + waitForPrimary(node); + } + + // Asserts that the given document is not visible in the committed snapshot on the given node. + function checkDocNotCommitted(node, doc) { + var docs = + node.getDB(dbName).getCollection(collName).find(doc).readConcern('majority').toArray(); + assert.eq(0, docs.length, tojson(docs)); + } + + jsTestLog("Make sure node 0 is primary."); + rst.getPrimary(); + stepUp(nodes[0]); + var primary = rst.getPrimary(); + var secondaries = rst.getSecondaries(); + assert.eq(nodes[0], primary); + // Wait for all data bearing nodes to get up to date. + assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert( + {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); + + // Stop the secondaries from replicating. + stopServerReplication(secondaries); + // Stop the primary from being able to complete stepping down. + assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'})); + + jsTestLog("Do a write that won't ever reach a majority of nodes"); + assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert({a: 2})); + + // Ensure that the write that was just done is not visible in the committed snapshot. + checkDocNotCommitted(nodes[0], {a: 2}); + + // Prevent the primary from rolling back later on. + assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'alwaysOn'})); + + jsTest.log("Disconnect primary from all secondaries"); + nodes[0].disconnect(nodes[1]); + nodes[0].disconnect(nodes[2]); + + // Ensure the soon-to-be primary cannot see the write from the old primary. + assert.eq(null, nodes[1].getDB(dbName).getCollection(collName).findOne({a: 2})); + + jsTest.log("Wait for a new primary to be elected"); + // Allow the secondaries to replicate again. + restartServerReplication(secondaries); + + waitForPrimary(nodes[1]); + + jsTest.log("Do a write to the new primary"); + assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert( + {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}})); + + // Ensure the new primary still cannot see the write from the old primary. + assert.eq(null, nodes[1].getDB(dbName).getCollection(collName).findOne({a: 2})); + + // Ensure the stale primary still hasn't committed the write it did that never reached + // the other nodes. 
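// Illustrative sketch (not from the original patch): a positive counterpart to the
// checkDocNotCommitted() helper above, useful when asserting that a majority-replicated
// document *is* visible at read concern 'majority'. The helper name is an editor invention.
function checkDocCommitted(node, doc) {
    var docs =
        node.getDB(dbName).getCollection(collName).find(doc).readConcern('majority').toArray();
    assert.eq(1, docs.length, tojson(docs));
}
// e.g. after the new primary's write has been majority-committed:
// checkDocCommitted(nodes[1], {a: 3});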
+ checkDocNotCommitted(nodes[0], {a: 2}); + + jsTest.log("Reconnect the old primary to the rest of the nodes"); + nodes[1].reconnect(nodes[0]); + nodes[2].reconnect(nodes[0]); + + // Sleep 10 seconds to allow some heartbeats to be processed, so we can verify that the + // heartbeats don't cause the stale primary to incorrectly advance the commit point. + sleep(10000); + + // Ensure the new primary still cannot see the write from the old primary. + assert.eq(null, nodes[1].getDB(dbName).getCollection(collName).findOne({a: 2})); + + // Ensure the stale primary still hasn't committed the write it did that never reached + // the other nodes. + checkDocNotCommitted(nodes[0], {a: 2}); + + jsTest.log("Allow the old primary to finish stepping down and become secondary"); + var res = null; + try { + res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'}); + } catch (e) { + // Expected - once we disable the fail point the stepdown will proceed and it's racy whether + // the stepdown closes all connections before or after the configureFailPoint command + // returns + } + if (res) { + assert.commandWorked(res); + } + rst.waitForState(nodes[0], ReplSetTest.State.SECONDARY); + + // At this point the former primary will attempt to go into rollback, but the + // 'rollbackHangBeforeStart' will prevent it from doing so. + checkDocNotCommitted(nodes[0], {a: 2}); + checkLog.contains(nodes[0], 'rollback - rollbackHangBeforeStart fail point enabled'); + checkDocNotCommitted(nodes[0], {a: 2}); + + jsTest.log("Allow the original primary to roll back its write and catch up to the new primary"); + assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'off'})); + + assert.soonNoExcept(function() { + return null == nodes[0].getDB(dbName).getCollection(collName).findOne({a: 2}); + }, "Original primary never rolled back its write"); + + rst.awaitReplication(); + + // Ensure that the old primary got the write that the new primary did and sees it as committed. + assert.neq( + null, + nodes[0].getDB(dbName).getCollection(collName).find({a: 3}).readConcern('majority').next()); + + rst.stopSet(); +}()); diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js index 48e934a88d8..267397b87a3 100644 --- a/jstests/replsets/rslib.js +++ b/jstests/replsets/rslib.js @@ -1,5 +1,9 @@ var wait, occasionally, reconnect, getLatestOp, waitForAllMembers, reconfig, awaitOpTime; var waitUntilAllNodesCaughtUp; +var waitForState; +var awaitRSClientHosts; +var getLastOpTime; +var startSetIfSupportsReadMajority; (function() { "use strict"; @@ -192,4 +196,141 @@ var waitUntilAllNodesCaughtUp; timeout); }; + /** + * Waits for the given node to reach the given state, ignoring network errors. + */ + waitForState = function(node, state) { + assert.soonNoExcept(function() { + assert.commandWorked(node.adminCommand( + {replSetTest: 1, waitForMemberState: state, timeoutMillis: 60 * 1000 * 5})); + return true; + }); + }; + + /** + * Starts each node in the given replica set if the storage engine supports readConcern + *'majority'. + * Returns true if the replica set was started successfully and false otherwise. 
+ * + * @param replSetTest - The instance of {@link ReplSetTest} to start + * @param options - The options passed to {@link ReplSetTest.startSet} + */ + startSetIfSupportsReadMajority = function(replSetTest, options) { + try { + replSetTest.startSet(options); + } catch (e) { + var conn = MongoRunner.runMongod(); + if (!conn.getDB("admin").serverStatus().storageEngine.supportsCommittedReads) { + MongoRunner.stopMongod(conn); + return false; + } + throw e; + } + return true; + }; + + /** + * Waits for the specified hosts to enter a certain state. + */ + awaitRSClientHosts = function(conn, host, hostOk, rs, timeout) { + var hostCount = host.length; + if (hostCount) { + for (var i = 0; i < hostCount; i++) { + awaitRSClientHosts(conn, host[i], hostOk, rs); + } + + return; + } + + timeout = timeout || 5 * 60 * 1000; + + if (hostOk == undefined) + hostOk = { + ok: true + }; + if (host.host) + host = host.host; + if (rs) + rs = rs.name; + + print("Awaiting " + host + " to be " + tojson(hostOk) + " for " + conn + " (rs: " + rs + + ")"); + + var tests = 0; + + assert.soon(function() { + var rsClientHosts = conn.adminCommand('connPoolStats').replicaSets; + if (tests++ % 10 == 0) { + printjson(rsClientHosts); + } + + for (var rsName in rsClientHosts) { + if (rs && rs != rsName) + continue; + + for (var i = 0; i < rsClientHosts[rsName].hosts.length; i++) { + var clientHost = rsClientHosts[rsName].hosts[i]; + if (clientHost.addr != host) + continue; + + // Check that *all* host properties are set correctly + var propOk = true; + for (var prop in hostOk) { + // Use special comparator for tags because isMaster can return the fields in + // different order. The fields of the tags should be treated like a set of + // strings and 2 tags should be considered the same if the set is equal. + if (prop == 'tags') { + if (!clientHost.tags) { + propOk = false; + break; + } + + for (var hostTag in hostOk.tags) { + if (clientHost.tags[hostTag] != hostOk.tags[hostTag]) { + propOk = false; + break; + } + } + + for (var clientTag in clientHost.tags) { + if (clientHost.tags[clientTag] != hostOk.tags[clientTag]) { + propOk = false; + break; + } + } + + continue; + } + + if (isObject(hostOk[prop])) { + if (!friendlyEqual(hostOk[prop], clientHost[prop])) { + propOk = false; + break; + } + } else if (clientHost[prop] != hostOk[prop]) { + propOk = false; + break; + } + } + + if (propOk) { + return true; + } + } + } + + return false; + }, 'timed out waiting for replica set client to recognize hosts', timeout); + }; + + /** + * Returns the last opTime of the connection based from replSetGetStatus. Can only + * be used on replica set nodes. 
+ */ + getLastOpTime = function(conn) { + var replSetStatus = + assert.commandWorked(conn.getDB("admin").runCommand({replSetGetStatus: 1})); + var connStatus = replSetStatus.members.filter(m => m.self)[0]; + return connStatus.optime; + }; }()); diff --git a/jstests/replsets/server8070.js b/jstests/replsets/server8070.js index e91e95e99a4..f9d9a3feb16 100644 --- a/jstests/replsets/server8070.js +++ b/jstests/replsets/server8070.js @@ -100,8 +100,8 @@ assert(syncingTo !== getHostName() + ":" + replSet.ports[1], "node 3 is syncing jsTest.log("Pause 3's bgsync thread"); var rsBgSyncProduceResult3 = - member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'alwaysOn'}); -assert.eq(1, rsBgSyncProduceResult3.ok, "member 3 rsBgSyncProduce admin command failed"); + member3.runCommand({configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'}); +assert.eq(1, rsBgSyncProduceResult3.ok, "member 3 stopReplProducer admin command failed"); // count documents in member 3 assert.eq(26, @@ -123,7 +123,7 @@ assert.soon(function() { }, "Replication member 3 did not apply ops 25-75"); jsTest.log("Start 3's bgsync thread"); -member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'off'}); +member3.runCommand({configureFailPoint: 'stopReplProducer', mode: 'off'}); jsTest.log("Node 3 shouldn't hit rollback"); var end = (new Date()).getTime() + 10000; diff --git a/jstests/replsets/write_concern_after_stepdown.js b/jstests/replsets/write_concern_after_stepdown.js new file mode 100644 index 00000000000..22f6f627fda --- /dev/null +++ b/jstests/replsets/write_concern_after_stepdown.js @@ -0,0 +1,101 @@ +/* + * Tests that heartbeats containing writes from a different branch of history can't cause a stale + * primary to incorrectly acknowledge a w:majority write that's about to be rolled back. + */ +(function() { + 'use strict'; + + load("jstests/replsets/rslib.js"); + load("jstests/libs/write_concern_util.js"); + + var name = "writeConcernStepDownAndBackUp"; + var dbName = "wMajorityCheck"; + var collName = "stepdownAndBackUp"; + + var rst = new ReplSetTest( + {name: name, nodes: [{}, {}, {rsConfig: {priority: 0}}, ], useBridge: true}); + var nodes = rst.startSet(); + rst.initiate(); + + function waitForPrimary(node) { + assert.soon(function() { + return node.adminCommand('ismaster').ismaster; + }); + } + + function stepUp(node) { + var primary = rst.getPrimary(); + if (primary != node) { + assert.throws(function() { + primary.adminCommand({replSetStepDown: 60 * 5}); + }); + } + waitForPrimary(node); + } + + jsTestLog("Make sure node 0 is primary."); + stepUp(nodes[0]); + var primary = rst.getPrimary(); + var secondaries = rst.getSecondaries(); + assert.eq(nodes[0], primary); + // Wait for all data bearing nodes to get up to date. + assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert( + {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}})); + + // Stop the secondaries from replicating. + stopServerReplication(secondaries); + // Stop the primary from being able to complete stepping down. 
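// Illustrative sketch (not from the original patch): the rslib.js helpers added above, which
// this family of tests leans on. 'rst' is assumed to be the ReplSetTest built earlier.
load("jstests/replsets/rslib.js");

// Wait, ignoring transient network errors, for node 0 to report SECONDARY.
waitForState(rst.nodes[0], ReplSetTest.State.SECONDARY);

// Pull a node's last applied optime out of replSetGetStatus.
var primaryOpTime = getLastOpTime(rst.getPrimary());
jsTestLog("primary optime: " + tojson(primaryOpTime));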
+ assert.commandWorked( + nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'})); + + jsTestLog("Do w:majority write that will block waiting for replication."); + var doMajorityWrite = function() { + var res = db.getSiblingDB('wMajorityCheck') + .stepdownAndBackUp.insert({a: 2}, {writeConcern: {w: 'majority'}}); + assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown); + }; + + var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port); + + jsTest.log("Disconnect primary from all secondaries"); + nodes[0].disconnect(nodes[1]); + nodes[0].disconnect(nodes[2]); + + jsTest.log("Wait for a new primary to be elected"); + // Allow the secondaries to replicate again. + restartServerReplication(secondaries); + + waitForPrimary(nodes[1]); + + jsTest.log("Do a write to the new primary"); + assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert( + {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}})); + + jsTest.log("Reconnect the old primary to the rest of the nodes"); + // Only allow the old primary to connect to the other nodes, not the other way around. + // This is so that the old priamry will detect that it needs to step down and step itself down, + // rather than one of the other nodes detecting this and sending it a replSetStepDown command, + // which would cause the old primary to kill all operations and close all connections, making + // the way that the insert in the parallel shell fails be nondeterministic. Rather than + // handling all possible failure modes in the parallel shell, allowing heartbeat connectivity in + // only one direction makes it easier for the test to fail deterministically. + nodes[1].acceptConnectionsFrom(nodes[0]); + nodes[2].acceptConnectionsFrom(nodes[0]); + + joinMajorityWriter(); + + // Allow the old primary to finish stepping down so that shutdown can finish. + var res = null; + try { + res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'}); + } catch (e) { + // Expected - once we disable the fail point the stepdown will proceed and it's racy whether + // the stepdown closes all connections before or after the configureFailPoint command + // returns + } + if (res) { + assert.commandWorked(res); + } + + rst.stopSet(); +}()); diff --git a/jstests/replsets/write_concern_after_stepdown_and_stepup.js b/jstests/replsets/write_concern_after_stepdown_and_stepup.js new file mode 100644 index 00000000000..beb2bd16e0a --- /dev/null +++ b/jstests/replsets/write_concern_after_stepdown_and_stepup.js @@ -0,0 +1,126 @@ +/* + * Tests that heartbeats containing writes from a different branch of history can't cause a stale + * primary to incorrectly acknowledge a w:majority write that's about to be rolled back, even if the + * stale primary is re-elected primary before waiting for the write concern acknowledgement. 
+ */ +(function() { + 'use strict'; + + var name = "writeConcernStepDownAndBackUp"; + var dbName = "wMajorityCheck"; + var collName = "stepdownAndBackUp"; + + var rst = new ReplSetTest( + {name: name, nodes: [{}, {}, {rsConfig: {priority: 0}}, ], useBridge: true}); + var nodes = rst.startSet(); + rst.initiate(); + + var timeout = 5 * 60 * 1000; + + function waitForState(node, state) { + assert.soonNoExcept(function() { + assert.commandWorked(node.adminCommand( + {replSetTest: 1, waitForMemberState: state, timeoutMillis: timeout})); + return true; + }); + } + + function waitForPrimary(node) { + assert.soon(function() { + return node.adminCommand('ismaster').ismaster; + }); + } + + function stepUp(node) { + var primary = rst.getPrimary(); + if (primary != node) { + assert.throws(function() { + primary.adminCommand({replSetStepDown: 60 * 5}); + }); + } + waitForPrimary(node); + } + + jsTestLog("Make sure node 0 is primary."); + stepUp(nodes[0]); + var primary = rst.getPrimary(); + var secondaries = rst.getSecondaries(); + assert.eq(nodes[0], primary); + // Wait for all data bearing nodes to get up to date. + assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert( + {a: 1}, {writeConcern: {w: 3, wtimeout: timeout}})); + + // Stop the secondaries from replicating. + secondaries.forEach(function(node) { + assert.commandWorked( + node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'})); + }); + // Stop the primary from calling into awaitReplication() + assert.commandWorked(nodes[0].adminCommand( + {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'})); + + jsTestLog("Do w:majority write that won't enter awaitReplication() until after the primary " + + "has stepped down and back up"); + var doMajorityWrite = function() { + assert.commandWorked(db.adminCommand({ismaster: 1})); + + assert.throws(function() { + db.getSiblingDB('wMajorityCheck') + .stepdownAndBackUp.insert({a: 2}, {writeConcern: {w: 'majority'}}); + }); + }; + + var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port); + + jsTest.log("Disconnect primary from all secondaries"); + nodes[0].disconnect(nodes[1]); + nodes[0].disconnect(nodes[2]); + + jsTest.log("Wait for a new primary to be elected"); + // Allow the secondaries to replicate again. 
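// Illustrative aside (not from the original patch): the write_concern_util.js helpers added
// earlier in this patch wrap the same stop/restart dance around the 'stopReplProducer'
// failpoint and accept either a single connection or an array of connections, e.g.:
//
//     load("jstests/libs/write_concern_util.js");
//     stopServerReplication(secondaries);     // enable stopReplProducer on each secondary
//     restartServerReplication(secondaries);  // disable it again
//
// This test toggles the 'rsSyncApplyStop' failpoint directly instead.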
+ secondaries.forEach(function(node) { + assert.commandWorked( + node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'})); + }); + + waitForPrimary(nodes[1]); + + jsTest.log("Do a write to the new primary"); + assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert( + {a: 3}, {writeConcern: {w: 2, wtimeout: timeout}})); + + jsTest.log("Reconnect the old primary to the rest of the nodes"); + nodes[0].reconnect(nodes[1]); + nodes[0].reconnect(nodes[2]); + + jsTest.log("Wait for the old primary to step down, roll back its write, and apply the " + + "new writes from the new primary"); + waitForState(nodes[0], ReplSetTest.State.SECONDARY); + rst.awaitReplication(); + + // At this point all 3 nodes should have the same data + assert.soonNoExcept(function() { + nodes.forEach(function(node) { + assert.eq(null, + node.getDB(dbName).getCollection(collName).findOne({a: 2}), + "Node " + node.host + " contained op that should have been rolled back"); + assert.neq(null, + node.getDB(dbName).getCollection(collName).findOne({a: 3}), + "Node " + node.host + + " was missing op from branch of history that should have persisted"); + }); + return true; + }); + + jsTest.log("Make the original primary become primary once again"); + stepUp(nodes[0]); + + jsTest.log("Unblock the thread waiting for replication of the now rolled-back write, ensure " + + "that the write concern failed"); + assert.commandWorked(nodes[0].adminCommand( + {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'off'})); + + joinMajorityWriter(); + + rst.stopSet(); +}()); diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 37ba22bf1a7..75a91db536d 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -117,7 +117,7 @@ Status checkRemoteOplogStart(stdx::function()> getNextOperat } // namespace -MONGO_FP_DECLARE(rsBgSyncProduce); +MONGO_FP_DECLARE(stopReplProducer); BackgroundSync* BackgroundSync::s_instance = 0; stdx::mutex BackgroundSync::s_mutex; @@ -133,6 +133,9 @@ static ServerStatusMetricField displayOpsRead("repl.network.ops", &op static Counter64 networkByteStats; static ServerStatusMetricField displayBytesRead("repl.network.bytes", &networkByteStats); +// Failpoint which causes rollback to hang before starting. +MONGO_FP_DECLARE(rollbackHangBeforeStart); + // The count of items in the buffer static Counter64 bufferCountGauge; static ServerStatusMetricField displayBufferCount("repl.buffer.count", @@ -262,6 +265,21 @@ void BackgroundSync::_producerThread() { } void BackgroundSync::_produce(OperationContext* txn) { + if (MONGO_FAIL_POINT(stopReplProducer)) { + // This log output is used in js tests so please leave it. + log() << "bgsync - stopReplProducer fail point " + "enabled. Blocking until fail point is disabled."; + + // TODO(SERVER-27120): Remove the return statement and uncomment the while loop. + // Currently we cannot block here or we prevent primaries from being fully elected since + // we'll never call _signalNoNewDataForApplier. 
+ // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) { + // mongo::sleepsecs(1); + // } + mongo::sleepsecs(1); + return; + } + // this oplog reader does not do a handshake because we don't want the server it's syncing // from to track how far it has synced { @@ -280,10 +298,6 @@ void BackgroundSync::_produce(OperationContext* txn) { } } - while (MONGO_FAIL_POINT(rsBgSyncProduce)) { - sleepmillis(0); - } - // find a target to sync from the last optime fetched OpTime lastOpTimeFetched; @@ -478,26 +492,6 @@ void BackgroundSync::_fetcherCallback(const StatusWith& bool syncSourceHasSyncSource = false; OpTime sourcesLastOp; - // Forward metadata (containing liveness information) to replication coordinator. - bool receivedMetadata = - queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); - if (receivedMetadata) { - auto metadataResult = - rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); - if (!metadataResult.isOK()) { - error() << "invalid replication metadata from sync source " << source << ": " - << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; - return; - } - const auto& metadata = metadataResult.getValue(); - _replCoord->processReplSetMetadata(metadata); - if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { - _replCoord->cancelAndRescheduleElectionTimeout(); - } - syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; - sourcesLastOp = metadata.getLastOpVisible(); - } - const auto& documents = queryResponse.documents; auto firstDocToApply = documents.cbegin(); auto lastDocToApply = documents.cend(); @@ -576,6 +570,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith& return; } + if (MONGO_FAIL_POINT(stopReplProducer)) { + return; + } + + // Process replset metadata. It is important that this happen after we've validated the + // first batch, so we don't progress our knowledge of the commit point from a + // response that triggers a rollback. + bool receivedMetadata = + queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName); + if (receivedMetadata) { + auto metadataResult = + rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata); + if (!metadataResult.isOK()) { + error() << "invalid replication metadata from sync source " << source << ": " + << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata; + return; + } + const auto& metadata = metadataResult.getValue(); + _replCoord->processReplSetMetadata(metadata, true /*advance commit point*/); + if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) { + _replCoord->cancelAndRescheduleElectionTimeout(); + } + syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1; + sourcesLastOp = metadata.getLastOpVisible(); + } + // The count of the bytes of the documents read off the network. int networkDocumentBytes = 0; Timestamp lastTS; @@ -741,6 +761,15 @@ void BackgroundSync::_rollback(OperationContext* txn, const HostAndPort& source, boost::optional requiredRBID, stdx::function getConnection) { + if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) { + // This log output is used in js tests so please leave it. + log() << "rollback - rollbackHangBeforeStart fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) { + mongo::sleepsecs(1); + } + } + // Set state to ROLLBACK while we are in this function. This prevents serving reads, even from // the oplog. 
This can fail if we are elected PRIMARY, in which case we better not do any // rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h index 18ccf45a224..57a2c426817 100644 --- a/src/mongo/db/repl/replication_coordinator.h +++ b/src/mongo/db/repl/replication_coordinator.h @@ -466,14 +466,13 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) = 0; /** - * Processes the ReplSetMetadata returned from a command run against another replica set - * member and updates protocol version 1 information (most recent optime that is committed, - * member id of the current PRIMARY, the current config version and the current term). - * - * TODO(dannenberg): Move this method to be testing only if it does not end up being used - * to process the find and getmore metadata responses from the DataReplicator. + * Processes the ReplSetMetadata returned from a command run against another + * replica set member and so long as the config version in the metadata matches the replica set + * config version this node currently has, updates the current term and optionally updates + * this node's notion of the commit point. */ - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) = 0; /** * Elections under protocol version 1 are triggered by a timer. diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 7c30cd983dd..67102a30033 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -1445,10 +1445,38 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl return StatusAndDuration(Status::OK(), Milliseconds(timer->millis())); } - if (replMode == modeReplSet && !_memberState.primary()) { - return StatusAndDuration( - Status(ErrorCodes::NotMaster, "Not master while waiting for replication"), - Milliseconds(timer->millis())); + auto checkForStepDown = [&]() -> Status { + if (replMode == modeReplSet && !_memberState.primary()) { + return {ErrorCodes::NotMaster, "Primary stepped down while waiting for replication"}; + } + + // Relax term checking under 3.2 because some commands (eg. createIndexes) might not return + // a term in the response metadata to mongos which may pass the no-term OpTime back to + // mongod eventually. 
+ if (opTime.getTerm() != OpTime::kUninitializedTerm && + _cachedTerm != OpTime::kUninitializedTerm && opTime.getTerm() != _cachedTerm) { + return { + ErrorCodes::NotMaster, + str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm + << " while waiting for replication, indicating that this node must " + "have stepped down."}; + } + + if (_stepDownPending) { + return {ErrorCodes::NotMaster, + "Received stepdown request while waiting for replication"}; + } + return Status::OK(); + }; + + Status stepdownStatus = checkForStepDown(); + if (!stepdownStatus.isOK()) { + return StatusAndDuration(stepdownStatus, Milliseconds(timer->millis())); + } + + auto interruptStatus = txn->checkForInterruptNoAssert(); + if (!interruptStatus.isOK()) { + return StatusAndDuration(interruptStatus, Milliseconds(timer->millis())); } if (writeConcern.wMode.empty()) { @@ -1470,6 +1498,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl return StatusAndDuration(interruptedStatus, elapsed); } + if (!waitInfo.master) { return StatusAndDuration(Status(ErrorCodes::NotMaster, "Not master anymore while waiting for replication" @@ -1506,6 +1535,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl } else { condVar.wait_for(*lock, waitTime); } + + stepdownStatus = checkForStepDown(); + if (!stepdownStatus.isOK()) { + return StatusAndDuration(stepdownStatus, elapsed); + } } Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern); @@ -2001,10 +2035,11 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result) result->append("config", _rsConfig.toBSON()); } -void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) { +void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) { EventHandle evh; - _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) { - evh = _processReplSetMetadata_incallback(replMetadata); + _scheduleWorkAndWaitForCompletion([&](const CallbackArgs& args) { + evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint); }); if (evh.isValid()) { _replExecutor.waitForEvent(evh); @@ -2017,11 +2052,13 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() { } EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback( - const rpc::ReplSetMetadata& replMetadata) { + const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) { if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) { return EventHandle(); } - _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); + if (advanceCommitPoint) { + _setLastCommittedOpTime(replMetadata.getLastOpCommitted()); + } return _updateTerm_incallback(replMetadata.getTerm()); } @@ -2527,6 +2564,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() { info->condVar->notify_all(); } _canAcceptNonLocalWrites = false; + _stepDownPending = false; result = kActionCloseAllConnections; } else { result = kActionFollowerModeStateChange; @@ -3103,7 +3141,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre } void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() { - if (!_getMemberState_inlock().primary()) { + if (!_getMemberState_inlock().primary() || _stepDownPending) { return; } @@ -3509,7 +3547,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback( if 
(localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) { log() << "stepping down from primary, because a new term has begun: " << term; _topCoord->prepareForStepDown(); - return _stepDownStart(); + return _stepDownStart(false); } return EventHandle(); } diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h index 3c6bbaefdbb..e2b2bea1298 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.h +++ b/src/mongo/db/repl/replication_coordinator_impl.h @@ -217,7 +217,8 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result) override; - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override; + virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) override; virtual void cancelAndRescheduleElectionTimeout() override; @@ -1024,7 +1025,10 @@ private: */ void _requestRemotePrimaryStepdown(const HostAndPort& target); - ReplicationExecutor::EventHandle _stepDownStart(); + /** + * Schedules stepdown to run with the global exclusive lock. + */ + ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex); /** * Completes a step-down of the current node. Must be run with a global @@ -1063,9 +1067,11 @@ private: * Utility method that schedules or performs actions specified by a HeartbeatResponseAction * returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given * value of "responseStatus". + * 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this. */ void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action, - const StatusWith& responseStatus); + const StatusWith& responseStatus, + bool hasMutex); /** * Bottom half of processHeartbeat(), which runs in the replication executor. @@ -1115,11 +1121,13 @@ private: /** * Callback that processes the ReplSetMetadata returned from a command run against another - * replica set member and updates protocol version 1 information (most recent optime that is - * committed, member id of the current PRIMARY, the current config version and the current term) + * replica set member and so long as the config version in the metadata matches the replica set + * config version this node currently has, updates the current term and optionally updates + * this node's notion of the commit point. * Returns the finish event which is invalid if the process has already finished. */ - EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata); + EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint); /** * Blesses a snapshot to be used for new committed reads. @@ -1281,6 +1289,14 @@ private: // TODO: ideally this should only change on rollbacks NOT on mongod restarts also. int _rbid; // (M) + // Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat) + // TODO(SERVER-27083): This bool is redundant of the same-named bool in TopologyCoordinatorImpl, + // but due to mutex ordering between _mutex and _topoMutex we can't inspect the + // TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid + // of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should + // consolidate the two bools. + bool _stepDownPending = false; // (M) + // list of information about clients waiting on replication. Does *not* own the WaiterInfos. 
std::vector _replicationWaiterList; // (M) diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp index 4c23bab4f5b..f89f1592170 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp @@ -64,6 +64,8 @@ typedef ReplicationExecutor::CallbackHandle CBHandle; using executor::RemoteCommandRequest; +MONGO_FP_DECLARE(blockHeartbeatStepdown); + void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData, const HostAndPort& target, int targetIndex) { @@ -152,7 +154,9 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( if (replMetadata.isOK()) { // Asynchronous stepdown could happen, but it will be queued in executor after // this function, so we cannot and don't need to wait for it to finish. - _processReplSetMetadata_incallback(replMetadata.getValue()); + // Arbiters are the only nodes allowed to advance their commit point via heartbeats. + bool advanceCommitPoint = getMemberState().arbiter(); + _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint); } } const Date_t now = _replExecutor.now(); @@ -164,10 +168,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( networkTime = cbData.response.getValue().elapsedMillis; // TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this // and update tests. - _updateTerm_incallback(hbStatusResponse.getValue().getTerm()); - // Postpone election timeout if we have a successful heartbeat response from the primary. const auto& hbResponse = hbStatusResponse.getValue(); - if (hbResponse.hasState() && hbResponse.getState().primary()) { + _updateTerm_incallback(hbResponse.getTerm()); + // Postpone election timeout if we have a successful heartbeat response from the primary. + if (hbResponse.hasState() && hbResponse.getState().primary() && + hbResponse.getTerm() == _topCoord->getTerm()) { cancelAndRescheduleElectionTimeout(); } } else { @@ -206,7 +211,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse( _scheduleHeartbeatToTarget( target, targetIndex, std::max(now, action.getNextHeartbeatStartDate())); - _handleHeartbeatResponseAction(action, hbStatusResponse); + _handleHeartbeatResponseAction(action, hbStatusResponse, false /*we're not holding _mutex*/); } void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex, @@ -226,11 +231,13 @@ void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIn void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( const HeartbeatResponseAction& action, - const StatusWith& responseStatus) { + const StatusWith& responseStatus, + bool hasMutex) { switch (action.getAction()) { case HeartbeatResponseAction::NoAction: // Update the cached member state if different than the current topology member state if (_memberState != _topCoord->getMemberState()) { + invariant(!hasMutex); stdx::unique_lock lk(_mutex); const PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock(); @@ -250,7 +257,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( log() << "Stepping down from primary in response to heartbeat"; _topCoord->prepareForStepDown(); // Don't need to wait for stepdown to finish. 
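// Illustrative aside (not from the original patch): _stepDownStart() now takes a 'hasMutex'
// flag so it knows whether it may acquire _mutex itself before setting _stepDownPending.
// Callers that already hold _mutex (e.g. _handleLivenessTimeout) pass true; callers that do
// not (e.g. the heartbeat response path and _updateTerm_incallback) pass false:
//
//     _stepDownStart(false);  // caller does not hold _mutex
//     _stepDownStart(true);   // caller already holds _mutex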
- _stepDownStart(); + _stepDownStart(hasMutex); break; case HeartbeatResponseAction::StepDownRemotePrimary: { invariant(action.getPrimaryConfigIndex() != _selfIndex); @@ -304,11 +311,19 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort } } -ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() { +ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart(bool hasMutex) { + { + boost::optional> lk; + if (!hasMutex) { + lk.emplace(_mutex); + } + _stepDownPending = true; + } auto finishEvent = _makeEvent(); if (!finishEvent) { return finishEvent; } + _replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind( &ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent)); return finishEvent; @@ -321,6 +336,20 @@ void ReplicationCoordinatorImpl::_stepDownFinish( return; } invariant(cbData.txn); + + if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) { + // Must reschedule rather than block so we don't take up threads in the replication + // executor. + sleepmillis(10); + _replExecutor.scheduleWorkWithGlobalExclusiveLock( + stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish, + this, + stdx::placeholders::_1, + finishedEvent)); + + return; + } + // TODO Add invariant that we've got global shared or global exclusive lock, when supported // by lock manager. stdx::unique_lock lk(_mutex); @@ -623,7 +652,9 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout( _topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock()); // Don't mind potential asynchronous stepdown as this is the last step of // liveness check. - _handleHeartbeatResponseAction(action, makeStatusWith()); + _handleHeartbeatResponseAction(action, + makeStatusWith(), + true /*we're holding _mutex*/); } } } diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp index 2233e21cc21..40a30c48e5a 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp @@ -351,7 +351,7 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) { << 1 << "primaryIndex" << 1 << "term" << committedOpTime.getTerm() << "syncSourceIndex" << 1))); ASSERT_OK(metadata.getStatus()); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp()); }; diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 9dfd6f11045..2a52f004700 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -44,6 +44,7 @@ #include "mongo/db/repl/optime.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/repl/read_concern_response.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_heartbeat_args.h" #include "mongo/db/repl/repl_set_heartbeat_args_v1.h" #include "mongo/db/repl/repl_set_request_votes_args.h" @@ -81,15 +82,15 @@ using executor::RemoteCommandResponse; typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs; Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted"); -// Helper class to wrap Timestamp as an OpTime with term 0. 
-struct OpTimeWithTermZero { - OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {} +// Helper class to wrap Timestamp as an OpTime with term 1. +struct OpTimeWithTermOne { + OpTimeWithTermOne(unsigned int sec, unsigned int i) : timestamp(sec, i) {} operator OpTime() const { - return OpTime(timestamp, 0); + return OpTime(timestamp, 1); } operator boost::optional() const { - return OpTime(timestamp, 0); + return OpTime(timestamp, 1); } OpTime asOpTime() const { @@ -674,7 +675,7 @@ TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCall TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) { init(""); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -692,7 +693,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas settings.setMaster(true); init(settings); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -719,7 +720,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -732,7 +733,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status); } -TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWZero) { +TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWTermOne) { assertStartSuccess(BSON("_id" << "mySet" << "version" << 2 << "members" @@ -747,7 +748,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time(100, 1); + OpTimeWithTermOne time(100, 1); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -756,8 +757,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith // Become primary. 
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ASSERT(getReplCoord()->getMemberState().primary()); @@ -787,12 +788,12 @@ TEST_F(ReplCoordTest, << "_id" << 3))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -860,12 +861,12 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes << "_id" << 3))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoWaiting; @@ -1056,6 +1057,8 @@ TEST_F( // another name if we didn't get a high enough one. 
} + auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1); + ReplClientInfo::forClient(txn.getClient()).setLastOp(zeroOpTimeInCurrentTerm); statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, majorityWriteConcern); ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status); statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, multiDCWriteConcern); @@ -1158,14 +1161,14 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1217,14 +1220,14 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = 50; @@ -1258,14 +1261,14 @@ TEST_F(ReplCoordTest, << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1300,14 +1303,14 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = 
WriteConcernOptions::kNoTimeout; @@ -1340,14 +1343,14 @@ TEST_F(ReplCoordTest, << "node3"))), HostAndPort("node1")); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); ReplicationAwaiter awaiter(getReplCoord(), &txn); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); WriteConcernOptions writeConcern; writeConcern.wTimeout = WriteConcernOptions::kNoTimeout; @@ -1523,7 +1526,7 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { OperationContextReplMock txn; - OpTimeWithTermZero optime1(100, 1); + OpTimeWithTermOne optime1(100, 1); // All nodes are caught up getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); @@ -1538,7 +1541,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) { TEST_F(StepDownTest, NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) { OperationContextReplMock txn; - OpTimeWithTermZero optime1(100, 1); + OpTimeWithTermOne optime1(100, 1); // All nodes are caught up getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); @@ -1759,8 +1762,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle TEST_F(StepDownTest, NodeReturnsExceededTimeLimitWhenNoSecondaryIsCaughtUpWithinStepDownsSecondaryCatchUpPeriod) { OperationContextReplMock txn; - OpTimeWithTermZero optime1(100, 1); - OpTimeWithTermZero optime2(100, 2); + OpTimeWithTermOne optime1(100, 1); + OpTimeWithTermOne optime2(100, 2); // No secondary is caught up auto repl = getReplCoord(); repl->setMyLastAppliedOpTime(optime2); @@ -1941,8 +1944,8 @@ TEST_F(StepDownTest, TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) { const unsigned int opID = 100; OperationContextReplMock txn{opID}; - OpTimeWithTermZero optime1(100, 1); - OpTimeWithTermZero optime2(100, 2); + OpTimeWithTermOne optime1(100, 1); + OpTimeWithTermOne optime2(100, 2); // No secondary is caught up auto repl = getReplCoord(); repl->setMyLastAppliedOpTime(optime2); @@ -2092,9 +2095,9 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand << "test2:1234") << BSON("_id" << 2 << "host" << "test3:1234"))), HostAndPort("test1", 1234)); - OpTimeWithTermZero optime1(100, 1); - OpTimeWithTermZero optime2(100, 2); - OpTimeWithTermZero optime3(2, 1); + OpTimeWithTermOne optime1(100, 1); + OpTimeWithTermOne optime2(100, 2); + OpTimeWithTermOne optime3(2, 1); getReplCoord()->setMyLastAppliedOpTime(optime1); getReplCoord()->setMyLastDurableOpTime(optime1); ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2)); @@ -2124,7 +2127,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand ASSERT_EQUALS(2, memberId); ASSERT_EQUALS(optime3.timestamp, entry["optime"]["ts"].timestamp()); } - ASSERT_EQUALS(0, entry["optime"]["t"].Number()); + ASSERT_EQUALS(1, entry["optime"]["t"].Number()); } ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes } @@ -2144,8 +2147,8 @@ 
TEST_F(ReplCoordTest, HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // Can't unset maintenance mode if it was never set to begin with. Status status = getReplCoord()->setMaintenanceMode(false); @@ -2168,8 +2171,8 @@ TEST_F(ReplCoordTest, HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // valid set ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); ASSERT_TRUE(getReplCoord()->getMemberState().recovering()); @@ -2197,8 +2200,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // Can set multiple times ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); ASSERT_OK(getReplCoord()->setMaintenanceMode(true)); @@ -2228,8 +2231,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // From rollback, entering and exiting maintenance mode doesn't change perceived // state. @@ -2267,8 +2270,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) { HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // Can't modify maintenance mode when PRIMARY simulateSuccessfulV1Election(); @@ -2300,8 +2303,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection) HostAndPort("test2", 1234)); OperationContextNoop txn; getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); // TODO this election shouldn't have to happen. 
simulateSuccessfulV1Election(); @@ -2360,8 +2363,8 @@ TEST_F(ReplCoordTest, HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); @@ -2404,8 +2407,8 @@ TEST_F(ReplCoordTest, HostAndPort("node1", 12345)); OperationContextNoop txn; - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); getReplCoord()->setMyLastAppliedOpTime(time2); getReplCoord()->setMyLastDurableOpTime(time2); @@ -2434,8 +2437,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast OperationContextNoop txn; OID client = OID::gen(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); getExternalState()->setClientHostAndPort(clientHost); HandshakeArgs handshake; @@ -2584,12 +2587,12 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) { << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTime time1({100, 1}, 2); - OpTime time2({100, 2}, 2); + OpTime time1({100, 1}, 1); + OpTime time2({100, 2}, 1); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2630,13 +2633,13 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2674,12 +2677,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect) << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTime time1({100, 1}, 3); - OpTime time2({100, 2}, 3); + OpTime time1({100, 1}, 1); + OpTime time2({100, 2}, 1); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2719,13 +2722,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre << "_id" << 2))), HostAndPort("node1", 
12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2762,12 +2765,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTime time1({100, 1}, 2); - OpTime time2({100, 2}, 2); + OpTime time1({100, 1}, 1); + OpTime time2({100, 2}, 1); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2805,13 +2808,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2847,13 +2850,13 @@ TEST_F(ReplCoordTest, << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time1(100, 1); - OpTimeWithTermZero time2(100, 2); - OpTimeWithTermZero staleTime(10, 0); + OpTimeWithTermOne time1(100, 1); + OpTimeWithTermOne time2(100, 2); + OpTimeWithTermOne staleTime(10, 0); getReplCoord()->setMyLastAppliedOpTime(time1); getReplCoord()->setMyLastDurableOpTime(time1); @@ -2912,11 +2915,11 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) { << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2)); + 
getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time(100, 2); + OpTimeWithTermOne time(100, 2); // 3 nodes waiting for time WriteConcernOptions writeConcern; @@ -2994,11 +2997,11 @@ TEST_F( << "_id" << 2))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2)); simulateSuccessfulV1Election(); - OpTimeWithTermZero time(100, 2); + OpTimeWithTermOne time(100, 2); // 3 nodes waiting for time WriteConcernOptions writeConcern; @@ -3055,8 +3058,8 @@ TEST_F(ReplCoordTest, << "_id" << 4))), HostAndPort("node1", 12345)); ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 1)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 1)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 1)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 1)); simulateSuccessfulV1Election(); OpTime time(Timestamp(100, 2), 1); @@ -3227,13 +3230,13 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin << "_id" << 0))), HostAndPort("node1", 12345)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0)); shutdown(); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus()); @@ -3248,13 +3251,13 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte << "_id" << 0))), HostAndPort("node1", 12345)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0)); txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test")); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus()); @@ -3284,10 +3287,10 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi << "_id" << 0))), HostAndPort("node1", 12345)); - getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0)); - getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0)); + getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0)); + getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0)); auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), 
ReadConcernLevel::kLocalReadConcern)); ASSERT_TRUE(result.didWait()); ASSERT_OK(result.getStatus()); @@ -3303,7 +3306,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi HostAndPort("node1", 12345)); - OpTimeWithTermZero time(100, 0); + OpTimeWithTermOne time(100, 0); getReplCoord()->setMyLastAppliedOpTime(time); getReplCoord()->setMyLastDurableOpTime(time); auto result = getReplCoord()->waitUntilOpTime( @@ -3318,7 +3321,7 @@ TEST_F(ReplCoordTest, init(ReplSettings()); OperationContextNoop txn; auto result = getReplCoord()->waitUntilOpTime( - &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern)); + &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern)); ASSERT_FALSE(result.didWait()); ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus()); @@ -3505,7 +3508,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 1 << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); // higher configVersion @@ -3515,7 +3518,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 100 << "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); } @@ -3550,7 +3553,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" << 2 << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime()); @@ -3560,7 +3563,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet "lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" << 2 << "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime()); } @@ -3591,7 +3594,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr "lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" << 2 << "primaryIndex" << 2 << "term" << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata.getValue()); + 
getReplCoord()->processReplSetMetadata(metadata.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -3602,7 +3605,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2 << "primaryIndex" << 1 << "term" << 2 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata2.getValue()); + getReplCoord()->processReplSetMetadata(metadata2.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); @@ -3613,14 +3616,14 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr "lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2 << "primaryIndex" << 1 << "term" << 3 << "syncSourceIndex" << 1))); - getReplCoord()->processReplSetMetadata(metadata3.getValue()); + getReplCoord()->processReplSetMetadata(metadata3.getValue(), true); ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime()); ASSERT_EQUALS(3, getReplCoord()->getTerm()); ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); } TEST_F(ReplCoordTest, - TermAndLastCommittedOpTimeUpdateWhenHeartbeatResponseWithMetadataHasFresherValues) { + LastCommittedOpTimeNotUpdatedEvenWhenHeartbeatResponseWithMetadataHasFresherValues) { // Ensure that the metadata is processed if it is contained in a heartbeat response. assertStartSuccess(BSON("_id" << "mySet" @@ -3640,7 +3643,61 @@ TEST_F(ReplCoordTest, auto replCoord = getReplCoord(); auto config = replCoord->getConfig(); - // Higher term - should update term and lastCommittedOpTime. + // Higher term - should update term but not last committed optime. 
+ StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( + rpc::kReplSetMetadataFieldName + << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" + << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" + << config.getConfigVersion() << "primaryIndex" << 1 << "term" << 3 + << "syncSourceIndex" << 1))); + BSONObjBuilder metadataBuilder; + ASSERT_OK(metadata.getValue().writeToMetadata(&metadataBuilder)); + auto metadataObj = metadataBuilder.obj(); + + auto net = getNet(); + net->enterNetwork(); + + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + const auto& request = noi->getRequest(); + ASSERT_EQUALS(HostAndPort("node2", 12345), request.target); + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + ReplSetHeartbeatResponse hbResp; + hbResp.setConfigVersion(config.getConfigVersion()); + hbResp.setSetName(config.getReplSetName()); + hbResp.setState(MemberState::RS_SECONDARY); + net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true), metadataObj)); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + ASSERT_EQUALS(3, getReplCoord()->getTerm()); + ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex()); +} + +TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) { + // Ensure that the metadata is processed if it is contained in a heartbeat response. + assertStartSuccess(BSON("_id" + << "mySet" + << "version" << 2 << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0 << "arbiterOnly" << true) + << BSON("host" + << "node2:12345" + << "_id" << 1)) << "protocolVersion" << 1), + HostAndPort("node1", 12345)); + ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime()); + OperationContextNoop txn; + getReplCoord()->updateTerm(&txn, 1); + ASSERT_EQUALS(1, getReplCoord()->getTerm()); + + auto replCoord = getReplCoord(); + auto config = replCoord->getConfig(); + + // Higher term - should update term and lastCommittedOpTime since arbiters learn of the + // commit point via heartbeats. StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON( rpc::kReplSetMetadataFieldName << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible" @@ -3836,7 +3893,7 @@ TEST_F(ReplCoordTest, } TEST_F(ReplCoordTest, - CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimary) { + RescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInSameTerm) { assertStartSuccess(BSON("_id" << "mySet" << "protocolVersion" << 1 << "version" << 2 << "members" @@ -3868,6 +3925,8 @@ TEST_F(ReplCoordTest, ReplSetHeartbeatResponse hbResp; hbResp.setSetName("mySet"); hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setTerm(replCoord->getTerm()); + // Heartbeat response is scheduled with a delay so that we can be sure that // the election was rescheduled due to the heartbeat response.
auto heartbeatWhen = net->now() + Seconds(1); @@ -3881,6 +3940,54 @@ TEST_F(ReplCoordTest, replCoord->getElectionTimeout_forTest()); } +TEST_F(ReplCoordTest, + DontRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInDiffertTerm) { + assertStartSuccess(BSON("_id" + << "mySet" + << "protocolVersion" << 1 << "version" << 2 << "members" + << BSON_ARRAY(BSON("host" + << "node1:12345" + << "_id" << 0) + << BSON("host" + << "node2:12345" + << "_id" << 1))), + HostAndPort("node1", 12345)); + + ReplicationCoordinatorImpl* replCoord = getReplCoord(); + ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY)); + + auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest(); + ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen); + + auto net = getNet(); + net->enterNetwork(); + ASSERT_TRUE(net->hasReadyRequests()); + auto noi = net->getNextReadyRequest(); + auto&& request = noi->getRequest(); + log() << "processing " << request.cmdObj; + ASSERT_EQUALS(HostAndPort("node2", 12345), request.target); + + ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData()); + + // Respond to node1's heartbeat command to indicate that node2 is PRIMARY. + ReplSetHeartbeatResponse hbResp; + hbResp.setSetName("mySet"); + hbResp.setState(MemberState::RS_PRIMARY); + hbResp.setTerm(replCoord->getTerm() - 1); + + // Heartbeat response is scheduled with a delay so that we can be sure that + // the election was rescheduled due to the heartbeat response. + auto heartbeatWhen = net->now() + Seconds(1); + net->scheduleResponse(noi, heartbeatWhen, makeResponseStatus(hbResp.toBSON(true))); + net->runUntil(heartbeatWhen); + ASSERT_EQUALS(heartbeatWhen, net->now()); + net->runReadyNetworkOperations(); + net->exitNetwork(); + + ASSERT_GREATER_THAN(heartbeatWhen + replCoord->getConfig().getElectionTimeoutPeriod(), + replCoord->getElectionTimeout_forTest()); +} + TEST_F(ReplCoordTest, CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseWithoutState) { assertStartSuccess(BSON("_id" @@ -4241,7 +4348,7 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) { // Set last committed optime via metadata. 
rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1); - getReplCoord()->processReplSetMetadata(syncSourceMetadata); + getReplCoord()->processReplSetMetadata(syncSourceMetadata, true); getReplCoord()->onSnapshotCreate(optime, SnapshotName(1)); BSONObjBuilder cmdBuilder; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index cb763bed4ff..df4a83c6f07 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -236,7 +236,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result) // TODO } -void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {} +void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) {} void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {} diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 861799f00d2..24eba706cfe 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -150,7 +150,8 @@ public: virtual void processReplSetGetConfig(BSONObjBuilder* result); - virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata); + void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata, + bool advanceCommitPoint) override; virtual void cancelAndRescheduleElectionTimeout() override; diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp index e6d90aa82ec..0c202e906a1 100644 --- a/src/mongo/db/repl/replset_commands.cpp +++ b/src/mongo/db/repl/replset_commands.cpp @@ -681,7 +681,7 @@ public: // New style update position command has metadata, which may inform the // upstream of a higher term. auto metadata = metadataResult.getValue(); - replCoord->processReplSetMetadata(metadata); + replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/); } // In the case of an update from a member with an invalid replica set config, diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index 3c9086ca39a..9dc04670dc4 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -45,6 +45,7 @@ #include "mongo/db/storage/storage_engine.h" #include "mongo/db/write_concern_options.h" #include "mongo/rpc/protocol.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { @@ -72,6 +73,8 @@ namespace { const std::string kLocalDB = "local"; } // namespace +MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern); + StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* txn, const BSONObj& cmdObj, const std::string& dbName) { @@ -234,6 +237,8 @@ Status waitForWriteConcern(OperationContext* txn, // This check does not hold for writes done through dbeval because it runs with a global X lock.
dassert(!txn->lockState()->isLocked() || txn->getClient()->isInDirectClient()); + MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern); + // Next handle blocking on disk Timer syncTimer; diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js index 0d1e225a990..ba9333fb9cf 100644 --- a/src/mongo/shell/assert.js +++ b/src/mongo/shell/assert.js @@ -452,16 +452,33 @@ assert.writeOK = function(res, msg) { }; assert.writeError = function(res, msg) { + return assert.writeErrorWithCode(res, null, msg); +}; + +assert.writeErrorWithCode = function(res, expectedCode, msg) { var errMsg = null; + var foundCode = null; if (res instanceof WriteResult) { - if (!res.hasWriteError() && !res.hasWriteConcernError()) { + if (res.hasWriteError()) { + foundCode = res.getWriteError().code; + } else if (res.hasWriteConcernError()) { + foundCode = res.getWriteConcernError().code; + } else { errMsg = "no write error: " + tojson(res); } } else if (res instanceof BulkWriteResult) { // Can only happen with bulk inserts - if (!res.hasWriteErrors() && !res.hasWriteConcernError()) { + if (res.hasWriteErrors()) { + if (res.getWriteErrorCount() > 1 && expectedCode != null) { + errMsg = "can't check for specific code when there was more than one write error"; + } else { + foundCode = res.getWriteErrorAt(0).code; + } + } else if (res.hasWriteConcernError()) { + foundCode = res.getWriteConcernError().code; + } else { errMsg = "no write errors: " + tojson(res); } } else if (res instanceof WriteCommandError) { @@ -473,6 +490,12 @@ assert.writeError = function(res, msg) { } } + if (!errMsg && expectedCode) { + if (foundCode != expectedCode) { + errMsg = "found code " + foundCode + " does not match expected code " + expectedCode; + } + } + if (errMsg) { if (msg) errMsg = errMsg + ": " + msg; -- cgit v1.2.1