author    Benety Goh <benety@mongodb.com>  2017-01-24 14:06:46 -0500
committer Benety Goh <benety@mongodb.com>  2017-01-24 17:09:46 -0500
commit    4a6efad4d422b9a06ff0b7e98bfc9b7cc63b5864
tree      4db363699df81580a546eb41b3e2bc43e431c177
parent    76af3d246d482d62520b386e5c1f0b777c367fc6
SERVER-27123 Only update the commit point as a secondary from oplog queries against your sync source
(cherry picked from commit 87f49488f1b5c872daa71fd2fd9b5d744409a817)

SERVER-27680 Merge stopOplogFetcher and pauseRsBgSyncProducer failpoint into single stopReplProducer failpoint
(cherry picked from commit 21948042b6da5fb5bf15897f9808a70551f5af09)

SERVER-27053 Don't acknowledge writes if the term has changed.
(cherry picked from commit 8347e322cd46e8ee847e1730a7e94ea8e3981c53)
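As a minimal sketch (not part of the patch) of the merged failpoint introduced by SERVER-27680: the old rsSyncApplyStop/rsBgSyncProduce names are replaced by stopReplProducer, toggled via configureFailPoint. The single-node fixture below is illustrative; real tests target a secondary.

    var rst = new ReplSetTest({nodes: 1});
    rst.startSet();
    rst.initiate();
    var node = rst.getPrimary();
    // Pause the oplog producer by name, then resume it.
    assert.commandWorked(node.adminCommand(
        {configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'}));
    assert.commandWorked(node.adminCommand(
        {configureFailPoint: 'stopReplProducer', mode: 'off'}));
    rst.stopSet();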
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_legacy.yml | 2
-rw-r--r--  jstests/libs/check_log.js | 67
-rw-r--r--  jstests/libs/write_concern_util.js | 39
-rw-r--r--  jstests/replsets/disallow_adding_initialized_node1.js | 15
-rw-r--r--  jstests/replsets/disallow_adding_initialized_node2.js | 17
-rw-r--r--  jstests/replsets/double_rollback.js | 22
-rw-r--r--  jstests/replsets/read_after_optime.js | 29
-rw-r--r--  jstests/replsets/read_committed_after_rollback.js | 6
-rw-r--r--  jstests/replsets/read_committed_stale_history.js | 159
-rw-r--r--  jstests/replsets/rslib.js | 141
-rw-r--r--  jstests/replsets/server8070.js | 6
-rw-r--r--  jstests/replsets/write_concern_after_stepdown.js | 101
-rw-r--r--  jstests/replsets/write_concern_after_stepdown_and_stepup.js | 126
-rw-r--r--  src/mongo/db/repl/bgsync.cpp | 79
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h | 13
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 60
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h | 28
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp | 49
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp | 2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp | 367
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp | 3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h | 3
-rw-r--r--  src/mongo/db/repl/replset_commands.cpp | 2
-rw-r--r--  src/mongo/db/write_concern.cpp | 5
-rw-r--r--  src/mongo/shell/assert.js | 27
25 files changed, 1093 insertions(+), 275 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
index 5a75af572a0..0fcf3cdfe7a 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_legacy.yml
@@ -17,6 +17,8 @@ selector:
- jstests/replsets/sync2.js
# PV0 does not persist the last vote
- jstests/replsets/last_vote.js
+ # PV0's w:majority guarantees aren't strong enough for this test to pass.
+ - jstests/replsets/write_concern_after_stepdown_and_stepup.js
executor:
js_test:
diff --git a/jstests/libs/check_log.js b/jstests/libs/check_log.js
new file mode 100644
index 00000000000..81bab58ce2a
--- /dev/null
+++ b/jstests/libs/check_log.js
@@ -0,0 +1,67 @@
+/*
+ * Helper functions which connect to a server, and check its logs for particular strings.
+ */
+var checkLog;
+
+(function() {
+ "use strict";
+
+ if (checkLog) {
+ return; // Protect against this file being double-loaded.
+ }
+
+ checkLog = (function() {
+ /*
+ * Calls the 'getLog' function at regular intervals on the provided connection 'conn' until
+ * the provided 'msg' is found in the logs, or 5 minutes have elapsed. Throws an exception
+ * on timeout.
+ */
+ var contains = function(conn, msg) {
+ assert.soon(
+ function() {
+ var logMessages =
+ assert.commandWorked(conn.adminCommand({getLog: 'global'})).log;
+ for (var i = 0; i < logMessages.length; i++) {
+ if (logMessages[i].indexOf(msg) != -1) {
+ return true;
+ }
+ }
+ return false;
+ },
+ 'Could not find log entries containing the following message: ' + msg,
+ 5 * 60 * 1000,
+ 300);
+ };
+
+ /*
+ * Calls the 'getLog' function at regular intervals on the provided connection 'conn' until
+ * the provided 'msg' is found in the logs exactly 'expectedCount' times, or 5 minutes have
+ * elapsed.
+ * Throws an exception on timeout.
+ */
+ var containsWithCount = function(conn, msg, expectedCount) {
+ var count = 0;
+ assert.soon(
+ function() {
+ var logMessages =
+ assert.commandWorked(conn.adminCommand({getLog: 'global'})).log;
+ for (var i = 0; i < logMessages.length; i++) {
+ if (logMessages[i].indexOf(msg) != -1) {
+ count++;
+ }
+ }
+
+ return expectedCount === count;
+ },
+ 'Expected ' + expectedCount + ', but instead saw ' + count +
+ ' log entries containing the following message: ' + msg,
+ 5 * 60 * 1000,
+ 300);
+ };
+
+ return {
+ contains: contains,
+ containsWithCount: containsWithCount
+ };
+ })();
+})();
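A usage sketch (not part of the patch) of the new check_log.js helpers. The "waiting for connections" string is only an example of a message that appears in the global log at startup.

    load("jstests/libs/check_log.js");

    var conn = MongoRunner.runMongod();
    // Block until the message appears at least once in the global log.
    checkLog.contains(conn, "waiting for connections");
    // Or require an exact number of occurrences before the timeout.
    checkLog.containsWithCount(conn, "waiting for connections", 1);
    MongoRunner.stopMongod(conn);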
diff --git a/jstests/libs/write_concern_util.js b/jstests/libs/write_concern_util.js
index bfbd3b024a0..162a73739c5 100644
--- a/jstests/libs/write_concern_util.js
+++ b/jstests/libs/write_concern_util.js
@@ -1,18 +1,29 @@
/**
* Utilities for testing writeConcern.
*/
-// Stops replication at a server.
+
+load("jstests/libs/check_log.js");
+
+// Stops replication on the given server(s).
function stopServerReplication(conn) {
- var errMsg = 'Failed to enable rsSyncApplyStop failpoint.';
+ if (conn.length) {
+ conn.forEach(function(n) {
+ stopServerReplication(n);
+ });
+ return;
+ }
+ var errMsg = 'Failed to enable stopReplProducer failpoint.';
assert.commandWorked(
- conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}),
+ conn.getDB('admin').runCommand({configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'}),
errMsg);
+
+ // Wait until the fail point is actually hit.
+ checkLog.contains(conn, 'bgsync - stopReplProducer fail point enabled');
}
// Stops replication at all replicaset secondaries.
function stopReplicationOnSecondaries(rs) {
- var secondaries = rs.getSecondaries();
- secondaries.forEach(stopServerReplication);
+ stopServerReplication(rs.getSecondaries());
}
// Stops replication at all shard secondaries.
@@ -20,23 +31,29 @@ function stopReplicationOnSecondariesOfAllShards(st) {
st._rsObjects.forEach(stopReplicationOnSecondaries);
}
-// Restarts replication at a server.
+// Restarts replication on the given server(s).
function restartServerReplication(conn) {
- var errMsg = 'Failed to disable rsSyncApplyStop failpoint.';
+ if (conn.length) {
+ conn.forEach(function(n) {
+ restartServerReplication(n);
+ });
+ return;
+ }
+
+ var errMsg = 'Failed to disable stopReplProducer failpoint.';
assert.commandWorked(
- conn.getDB('admin').runCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}),
+ conn.getDB('admin').runCommand({configureFailPoint: 'stopReplProducer', mode: 'off'}),
errMsg);
}
// Restarts replication at all nodes in a replicaset.
function restartReplSetReplication(rs) {
- rs.nodes.forEach(restartServerReplication);
+ restartServerReplication(rs.nodes);
}
// Restarts replication at all replicaset secondaries.
function restartReplicationOnSecondaries(rs) {
- var secondaries = rs.getSecondaries();
- secondaries.forEach(restartServerReplication);
+ restartServerReplication(rs.getSecondaries());
}
// Restarts replication at all nodes in a sharded cluster.
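A minimal usage sketch of the updated write_concern_util.js helpers, assuming a standard ReplSetTest fixture; both functions now accept either a single connection or an array of connections.

    load("jstests/libs/write_concern_util.js");

    var rst = new ReplSetTest({nodes: 3});
    rst.startSet();
    rst.initiate();
    // Either a single connection or an array of connections works here.
    stopServerReplication(rst.getSecondaries());
    // ... perform writes on the primary that must not reach a majority ...
    restartServerReplication(rst.getSecondaries());
    rst.stopSet();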
diff --git a/jstests/replsets/disallow_adding_initialized_node1.js b/jstests/replsets/disallow_adding_initialized_node1.js
index 8d4491975b6..ba39e6010ad 100644
--- a/jstests/replsets/disallow_adding_initialized_node1.js
+++ b/jstests/replsets/disallow_adding_initialized_node1.js
@@ -3,8 +3,10 @@
// Initialize two replica sets A and B with the same name: A_0; B_0
// Add B_0 to the replica set A. This operation should fail: replica set A should fail on
// detecting an inconsistent replica set ID in the heartbeat response metadata from B_0.
+
(function() {
'use strict';
+ load("jstests/libs/check_log.js");
var name = 'disallow_adding_initialized_node1';
var replSetA = new ReplSetTest({name: name, nodes: [{rsConfig: {_id: 10}}, ]});
@@ -45,20 +47,9 @@
assert.eq(primaryB, newPrimaryB);
// Mismatch replica set IDs in heartbeat responses should be logged.
- var checkLog = function(node, msg) {
- assert.soon(function() {
- var logMessages = assert.commandWorked(node.adminCommand({getLog: 'global'})).log;
- for (var i = 0; i < logMessages.length; i++) {
- if (logMessages[i].indexOf(msg) != -1) {
- return true;
- }
- }
- return false;
- }, 'Did not see a log entry containing the following message: ' + msg, 10000, 1000);
- };
var msgB = "replica set IDs do not match, ours: " + configB.settings.replicaSetId +
"; remote node's: " + configA.settings.replicaSetId;
- checkLog(primaryB, msgB);
+ checkLog.contains(primaryB, msgB);
var statusA = assert.commandWorked(primaryA.adminCommand({replSetGetStatus: 1}));
var statusB = assert.commandWorked(primaryB.adminCommand({replSetGetStatus: 1}));
diff --git a/jstests/replsets/disallow_adding_initialized_node2.js b/jstests/replsets/disallow_adding_initialized_node2.js
index c4125f7c069..793e6a3f02e 100644
--- a/jstests/replsets/disallow_adding_initialized_node2.js
+++ b/jstests/replsets/disallow_adding_initialized_node2.js
@@ -8,8 +8,10 @@
// This test requires users to persist across a restart.
// @tags: [requires_persistence]
+
(function() {
'use strict';
+ load("jstests/libs/check_log.js");
var name = 'disallow_adding_initialized_node2';
var replSetA = new ReplSetTest(
@@ -52,23 +54,12 @@
assert.eq(primaryB, newPrimaryB);
// Mismatch replica set IDs in heartbeat responses should be logged.
- var checkLog = function(node, msg) {
- assert.soon(function() {
- var logMessages = assert.commandWorked(node.adminCommand({getLog: 'global'})).log;
- for (var i = 0; i < logMessages.length; i++) {
- if (logMessages[i].indexOf(msg) != -1) {
- return true;
- }
- }
- return false;
- }, 'Did not see a log entry containing the following message: ' + msg, 10000, 1000);
- };
var msgA = "replica set IDs do not match, ours: " + configA.settings.replicaSetId +
"; remote node's: " + configB.settings.replicaSetId;
var msgB = "replica set IDs do not match, ours: " + configB.settings.replicaSetId +
"; remote node's: " + configA.settings.replicaSetId;
- checkLog(primaryA, msgA);
- checkLog(primaryB, msgB);
+ checkLog.contains(primaryA, msgA);
+ checkLog.contains(primaryB, msgB);
var statusA = assert.commandWorked(primaryA.adminCommand({replSetGetStatus: 1}));
var statusB = assert.commandWorked(primaryB.adminCommand({replSetGetStatus: 1}));
diff --git a/jstests/replsets/double_rollback.js b/jstests/replsets/double_rollback.js
index 2286b80e315..d3018ec69d6 100644
--- a/jstests/replsets/double_rollback.js
+++ b/jstests/replsets/double_rollback.js
@@ -9,8 +9,13 @@
* it rolled back, which could then lead to a double-rollback when node 2 was reconnected
* to node 1 and tried to apply its oplog despite not being in a consistent state.
*/
+
(function() {
'use strict';
+ load("jstests/libs/check_log.js");
+
+ load("jstests/libs/check_log.js");
+ load("jstests/replsets/rslib.js");
var name = "double_rollback";
var dbName = "test";
@@ -100,18 +105,7 @@
jsTestLog("Wait for failpoint on node 2 to pause rollback before it finishes");
// Wait for fail point message to be logged.
- var checkLog = function(node, msg) {
- assert.soon(function() {
- var logMessages = assert.commandWorked(node.adminCommand({getLog: 'global'})).log;
- for (var i = 0; i < logMessages.length; i++) {
- if (logMessages[i].indexOf(msg) != -1) {
- return true;
- }
- }
- return false;
- }, 'Did not see a log entry containing the following message: ' + msg, timeout);
- };
- checkLog(nodes[2], 'rollback - rollbackHangBeforeFinish fail point enabled');
+ checkLog.contains(nodes[2], 'rollback - rollbackHangBeforeFinish fail point enabled');
jsTestLog("Repartition to: [1,3,4] and [0,2].");
nodes[2].disconnect(nodes[1]);
@@ -128,8 +122,8 @@
// for a sync source it can use to reach minvalid and get back into SECONDARY state. Node 0
// is the only node it can reach, but since node 0 doesn't contain node 2's minvalid oplog entry
// node 2 will refuse to use it as a sync source.
- checkLog(nodes[2],
- "remote oplog does not contain entry with optime matching our required optime");
+ checkLog.contains(
+ nodes[2], "remote oplog does not contain entry with optime matching our required optime");
var node0RBID = nodes[0].adminCommand('replSetGetRBID').rbid;
var node1RBID = nodes[1].adminCommand('replSetGetRBID').rbid;
diff --git a/jstests/replsets/read_after_optime.js b/jstests/replsets/read_after_optime.js
index 30cf7782679..7c5217660f6 100644
--- a/jstests/replsets/read_after_optime.js
+++ b/jstests/replsets/read_after_optime.js
@@ -2,6 +2,7 @@
(function() {
"use strict";
+ load("jstests/libs/check_log.js");
var replTest = new ReplSetTest({nodes: 2});
replTest.startSet();
@@ -33,30 +34,6 @@
assert.gt(timeoutResult.waitedMS, 500);
};
- var countLogMessages = function(msg) {
- var total = 0;
- var logMessages = assert.commandWorked(testDB.adminCommand({getLog: 'global'})).log;
- for (var i = 0; i < logMessages.length; i++) {
- if (logMessages[i].indexOf(msg) != -1) {
- total++;
- }
- }
- return total;
- };
-
- var checkLog = function(msg, expectedCount) {
- var count;
- assert.soon(
- function() {
- count = countLogMessages(msg);
- return expectedCount == count;
- },
- 'Expected ' + expectedCount + ', but instead saw ' + count +
- ' log entries containing the following message: ' + msg,
- 60000,
- 300);
- };
-
// Run the time out test 3 times with replication debug log level increased to 2
// for first and last run. The time out message should be logged twice.
testDB.setLogLevel(2, 'command');
@@ -65,7 +42,7 @@
var msg = 'Command on database ' + testDB.getName() +
' timed out waiting for read concern to be satisfied. Command:';
- checkLog(msg, 1);
+ checkLog.containsWithCount(testDB.getMongo(), msg, 1);
// Read concern timed out message should not be logged.
runTimeoutTest();
@@ -74,7 +51,7 @@
runTimeoutTest();
testDB.setLogLevel(0, 'command');
- checkLog(msg, 2);
+ checkLog.containsWithCount(testDB.getMongo(), msg, 2);
// Test read on future afterOpTime that will eventually occur.
var insertFunc = startParallelShell(
diff --git a/jstests/replsets/read_committed_after_rollback.js b/jstests/replsets/read_committed_after_rollback.js
index 68ba334ac9c..4101d9a242c 100644
--- a/jstests/replsets/read_committed_after_rollback.js
+++ b/jstests/replsets/read_committed_after_rollback.js
@@ -149,8 +149,10 @@ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
// now be visible as a committed read to both oldPrimary and newPrimary.
assert.commandWorked(
pureSecondary.adminCommand({configureFailPoint: "rsSyncApplyStop", mode: "off"}));
- assert.commandWorked(
- newPrimaryColl.runCommand({getLastError: 1, w: 'majority', wtimeout: 30000}));
+ // Do a write to the new primary so that the old primary can establish a sync source to learn
+ // about the new commit.
+ assert.writeOK(newPrimary.getDB(name).unrelatedCollection.insert(
+ {a: 1}, {writeConcern: {w: 'majority', wtimeout: replTest.kDefaultTimeoutMS}}));
assert.eq(doCommittedRead(newPrimaryColl), 'new');
assert.eq(doCommittedRead(oldPrimaryColl), 'new');
}());
diff --git a/jstests/replsets/read_committed_stale_history.js b/jstests/replsets/read_committed_stale_history.js
new file mode 100644
index 00000000000..99eac7ec5c4
--- /dev/null
+++ b/jstests/replsets/read_committed_stale_history.js
@@ -0,0 +1,159 @@
+/*
+ * Tests that a node on a stale branch of history won't incorrectly mark its ops as committed even
+ * when hearing about a commit point with a higher optime from a new primary.
+ */
+(function() {
+ 'use strict';
+
+ load("jstests/libs/check_log.js");
+ load("jstests/libs/write_concern_util.js");
+ load("jstests/replsets/rslib.js");
+
+ var name = "readCommittedStaleHistory";
+ var dbName = "wMajorityCheck";
+ var collName = "stepdown";
+
+ var rst = new ReplSetTest({
+ name: name,
+ nodes: [{}, {}, {rsConfig: {priority: 0}}, ],
+ nodeOptions: {enableMajorityReadConcern: ""},
+ useBridge: true
+ });
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTest.log("skipping test since storage engine doesn't support committed reads");
+ return;
+ }
+
+ var nodes = rst.nodes;
+ rst.initiate();
+
+ /**
+ * Waits for the given node to be in state primary *and* have finished drain mode and thus
+ * be available for writes.
+ */
+ function waitForPrimary(node) {
+ assert.soon(function() {
+ return node.adminCommand('ismaster').ismaster;
+ });
+ }
+
+ function stepUp(node) {
+ var primary = rst.getPrimary();
+ if (primary != node) {
+ assert.throws(function() {
+ primary.adminCommand({replSetStepDown: 60 * 5});
+ });
+ }
+ waitForPrimary(node);
+ }
+
+ // Asserts that the given document is not visible in the committed snapshot on the given node.
+ function checkDocNotCommitted(node, doc) {
+ var docs =
+ node.getDB(dbName).getCollection(collName).find(doc).readConcern('majority').toArray();
+ assert.eq(0, docs.length, tojson(docs));
+ }
+
+ jsTestLog("Make sure node 0 is primary.");
+ rst.getPrimary();
+ stepUp(nodes[0]);
+ var primary = rst.getPrimary();
+ var secondaries = rst.getSecondaries();
+ assert.eq(nodes[0], primary);
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ // Stop the secondaries from replicating.
+ stopServerReplication(secondaries);
+ // Stop the primary from being able to complete stepping down.
+ assert.commandWorked(
+ nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));
+
+ jsTestLog("Do a write that won't ever reach a majority of nodes");
+ assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert({a: 2}));
+
+ // Ensure that the write that was just done is not visible in the committed snapshot.
+ checkDocNotCommitted(nodes[0], {a: 2});
+
+ // Prevent the primary from rolling back later on.
+ assert.commandWorked(
+ nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'alwaysOn'}));
+
+ jsTest.log("Disconnect primary from all secondaries");
+ nodes[0].disconnect(nodes[1]);
+ nodes[0].disconnect(nodes[2]);
+
+ // Ensure the soon-to-be primary cannot see the write from the old primary.
+ assert.eq(null, nodes[1].getDB(dbName).getCollection(collName).findOne({a: 2}));
+
+ jsTest.log("Wait for a new primary to be elected");
+ // Allow the secondaries to replicate again.
+ restartServerReplication(secondaries);
+
+ waitForPrimary(nodes[1]);
+
+ jsTest.log("Do a write to the new primary");
+ assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
+ {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ // Ensure the new primary still cannot see the write from the old primary.
+ assert.eq(null, nodes[1].getDB(dbName).getCollection(collName).findOne({a: 2}));
+
+ // Ensure the stale primary still hasn't committed the write it did that never reached
+ // the other nodes.
+ checkDocNotCommitted(nodes[0], {a: 2});
+
+ jsTest.log("Reconnect the old primary to the rest of the nodes");
+ nodes[1].reconnect(nodes[0]);
+ nodes[2].reconnect(nodes[0]);
+
+ // Sleep 10 seconds to allow some heartbeats to be processed, so we can verify that the
+ // heartbeats don't cause the stale primary to incorrectly advance the commit point.
+ sleep(10000);
+
+ // Ensure the new primary still cannot see the write from the old primary.
+ assert.eq(null, nodes[1].getDB(dbName).getCollection(collName).findOne({a: 2}));
+
+ // Ensure the stale primary still hasn't committed the write it did that never reached
+ // the other nodes.
+ checkDocNotCommitted(nodes[0], {a: 2});
+
+ jsTest.log("Allow the old primary to finish stepping down and become secondary");
+ var res = null;
+ try {
+ res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'});
+ } catch (e) {
+ // Expected - once we disable the fail point the stepdown will proceed and it's racy whether
+ // the stepdown closes all connections before or after the configureFailPoint command
+ // returns
+ }
+ if (res) {
+ assert.commandWorked(res);
+ }
+ rst.waitForState(nodes[0], ReplSetTest.State.SECONDARY);
+
+ // At this point the former primary will attempt to go into rollback, but the
+ // 'rollbackHangBeforeStart' will prevent it from doing so.
+ checkDocNotCommitted(nodes[0], {a: 2});
+ checkLog.contains(nodes[0], 'rollback - rollbackHangBeforeStart fail point enabled');
+ checkDocNotCommitted(nodes[0], {a: 2});
+
+ jsTest.log("Allow the original primary to roll back its write and catch up to the new primary");
+ assert.commandWorked(
+ nodes[0].adminCommand({configureFailPoint: 'rollbackHangBeforeStart', mode: 'off'}));
+
+ assert.soonNoExcept(function() {
+ return null == nodes[0].getDB(dbName).getCollection(collName).findOne({a: 2});
+ }, "Original primary never rolled back its write");
+
+ rst.awaitReplication();
+
+ // Ensure that the old primary got the write that the new primary did and sees it as committed.
+ assert.neq(
+ null,
+ nodes[0].getDB(dbName).getCollection(collName).find({a: 3}).readConcern('majority').next());
+
+ rst.stopSet();
+}());
diff --git a/jstests/replsets/rslib.js b/jstests/replsets/rslib.js
index 48e934a88d8..267397b87a3 100644
--- a/jstests/replsets/rslib.js
+++ b/jstests/replsets/rslib.js
@@ -1,5 +1,9 @@
var wait, occasionally, reconnect, getLatestOp, waitForAllMembers, reconfig, awaitOpTime;
var waitUntilAllNodesCaughtUp;
+var waitForState;
+var awaitRSClientHosts;
+var getLastOpTime;
+var startSetIfSupportsReadMajority;
(function() {
"use strict";
@@ -192,4 +196,141 @@ var waitUntilAllNodesCaughtUp;
timeout);
};
+ /**
+ * Waits for the given node to reach the given state, ignoring network errors.
+ */
+ waitForState = function(node, state) {
+ assert.soonNoExcept(function() {
+ assert.commandWorked(node.adminCommand(
+ {replSetTest: 1, waitForMemberState: state, timeoutMillis: 60 * 1000 * 5}));
+ return true;
+ });
+ };
+
+ /**
+ * Starts each node in the given replica set if the storage engine supports readConcern
+ * 'majority'.
+ * Returns true if the replica set was started successfully and false otherwise.
+ *
+ * @param replSetTest - The instance of {@link ReplSetTest} to start
+ * @param options - The options passed to {@link ReplSetTest.startSet}
+ */
+ startSetIfSupportsReadMajority = function(replSetTest, options) {
+ try {
+ replSetTest.startSet(options);
+ } catch (e) {
+ var conn = MongoRunner.runMongod();
+ if (!conn.getDB("admin").serverStatus().storageEngine.supportsCommittedReads) {
+ MongoRunner.stopMongod(conn);
+ return false;
+ }
+ throw e;
+ }
+ return true;
+ };
+
+ /**
+ * Waits for the specified hosts to enter a certain state.
+ */
+ awaitRSClientHosts = function(conn, host, hostOk, rs, timeout) {
+ var hostCount = host.length;
+ if (hostCount) {
+ for (var i = 0; i < hostCount; i++) {
+ awaitRSClientHosts(conn, host[i], hostOk, rs);
+ }
+
+ return;
+ }
+
+ timeout = timeout || 5 * 60 * 1000;
+
+ if (hostOk == undefined)
+ hostOk = {
+ ok: true
+ };
+ if (host.host)
+ host = host.host;
+ if (rs)
+ rs = rs.name;
+
+ print("Awaiting " + host + " to be " + tojson(hostOk) + " for " + conn + " (rs: " + rs +
+ ")");
+
+ var tests = 0;
+
+ assert.soon(function() {
+ var rsClientHosts = conn.adminCommand('connPoolStats').replicaSets;
+ if (tests++ % 10 == 0) {
+ printjson(rsClientHosts);
+ }
+
+ for (var rsName in rsClientHosts) {
+ if (rs && rs != rsName)
+ continue;
+
+ for (var i = 0; i < rsClientHosts[rsName].hosts.length; i++) {
+ var clientHost = rsClientHosts[rsName].hosts[i];
+ if (clientHost.addr != host)
+ continue;
+
+ // Check that *all* host properties are set correctly
+ var propOk = true;
+ for (var prop in hostOk) {
+ // Use special comparator for tags because isMaster can return the fields in
+ // different order. The fields of the tags should be treated like a set of
+ // strings and 2 tags should be considered the same if the set is equal.
+ if (prop == 'tags') {
+ if (!clientHost.tags) {
+ propOk = false;
+ break;
+ }
+
+ for (var hostTag in hostOk.tags) {
+ if (clientHost.tags[hostTag] != hostOk.tags[hostTag]) {
+ propOk = false;
+ break;
+ }
+ }
+
+ for (var clientTag in clientHost.tags) {
+ if (clientHost.tags[clientTag] != hostOk.tags[clientTag]) {
+ propOk = false;
+ break;
+ }
+ }
+
+ continue;
+ }
+
+ if (isObject(hostOk[prop])) {
+ if (!friendlyEqual(hostOk[prop], clientHost[prop])) {
+ propOk = false;
+ break;
+ }
+ } else if (clientHost[prop] != hostOk[prop]) {
+ propOk = false;
+ break;
+ }
+ }
+
+ if (propOk) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }, 'timed out waiting for replica set client to recognize hosts', timeout);
+ };
+
+ /**
+ * Returns the last opTime of the connection based from replSetGetStatus. Can only
+ * be used on replica set nodes.
+ */
+ getLastOpTime = function(conn) {
+ var replSetStatus =
+ assert.commandWorked(conn.getDB("admin").runCommand({replSetGetStatus: 1}));
+ var connStatus = replSetStatus.members.filter(m => m.self)[0];
+ return connStatus.optime;
+ };
}());
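A hedged sketch of how the newly exported rslib.js helpers might be combined in a test; the two-node fixture and options below are illustrative.

    load("jstests/replsets/rslib.js");

    var rst = new ReplSetTest({nodes: 2, nodeOptions: {enableMajorityReadConcern: ""}});
    if (!startSetIfSupportsReadMajority(rst)) {
        jsTest.log("skipping: storage engine does not support committed reads");
    } else {
        rst.initiate();
        var primary = rst.getPrimary();
        // Waits via {replSetTest: 1, waitForMemberState: ...}, ignoring network errors.
        waitForState(primary, ReplSetTest.State.PRIMARY);
        // Last opTime of this node, as reported by replSetGetStatus.
        printjson(getLastOpTime(primary));
        rst.stopSet();
    }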
diff --git a/jstests/replsets/server8070.js b/jstests/replsets/server8070.js
index e91e95e99a4..f9d9a3feb16 100644
--- a/jstests/replsets/server8070.js
+++ b/jstests/replsets/server8070.js
@@ -100,8 +100,8 @@ assert(syncingTo !== getHostName() + ":" + replSet.ports[1], "node 3 is syncing
jsTest.log("Pause 3's bgsync thread");
var rsBgSyncProduceResult3 =
- member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'alwaysOn'});
-assert.eq(1, rsBgSyncProduceResult3.ok, "member 3 rsBgSyncProduce admin command failed");
+ member3.runCommand({configureFailPoint: 'stopReplProducer', mode: 'alwaysOn'});
+assert.eq(1, rsBgSyncProduceResult3.ok, "member 3 stopReplProducer admin command failed");
// count documents in member 3
assert.eq(26,
@@ -123,7 +123,7 @@ assert.soon(function() {
}, "Replication member 3 did not apply ops 25-75");
jsTest.log("Start 3's bgsync thread");
-member3.runCommand({configureFailPoint: 'rsBgSyncProduce', mode: 'off'});
+member3.runCommand({configureFailPoint: 'stopReplProducer', mode: 'off'});
jsTest.log("Node 3 shouldn't hit rollback");
var end = (new Date()).getTime() + 10000;
diff --git a/jstests/replsets/write_concern_after_stepdown.js b/jstests/replsets/write_concern_after_stepdown.js
new file mode 100644
index 00000000000..22f6f627fda
--- /dev/null
+++ b/jstests/replsets/write_concern_after_stepdown.js
@@ -0,0 +1,101 @@
+/*
+ * Tests that heartbeats containing writes from a different branch of history can't cause a stale
+ * primary to incorrectly acknowledge a w:majority write that's about to be rolled back.
+ */
+(function() {
+ 'use strict';
+
+ load("jstests/replsets/rslib.js");
+ load("jstests/libs/write_concern_util.js");
+
+ var name = "writeConcernStepDownAndBackUp";
+ var dbName = "wMajorityCheck";
+ var collName = "stepdownAndBackUp";
+
+ var rst = new ReplSetTest(
+ {name: name, nodes: [{}, {}, {rsConfig: {priority: 0}}, ], useBridge: true});
+ var nodes = rst.startSet();
+ rst.initiate();
+
+ function waitForPrimary(node) {
+ assert.soon(function() {
+ return node.adminCommand('ismaster').ismaster;
+ });
+ }
+
+ function stepUp(node) {
+ var primary = rst.getPrimary();
+ if (primary != node) {
+ assert.throws(function() {
+ primary.adminCommand({replSetStepDown: 60 * 5});
+ });
+ }
+ waitForPrimary(node);
+ }
+
+ jsTestLog("Make sure node 0 is primary.");
+ stepUp(nodes[0]);
+ var primary = rst.getPrimary();
+ var secondaries = rst.getSecondaries();
+ assert.eq(nodes[0], primary);
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 3, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ // Stop the secondaries from replicating.
+ stopServerReplication(secondaries);
+ // Stop the primary from being able to complete stepping down.
+ assert.commandWorked(
+ nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'alwaysOn'}));
+
+ jsTestLog("Do w:majority write that will block waiting for replication.");
+ var doMajorityWrite = function() {
+ var res = db.getSiblingDB('wMajorityCheck')
+ .stepdownAndBackUp.insert({a: 2}, {writeConcern: {w: 'majority'}});
+ assert.writeErrorWithCode(res, ErrorCodes.PrimarySteppedDown);
+ };
+
+ var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
+
+ jsTest.log("Disconnect primary from all secondaries");
+ nodes[0].disconnect(nodes[1]);
+ nodes[0].disconnect(nodes[2]);
+
+ jsTest.log("Wait for a new primary to be elected");
+ // Allow the secondaries to replicate again.
+ restartServerReplication(secondaries);
+
+ waitForPrimary(nodes[1]);
+
+ jsTest.log("Do a write to the new primary");
+ assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
+ {a: 3}, {writeConcern: {w: 2, wtimeout: rst.kDefaultTimeoutMS}}));
+
+ jsTest.log("Reconnect the old primary to the rest of the nodes");
+ // Only allow the old primary to connect to the other nodes, not the other way around.
+ // This is so that the old primary will detect that it needs to step down and step itself down,
+ // rather than one of the other nodes detecting this and sending it a replSetStepDown command,
+ // which would cause the old primary to kill all operations and close all connections, making
+ // the way that the insert in the parallel shell fails be nondeterministic. Rather than
+ // handling all possible failure modes in the parallel shell, allowing heartbeat connectivity in
+ // only one direction makes it easier for the test to fail deterministically.
+ nodes[1].acceptConnectionsFrom(nodes[0]);
+ nodes[2].acceptConnectionsFrom(nodes[0]);
+
+ joinMajorityWriter();
+
+ // Allow the old primary to finish stepping down so that shutdown can finish.
+ var res = null;
+ try {
+ res = nodes[0].adminCommand({configureFailPoint: 'blockHeartbeatStepdown', mode: 'off'});
+ } catch (e) {
+ // Expected - once we disable the fail point the stepdown will proceed and it's racy whether
+ // the stepdown closes all connections before or after the configureFailPoint command
+ // returns
+ }
+ if (res) {
+ assert.commandWorked(res);
+ }
+
+ rst.stopSet();
+}());
diff --git a/jstests/replsets/write_concern_after_stepdown_and_stepup.js b/jstests/replsets/write_concern_after_stepdown_and_stepup.js
new file mode 100644
index 00000000000..beb2bd16e0a
--- /dev/null
+++ b/jstests/replsets/write_concern_after_stepdown_and_stepup.js
@@ -0,0 +1,126 @@
+/*
+ * Tests that heartbeats containing writes from a different branch of history can't cause a stale
+ * primary to incorrectly acknowledge a w:majority write that's about to be rolled back, even if the
+ * stale primary is re-elected primary before waiting for the write concern acknowledgement.
+ */
+(function() {
+ 'use strict';
+
+ var name = "writeConcernStepDownAndBackUp";
+ var dbName = "wMajorityCheck";
+ var collName = "stepdownAndBackUp";
+
+ var rst = new ReplSetTest(
+ {name: name, nodes: [{}, {}, {rsConfig: {priority: 0}}, ], useBridge: true});
+ var nodes = rst.startSet();
+ rst.initiate();
+
+ var timeout = 5 * 60 * 1000;
+
+ function waitForState(node, state) {
+ assert.soonNoExcept(function() {
+ assert.commandWorked(node.adminCommand(
+ {replSetTest: 1, waitForMemberState: state, timeoutMillis: timeout}));
+ return true;
+ });
+ }
+
+ function waitForPrimary(node) {
+ assert.soon(function() {
+ return node.adminCommand('ismaster').ismaster;
+ });
+ }
+
+ function stepUp(node) {
+ var primary = rst.getPrimary();
+ if (primary != node) {
+ assert.throws(function() {
+ primary.adminCommand({replSetStepDown: 60 * 5});
+ });
+ }
+ waitForPrimary(node);
+ }
+
+ jsTestLog("Make sure node 0 is primary.");
+ stepUp(nodes[0]);
+ var primary = rst.getPrimary();
+ var secondaries = rst.getSecondaries();
+ assert.eq(nodes[0], primary);
+ // Wait for all data bearing nodes to get up to date.
+ assert.writeOK(nodes[0].getDB(dbName).getCollection(collName).insert(
+ {a: 1}, {writeConcern: {w: 3, wtimeout: timeout}}));
+
+ // Stop the secondaries from replicating.
+ secondaries.forEach(function(node) {
+ assert.commandWorked(
+ node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'alwaysOn'}));
+ });
+ // Stop the primary from calling into awaitReplication()
+ assert.commandWorked(nodes[0].adminCommand(
+ {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'alwaysOn'}));
+
+ jsTestLog("Do w:majority write that won't enter awaitReplication() until after the primary " +
+ "has stepped down and back up");
+ var doMajorityWrite = function() {
+ assert.commandWorked(db.adminCommand({ismaster: 1}));
+
+ assert.throws(function() {
+ db.getSiblingDB('wMajorityCheck')
+ .stepdownAndBackUp.insert({a: 2}, {writeConcern: {w: 'majority'}});
+ });
+ };
+
+ var joinMajorityWriter = startParallelShell(doMajorityWrite, nodes[0].port);
+
+ jsTest.log("Disconnect primary from all secondaries");
+ nodes[0].disconnect(nodes[1]);
+ nodes[0].disconnect(nodes[2]);
+
+ jsTest.log("Wait for a new primary to be elected");
+ // Allow the secondaries to replicate again.
+ secondaries.forEach(function(node) {
+ assert.commandWorked(
+ node.adminCommand({configureFailPoint: 'rsSyncApplyStop', mode: 'off'}));
+ });
+
+ waitForPrimary(nodes[1]);
+
+ jsTest.log("Do a write to the new primary");
+ assert.writeOK(nodes[1].getDB(dbName).getCollection(collName).insert(
+ {a: 3}, {writeConcern: {w: 2, wtimeout: timeout}}));
+
+ jsTest.log("Reconnect the old primary to the rest of the nodes");
+ nodes[0].reconnect(nodes[1]);
+ nodes[0].reconnect(nodes[2]);
+
+ jsTest.log("Wait for the old primary to step down, roll back its write, and apply the " +
+ "new writes from the new primary");
+ waitForState(nodes[0], ReplSetTest.State.SECONDARY);
+ rst.awaitReplication();
+
+ // At this point all 3 nodes should have the same data
+ assert.soonNoExcept(function() {
+ nodes.forEach(function(node) {
+ assert.eq(null,
+ node.getDB(dbName).getCollection(collName).findOne({a: 2}),
+ "Node " + node.host + " contained op that should have been rolled back");
+ assert.neq(null,
+ node.getDB(dbName).getCollection(collName).findOne({a: 3}),
+ "Node " + node.host +
+ " was missing op from branch of history that should have persisted");
+ });
+ return true;
+ });
+
+ jsTest.log("Make the original primary become primary once again");
+ stepUp(nodes[0]);
+
+ jsTest.log("Unblock the thread waiting for replication of the now rolled-back write, ensure " +
+ "that the write concern failed");
+ assert.commandWorked(nodes[0].adminCommand(
+ {configureFailPoint: 'hangBeforeWaitingForWriteConcern', mode: 'off'}));
+
+ joinMajorityWriter();
+
+ rst.stopSet();
+}());
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 37ba22bf1a7..75a91db536d 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -117,7 +117,7 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat
} // namespace
-MONGO_FP_DECLARE(rsBgSyncProduce);
+MONGO_FP_DECLARE(stopReplProducer);
BackgroundSync* BackgroundSync::s_instance = 0;
stdx::mutex BackgroundSync::s_mutex;
@@ -133,6 +133,9 @@ static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &op
static Counter64 networkByteStats;
static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats);
+// Failpoint which causes rollback to hang before starting.
+MONGO_FP_DECLARE(rollbackHangBeforeStart);
+
// The count of items in the buffer
static Counter64 bufferCountGauge;
static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
@@ -262,6 +265,21 @@ void BackgroundSync::_producerThread() {
}
void BackgroundSync::_produce(OperationContext* txn) {
+ if (MONGO_FAIL_POINT(stopReplProducer)) {
+ // This log output is used in js tests so please leave it.
+ log() << "bgsync - stopReplProducer fail point "
+ "enabled. Blocking until fail point is disabled.";
+
+ // TODO(SERVER-27120): Remove the return statement and uncomment the while loop.
+ // Currently we cannot block here or we prevent primaries from being fully elected since
+ // we'll never call _signalNoNewDataForApplier.
+ // while (MONGO_FAIL_POINT(stopReplProducer) && !inShutdown()) {
+ // mongo::sleepsecs(1);
+ // }
+ mongo::sleepsecs(1);
+ return;
+ }
+
// this oplog reader does not do a handshake because we don't want the server it's syncing
// from to track how far it has synced
{
@@ -280,10 +298,6 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
- }
-
// find a target to sync from the last optime fetched
OpTime lastOpTimeFetched;
@@ -478,26 +492,6 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
bool syncSourceHasSyncSource = false;
OpTime sourcesLastOp;
- // Forward metadata (containing liveness information) to replication coordinator.
- bool receivedMetadata =
- queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
- if (receivedMetadata) {
- auto metadataResult =
- rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
- if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << source << ": "
- << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
- return;
- }
- const auto& metadata = metadataResult.getValue();
- _replCoord->processReplSetMetadata(metadata);
- if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
- _replCoord->cancelAndRescheduleElectionTimeout();
- }
- syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
- sourcesLastOp = metadata.getLastOpVisible();
- }
-
const auto& documents = queryResponse.documents;
auto firstDocToApply = documents.cbegin();
auto lastDocToApply = documents.cend();
@@ -576,6 +570,32 @@ void BackgroundSync::_fetcherCallback(const StatusWith<Fetcher::QueryResponse>&
return;
}
+ if (MONGO_FAIL_POINT(stopReplProducer)) {
+ return;
+ }
+
+ // Process replset metadata. It is important that this happen after we've validated the
+ // first batch, so we don't progress our knowledge of the commit point from a
+ // response that triggers a rollback.
+ bool receivedMetadata =
+ queryResponse.otherFields.metadata.hasElement(rpc::kReplSetMetadataFieldName);
+ if (receivedMetadata) {
+ auto metadataResult =
+ rpc::ReplSetMetadata::readFromMetadata(queryResponse.otherFields.metadata);
+ if (!metadataResult.isOK()) {
+ error() << "invalid replication metadata from sync source " << source << ": "
+ << metadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
+ return;
+ }
+ const auto& metadata = metadataResult.getValue();
+ _replCoord->processReplSetMetadata(metadata, true /*advance commit point*/);
+ if (metadata.getPrimaryIndex() != rpc::ReplSetMetadata::kNoPrimary) {
+ _replCoord->cancelAndRescheduleElectionTimeout();
+ }
+ syncSourceHasSyncSource = metadata.getSyncSourceIndex() != -1;
+ sourcesLastOp = metadata.getLastOpVisible();
+ }
+
// The count of the bytes of the documents read off the network.
int networkDocumentBytes = 0;
Timestamp lastTS;
@@ -741,6 +761,15 @@ void BackgroundSync::_rollback(OperationContext* txn,
const HostAndPort& source,
boost::optional<int> requiredRBID,
stdx::function<DBClientBase*()> getConnection) {
+ if (MONGO_FAIL_POINT(rollbackHangBeforeStart)) {
+ // This log output is used in js tests so please leave it.
+ log() << "rollback - rollbackHangBeforeStart fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(rollbackHangBeforeStart) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+ }
+
// Set state to ROLLBACK while we are in this function. This prevents serving reads, even from
// the oplog. This can fail if we are elected PRIMARY, in which case we better not do any
// rolling back. If we successfully enter ROLLBACK we will only exit this function fatally or
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 18ccf45a224..57a2c426817 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -466,14 +466,13 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result) = 0;
/**
- * Processes the ReplSetMetadata returned from a command run against another replica set
- * member and updates protocol version 1 information (most recent optime that is committed,
- * member id of the current PRIMARY, the current config version and the current term).
- *
- * TODO(dannenberg): Move this method to be testing only if it does not end up being used
- * to process the find and getmore metadata responses from the DataReplicator.
+ * Processes the ReplSetMetadata returned from a command run against another
+ * replica set member and so long as the config version in the metadata matches the replica set
+ * config version this node currently has, updates the current term and optionally updates
+ * this node's notion of the commit point.
*/
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) = 0;
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) = 0;
/**
* Elections under protocol version 1 are triggered by a timer.
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 7c30cd983dd..67102a30033 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1445,10 +1445,38 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
return StatusAndDuration(Status::OK(), Milliseconds(timer->millis()));
}
- if (replMode == modeReplSet && !_memberState.primary()) {
- return StatusAndDuration(
- Status(ErrorCodes::NotMaster, "Not master while waiting for replication"),
- Milliseconds(timer->millis()));
+ auto checkForStepDown = [&]() -> Status {
+ if (replMode == modeReplSet && !_memberState.primary()) {
+ return {ErrorCodes::NotMaster, "Primary stepped down while waiting for replication"};
+ }
+
+ // Relax term checking under 3.2 because some commands (eg. createIndexes) might not return
+ // a term in the response metadata to mongos which may pass the no-term OpTime back to
+ // mongod eventually.
+ if (opTime.getTerm() != OpTime::kUninitializedTerm &&
+ _cachedTerm != OpTime::kUninitializedTerm && opTime.getTerm() != _cachedTerm) {
+ return {
+ ErrorCodes::NotMaster,
+ str::stream() << "Term changed from " << opTime.getTerm() << " to " << _cachedTerm
+ << " while waiting for replication, indicating that this node must "
+ "have stepped down."};
+ }
+
+ if (_stepDownPending) {
+ return {ErrorCodes::NotMaster,
+ "Received stepdown request while waiting for replication"};
+ }
+ return Status::OK();
+ };
+
+ Status stepdownStatus = checkForStepDown();
+ if (!stepdownStatus.isOK()) {
+ return StatusAndDuration(stepdownStatus, Milliseconds(timer->millis()));
+ }
+
+ auto interruptStatus = txn->checkForInterruptNoAssert();
+ if (!interruptStatus.isOK()) {
+ return StatusAndDuration(interruptStatus, Milliseconds(timer->millis()));
}
if (writeConcern.wMode.empty()) {
@@ -1470,6 +1498,7 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
return StatusAndDuration(interruptedStatus, elapsed);
}
+
if (!waitInfo.master) {
return StatusAndDuration(Status(ErrorCodes::NotMaster,
"Not master anymore while waiting for replication"
@@ -1506,6 +1535,11 @@ ReplicationCoordinator::StatusAndDuration ReplicationCoordinatorImpl::_awaitRepl
} else {
condVar.wait_for(*lock, waitTime);
}
+
+ stepdownStatus = checkForStepDown();
+ if (!stepdownStatus.isOK()) {
+ return StatusAndDuration(stepdownStatus, elapsed);
+ }
}
Status status = _checkIfWriteConcernCanBeSatisfied_inlock(writeConcern);
@@ -2001,10 +2035,11 @@ void ReplicationCoordinatorImpl::processReplSetGetConfig(BSONObjBuilder* result)
result->append("config", _rsConfig.toBSON());
}
-void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {
+void ReplicationCoordinatorImpl::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) {
EventHandle evh;
- _scheduleWorkAndWaitForCompletion([this, &evh, &replMetadata](const CallbackArgs& args) {
- evh = _processReplSetMetadata_incallback(replMetadata);
+ _scheduleWorkAndWaitForCompletion([&](const CallbackArgs& args) {
+ evh = _processReplSetMetadata_incallback(replMetadata, advanceCommitPoint);
});
if (evh.isValid()) {
_replExecutor.waitForEvent(evh);
@@ -2017,11 +2052,13 @@ void ReplicationCoordinatorImpl::cancelAndRescheduleElectionTimeout() {
}
EventHandle ReplicationCoordinatorImpl::_processReplSetMetadata_incallback(
- const rpc::ReplSetMetadata& replMetadata) {
+ const rpc::ReplSetMetadata& replMetadata, bool advanceCommitPoint) {
if (replMetadata.getConfigVersion() != _rsConfig.getConfigVersion()) {
return EventHandle();
}
- _setLastCommittedOpTime(replMetadata.getLastOpCommitted());
+ if (advanceCommitPoint) {
+ _setLastCommittedOpTime(replMetadata.getLastOpCommitted());
+ }
return _updateTerm_incallback(replMetadata.getTerm());
}
@@ -2527,6 +2564,7 @@ ReplicationCoordinatorImpl::_updateMemberStateFromTopologyCoordinator_inlock() {
info->condVar->notify_all();
}
_canAcceptNonLocalWrites = false;
+ _stepDownPending = false;
result = kActionCloseAllConnections;
} else {
result = kActionFollowerModeStateChange;
@@ -3103,7 +3141,7 @@ bool ReplicationCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& curre
}
void ReplicationCoordinatorImpl::_updateLastCommittedOpTime_inlock() {
- if (!_getMemberState_inlock().primary()) {
+ if (!_getMemberState_inlock().primary() || _stepDownPending) {
return;
}
@@ -3509,7 +3547,7 @@ EventHandle ReplicationCoordinatorImpl::_updateTerm_incallback(
if (localUpdateTermResult == TopologyCoordinator::UpdateTermResult::kTriggerStepDown) {
log() << "stepping down from primary, because a new term has begun: " << term;
_topCoord->prepareForStepDown();
- return _stepDownStart();
+ return _stepDownStart(false);
}
return EventHandle();
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 3c6bbaefdbb..e2b2bea1298 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -217,7 +217,8 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result) override;
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) override;
+ virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) override;
virtual void cancelAndRescheduleElectionTimeout() override;
@@ -1024,7 +1025,10 @@ private:
*/
void _requestRemotePrimaryStepdown(const HostAndPort& target);
- ReplicationExecutor::EventHandle _stepDownStart();
+ /**
+ * Schedules stepdown to run with the global exclusive lock.
+ */
+ ReplicationExecutor::EventHandle _stepDownStart(bool hasMutex);
/**
* Completes a step-down of the current node. Must be run with a global
@@ -1063,9 +1067,11 @@ private:
* Utility method that schedules or performs actions specified by a HeartbeatResponseAction
* returned by a TopologyCoordinator::processHeartbeatResponse(V1) call with the given
* value of "responseStatus".
+ * 'hasMutex' is true if the caller is holding _mutex. TODO(SERVER-27083): Remove this.
*/
void _handleHeartbeatResponseAction(const HeartbeatResponseAction& action,
- const StatusWith<ReplSetHeartbeatResponse>& responseStatus);
+ const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
+ bool hasMutex);
/**
* Bottom half of processHeartbeat(), which runs in the replication executor.
@@ -1115,11 +1121,13 @@ private:
/**
* Callback that processes the ReplSetMetadata returned from a command run against another
- * replica set member and updates protocol version 1 information (most recent optime that is
- * committed, member id of the current PRIMARY, the current config version and the current term)
+ * replica set member and so long as the config version in the metadata matches the replica set
+ * config version this node currently has, updates the current term and optionally updates
+ * this node's notion of the commit point.
* Returns the finish event which is invalid if the process has already finished.
*/
- EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata);
+ EventHandle _processReplSetMetadata_incallback(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint);
/**
* Blesses a snapshot to be used for new committed reads.
@@ -1281,6 +1289,14 @@ private:
// TODO: ideally this should only change on rollbacks NOT on mongod restarts also.
int _rbid; // (M)
+ // Indicates that we've received a request to stepdown from PRIMARY (likely via a heartbeat)
+ // TODO(SERVER-27083): This bool is redundant of the same-named bool in TopologyCoordinatorImpl,
+ // but due to mutex ordering between _mutex and _topoMutex we can't inspect the
+ // TopologyCoordinator field in awaitReplication() where this bool is used. Once we get rid
+ // of topoMutex and start guarding access to the TopologyCoordinator via _mutex we should
+ // consolidate the two bools.
+ bool _stepDownPending = false; // (M)
+
// list of information about clients waiting on replication. Does *not* own the WaiterInfos.
std::vector<WaiterInfo*> _replicationWaiterList; // (M)
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
index 4c23bab4f5b..f89f1592170 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat.cpp
@@ -64,6 +64,8 @@ typedef ReplicationExecutor::CallbackHandle CBHandle;
using executor::RemoteCommandRequest;
+MONGO_FP_DECLARE(blockHeartbeatStepdown);
+
void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
const HostAndPort& target,
int targetIndex) {
@@ -152,7 +154,9 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
if (replMetadata.isOK()) {
// Asynchronous stepdown could happen, but it will be queued in executor after
// this function, so we cannot and don't need to wait for it to finish.
- _processReplSetMetadata_incallback(replMetadata.getValue());
+ // Arbiters are the only nodes allowed to advance their commit point via heartbeats.
+ bool advanceCommitPoint = getMemberState().arbiter();
+ _processReplSetMetadata_incallback(replMetadata.getValue(), advanceCommitPoint);
}
}
const Date_t now = _replExecutor.now();
@@ -164,10 +168,11 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
networkTime = cbData.response.getValue().elapsedMillis;
// TODO(sz) Because the term is duplicated in ReplSetMetaData, we can get rid of this
// and update tests.
- _updateTerm_incallback(hbStatusResponse.getValue().getTerm());
- // Postpone election timeout if we have a successful heartbeat response from the primary.
const auto& hbResponse = hbStatusResponse.getValue();
- if (hbResponse.hasState() && hbResponse.getState().primary()) {
+ _updateTerm_incallback(hbResponse.getTerm());
+ // Postpone election timeout if we have a successful heartbeat response from the primary.
+ if (hbResponse.hasState() && hbResponse.getState().primary() &&
+ hbResponse.getTerm() == _topCoord->getTerm()) {
cancelAndRescheduleElectionTimeout();
}
} else {
@@ -206,7 +211,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
_scheduleHeartbeatToTarget(
target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
- _handleHeartbeatResponseAction(action, hbStatusResponse);
+ _handleHeartbeatResponseAction(action, hbStatusResponse, false /*we're not holding _mutex*/);
}
void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIndex,
@@ -226,11 +231,13 @@ void ReplicationCoordinatorImpl::_updateOpTimesFromHeartbeat_inlock(int targetIn
void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
const HeartbeatResponseAction& action,
- const StatusWith<ReplSetHeartbeatResponse>& responseStatus) {
+ const StatusWith<ReplSetHeartbeatResponse>& responseStatus,
+ bool hasMutex) {
switch (action.getAction()) {
case HeartbeatResponseAction::NoAction:
// Update the cached member state if different than the current topology member state
if (_memberState != _topCoord->getMemberState()) {
+ invariant(!hasMutex);
stdx::unique_lock<stdx::mutex> lk(_mutex);
const PostMemberStateUpdateAction postUpdateAction =
_updateMemberStateFromTopologyCoordinator_inlock();
@@ -250,7 +257,7 @@ void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
log() << "Stepping down from primary in response to heartbeat";
_topCoord->prepareForStepDown();
// Don't need to wait for stepdown to finish.
- _stepDownStart();
+ _stepDownStart(hasMutex);
break;
case HeartbeatResponseAction::StepDownRemotePrimary: {
invariant(action.getPrimaryConfigIndex() != _selfIndex);
@@ -304,11 +311,19 @@ void ReplicationCoordinatorImpl::_requestRemotePrimaryStepdown(const HostAndPort
}
}
-ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart() {
+ReplicationExecutor::EventHandle ReplicationCoordinatorImpl::_stepDownStart(bool hasMutex) {
+ {
+ boost::optional<stdx::lock_guard<stdx::mutex>> lk;
+ if (!hasMutex) {
+ lk.emplace(_mutex);
+ }
+ _stepDownPending = true;
+ }
auto finishEvent = _makeEvent();
if (!finishEvent) {
return finishEvent;
}
+
_replExecutor.scheduleWorkWithGlobalExclusiveLock(stdx::bind(
&ReplicationCoordinatorImpl::_stepDownFinish, this, stdx::placeholders::_1, finishEvent));
return finishEvent;
@@ -321,6 +336,20 @@ void ReplicationCoordinatorImpl::_stepDownFinish(
return;
}
invariant(cbData.txn);
+
+ if (MONGO_FAIL_POINT(blockHeartbeatStepdown)) {
+ // Must reschedule rather than block so we don't take up threads in the replication
+ // executor.
+ sleepmillis(10);
+ _replExecutor.scheduleWorkWithGlobalExclusiveLock(
+ stdx::bind(&ReplicationCoordinatorImpl::_stepDownFinish,
+ this,
+ stdx::placeholders::_1,
+ finishedEvent));
+
+ return;
+ }
+
// TODO Add invariant that we've got global shared or global exclusive lock, when supported
// by lock manager.
stdx::unique_lock<stdx::mutex> lk(_mutex);
@@ -623,7 +652,9 @@ void ReplicationCoordinatorImpl::_handleLivenessTimeout(
_topCoord->setMemberAsDown(now, memberIndex, _getMyLastDurableOpTime_inlock());
// Don't mind potential asynchronous stepdown as this is the last step of
// liveness check.
- _handleHeartbeatResponseAction(action, makeStatusWith<ReplSetHeartbeatResponse>());
+ _handleHeartbeatResponseAction(action,
+ makeStatusWith<ReplSetHeartbeatResponse>(),
+ true /*we're holding _mutex*/);
}
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
index 2233e21cc21..40a30c48e5a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_heartbeat_v1_test.cpp
@@ -351,7 +351,7 @@ TEST_F(ReplCoordHBV1Test, ArbiterRecordsCommittedOpTimeFromHeartbeatMetadata) {
<< 1 << "primaryIndex" << 1 << "term"
<< committedOpTime.getTerm() << "syncSourceIndex" << 1)));
ASSERT_OK(metadata.getStatus());
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQ(getReplCoord()->getMyLastAppliedOpTime().getTimestamp(), expected.getTimestamp());
};
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 9dfd6f11045..2a52f004700 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/read_concern_response.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_heartbeat_args.h"
#include "mongo/db/repl/repl_set_heartbeat_args_v1.h"
#include "mongo/db/repl/repl_set_request_votes_args.h"
@@ -81,15 +82,15 @@ using executor::RemoteCommandResponse;
typedef ReplicationCoordinator::ReplSetReconfigArgs ReplSetReconfigArgs;
Status kInterruptedStatus(ErrorCodes::Interrupted, "operation was interrupted");
-// Helper class to wrap Timestamp as an OpTime with term 0.
-struct OpTimeWithTermZero {
- OpTimeWithTermZero(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
+// Helper class to wrap Timestamp as an OpTime with term 1.
+struct OpTimeWithTermOne {
+ OpTimeWithTermOne(unsigned int sec, unsigned int i) : timestamp(sec, i) {}
operator OpTime() const {
- return OpTime(timestamp, 0);
+ return OpTime(timestamp, 1);
}
operator boost::optional<OpTime>() const {
- return OpTime(timestamp, 0);
+ return OpTime(timestamp, 1);
}
OpTime asOpTime() const {
@@ -674,7 +675,7 @@ TEST_F(ReplCoordTest, RollBackIDShouldIncreaseByOneWhenIncrementRollbackIDIsCall
TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAStandaloneNode) {
init("");
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -692,7 +693,7 @@ TEST_F(ReplCoordTest, NodeReturnsImmediatelyWhenAwaitReplicationIsRanAgainstAMas
settings.setMaster(true);
init(settings);
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -719,7 +720,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -732,7 +733,7 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenRunningAwaitReplicationAgainstASec
ASSERT_EQUALS(ErrorCodes::NotMaster, statusAndDur.status);
}
-TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWZero) {
+TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWithWTermOne) {
assertStartSuccess(BSON("_id"
<< "mySet"
<< "version" << 2 << "members"
@@ -747,7 +748,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time(100, 1);
+ OpTimeWithTermOne time(100, 1);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -756,8 +757,8 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenRunningAwaitReplicationAgainstPrimaryWith
// Become primary.
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ASSERT(getReplCoord()->getMemberState().primary());
@@ -787,12 +788,12 @@ TEST_F(ReplCoordTest,
<< "_id" << 3))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -860,12 +861,12 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedUntilASufficientNumberOfNodes
<< "_id" << 3))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoWaiting;
@@ -1056,6 +1057,8 @@ TEST_F(
// another name if we didn't get a high enough one.
}
+ auto zeroOpTimeInCurrentTerm = OpTime(Timestamp(0, 0), 1);
+ ReplClientInfo::forClient(txn.getClient()).setLastOp(zeroOpTimeInCurrentTerm);
statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, majorityWriteConcern);
ASSERT_EQUALS(ErrorCodes::WriteConcernFailed, statusAndDur.status);
statusAndDur = getReplCoord()->awaitReplicationOfLastOpForClient(&txn, multiDCWriteConcern);
@@ -1158,14 +1161,14 @@ TEST_F(ReplCoordTest, NodeReturnsOkWhenAWriteConcernWithNoTimeoutHasBeenSatisfie
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1217,14 +1220,14 @@ TEST_F(ReplCoordTest, NodeReturnsWriteConcernFailedWhenAWriteConcernTimesOutBefo
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = 50;
@@ -1258,14 +1261,14 @@ TEST_F(ReplCoordTest,
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1300,14 +1303,14 @@ TEST_F(ReplCoordTest, NodeReturnsNotMasterWhenSteppingDownBeforeSatisfyingAWrite
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1340,14 +1343,14 @@ TEST_F(ReplCoordTest,
<< "node3"))),
HostAndPort("node1"));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
ReplicationAwaiter awaiter(getReplCoord(), &txn);
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
WriteConcernOptions writeConcern;
writeConcern.wTimeout = WriteConcernOptions::kNoTimeout;
@@ -1523,7 +1526,7 @@ TEST_F(ReplCoordTest, ConcurrentStepDownShouldNotSignalTheSameFinishEventMoreTha
TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
@@ -1538,7 +1541,7 @@ TEST_F(StepDownTest, NodeReturnsNotMasterWhenAskedToStepDownAsANonPrimaryNode) {
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenStepDownFailsToObtainTheGlobalLockWithinTheAllottedTime) {
OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
+ OpTimeWithTermOne optime1(100, 1);
// All nodes are caught up
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
@@ -1759,8 +1762,8 @@ TEST_F(ReplCoordTest, NodeBecomesPrimaryAgainWhenStepDownTimeoutExpiresInASingle
TEST_F(StepDownTest,
NodeReturnsExceededTimeLimitWhenNoSecondaryIsCaughtUpWithinStepDownsSecondaryCatchUpPeriod) {
OperationContextReplMock txn;
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
+ OpTimeWithTermOne optime1(100, 1);
+ OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
repl->setMyLastAppliedOpTime(optime2);
@@ -1941,8 +1944,8 @@ TEST_F(StepDownTest,
TEST_F(StepDownTest, NodeReturnsInterruptedWhenInterruptedDuringStepDown) {
const unsigned int opID = 100;
OperationContextReplMock txn{opID};
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
+ OpTimeWithTermOne optime1(100, 1);
+ OpTimeWithTermOne optime2(100, 2);
// No secondary is caught up
auto repl = getReplCoord();
repl->setMyLastAppliedOpTime(optime2);
@@ -2092,9 +2095,9 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
<< "test2:1234") << BSON("_id" << 2 << "host"
<< "test3:1234"))),
HostAndPort("test1", 1234));
- OpTimeWithTermZero optime1(100, 1);
- OpTimeWithTermZero optime2(100, 2);
- OpTimeWithTermZero optime3(2, 1);
+ OpTimeWithTermOne optime1(100, 1);
+ OpTimeWithTermOne optime2(100, 2);
+ OpTimeWithTermOne optime3(2, 1);
getReplCoord()->setMyLastAppliedOpTime(optime1);
getReplCoord()->setMyLastDurableOpTime(optime1);
ASSERT_OK(getReplCoord()->setLastAppliedOptime_forTest(1, 1, optime2));
@@ -2124,7 +2127,7 @@ TEST_F(ReplCoordTest, NodeIncludesOtherMembersProgressInOldUpdatePositionCommand
ASSERT_EQUALS(2, memberId);
ASSERT_EQUALS(optime3.timestamp, entry["optime"]["ts"].timestamp());
}
- ASSERT_EQUALS(0, entry["optime"]["t"].Number());
+ ASSERT_EQUALS(1, entry["optime"]["t"].Number());
}
ASSERT_EQUALS(3U, memberIds.size()); // Make sure we saw all 3 nodes
}
@@ -2144,8 +2147,8 @@ TEST_F(ReplCoordTest,
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can't unset maintenance mode if it was never set to begin with.
Status status = getReplCoord()->setMaintenanceMode(false);
@@ -2168,8 +2171,8 @@ TEST_F(ReplCoordTest,
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// valid set
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_TRUE(getReplCoord()->getMemberState().recovering());
@@ -2197,8 +2200,8 @@ TEST_F(ReplCoordTest, AllowAsManyUnsetMaintenanceModesAsThereHaveBeenSetMaintena
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can set multiple times
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
ASSERT_OK(getReplCoord()->setMaintenanceMode(true));
@@ -2228,8 +2231,8 @@ TEST_F(ReplCoordTest, SettingAndUnsettingMaintenanceModeShouldNotAffectRollbackS
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// From rollback, entering and exiting maintenance mode doesn't change perceived
// state.
@@ -2267,8 +2270,8 @@ TEST_F(ReplCoordTest, DoNotAllowMaintenanceModeWhilePrimary) {
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// Can't modify maintenance mode when PRIMARY
simulateSuccessfulV1Election();
@@ -2300,8 +2303,8 @@ TEST_F(ReplCoordTest, DoNotAllowSettingMaintenanceModeWhileConductingAnElection)
HostAndPort("test2", 1234));
OperationContextNoop txn;
getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY);
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
// TODO this election shouldn't have to happen.
simulateSuccessfulV1Election();
@@ -2360,8 +2363,8 @@ TEST_F(ReplCoordTest,
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2404,8 +2407,8 @@ TEST_F(ReplCoordTest,
HostAndPort("node1", 12345));
OperationContextNoop txn;
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
getReplCoord()->setMyLastAppliedOpTime(time2);
getReplCoord()->setMyLastDurableOpTime(time2);
@@ -2434,8 +2437,8 @@ TEST_F(ReplCoordTest, NodeDoesNotIncludeItselfWhenRunningGetHostsWrittenToInMast
OperationContextNoop txn;
OID client = OID::gen();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
getExternalState()->setClientHostAndPort(clientHost);
HandshakeArgs handshake;
@@ -2584,12 +2587,12 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenUpdatePositionContainsInfoAboutSelf) {
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTime time1({100, 1}, 2);
- OpTime time2({100, 2}, 2);
+ OpTime time1({100, 1}, 1);
+ OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2630,13 +2633,13 @@ TEST_F(ReplCoordTest, DoNotProcessSelfWhenOldUpdatePositionContainsInfoAboutSelf
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2674,12 +2677,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionWhenItsConfigVersionIsIncorrect)
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTime time1({100, 1}, 3);
- OpTime time2({100, 2}, 3);
+ OpTime time1({100, 1}, 1);
+ OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2719,13 +2722,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionWhenItsConfigVersionIsIncorre
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2762,12 +2765,12 @@ TEST_F(ReplCoordTest, DoNotProcessUpdatePositionOfMembersWhoseIdsAreNotInTheConf
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTime time1({100, 1}, 2);
- OpTime time2({100, 2}, 2);
+ OpTime time1({100, 1}, 1);
+ OpTime time2({100, 2}, 1);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2805,13 +2808,13 @@ TEST_F(ReplCoordTest, DoNotProcessOldUpdatePositionOfMembersWhoseIdsAreNotInTheC
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2847,13 +2850,13 @@ TEST_F(ReplCoordTest,
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time1(100, 1);
- OpTimeWithTermZero time2(100, 2);
- OpTimeWithTermZero staleTime(10, 0);
+ OpTimeWithTermOne time1(100, 1);
+ OpTimeWithTermOne time2(100, 2);
+ OpTimeWithTermOne staleTime(10, 0);
getReplCoord()->setMyLastAppliedOpTime(time1);
getReplCoord()->setMyLastDurableOpTime(time1);
@@ -2912,11 +2915,11 @@ TEST_F(ReplCoordTest, AwaitReplicationShouldResolveAsNormalDuringAReconfig) {
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time(100, 2);
+ OpTimeWithTermOne time(100, 2);
// 3 nodes waiting for time
WriteConcernOptions writeConcern;
@@ -2994,11 +2997,11 @@ TEST_F(
<< "_id" << 2))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 2));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 2));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 2));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 2));
simulateSuccessfulV1Election();
- OpTimeWithTermZero time(100, 2);
+ OpTimeWithTermOne time(100, 2);
// 3 nodes waiting for time
WriteConcernOptions writeConcern;
@@ -3055,8 +3058,8 @@ TEST_F(ReplCoordTest,
<< "_id" << 4))),
HostAndPort("node1", 12345));
ASSERT(getReplCoord()->setFollowerMode(MemberState::RS_SECONDARY));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 1));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 1));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 1));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 1));
simulateSuccessfulV1Election();
OpTime time(Timestamp(100, 2), 1);
@@ -3227,13 +3230,13 @@ TEST_F(ReplCoordTest, NodeReturnsShutdownInProgressWhenWaitingUntilAnOpTimeDurin
<< "_id" << 0))),
HostAndPort("node1", 12345));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
shutdown();
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, result.getStatus());
@@ -3248,13 +3251,13 @@ TEST_F(ReplCoordTest, NodeReturnsInterruptedWhenWaitingUntilAnOpTimeIsInterrupte
<< "_id" << 0))),
HostAndPort("node1", 12345));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(10, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(10, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(10, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(10, 0));
txn.setCheckForInterruptStatus(Status(ErrorCodes::Interrupted, "test"));
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_EQUALS(ErrorCodes::Interrupted, result.getStatus());
@@ -3284,10 +3287,10 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
<< "_id" << 0))),
HostAndPort("node1", 12345));
- getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermZero(100, 0));
- getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermZero(100, 0));
+ getReplCoord()->setMyLastAppliedOpTime(OpTimeWithTermOne(100, 0));
+ getReplCoord()->setMyLastDurableOpTime(OpTimeWithTermOne(100, 0));
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_TRUE(result.didWait());
ASSERT_OK(result.getStatus());
@@ -3303,7 +3306,7 @@ TEST_F(ReplCoordTest, NodeReturnsOkImmediatelyWhenWaitingUntilOpTimePassesAnOpTi
HostAndPort("node1", 12345));
- OpTimeWithTermZero time(100, 0);
+ OpTimeWithTermOne time(100, 0);
getReplCoord()->setMyLastAppliedOpTime(time);
getReplCoord()->setMyLastDurableOpTime(time);
auto result = getReplCoord()->waitUntilOpTime(
@@ -3318,7 +3321,7 @@ TEST_F(ReplCoordTest,
init(ReplSettings());
OperationContextNoop txn;
auto result = getReplCoord()->waitUntilOpTime(
- &txn, ReadConcernArgs(OpTimeWithTermZero(50, 0), ReadConcernLevel::kLocalReadConcern));
+ &txn, ReadConcernArgs(OpTimeWithTermOne(50, 0), ReadConcernLevel::kLocalReadConcern));
ASSERT_FALSE(result.didWait());
ASSERT_EQUALS(ErrorCodes::NotAReplicaSet, result.getStatus());
@@ -3505,7 +3508,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
"lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 1
<< "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
// higher configVersion
@@ -3515,7 +3518,7 @@ TEST_F(ReplCoordTest, IgnoreTheContentsOfMetadataWhenItsConfigVersionDoesNotMatc
<< BSON("ts" << Timestamp(10, 0) << "t" << 2) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 2) << "configVersion" << 100
<< "primaryIndex" << 2 << "term" << 2 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
}
@@ -3550,7 +3553,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet
"lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 1) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 1) << "configVersion" << 2
<< "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime());
@@ -3560,7 +3563,7 @@ TEST_F(ReplCoordTest, UpdateLastCommittedOpTimeWhenTheLastCommittedOpTimeFromMet
"lastOpCommitted" << BSON("ts" << Timestamp(9, 0) << "t" << 1) << "lastOpVisible"
<< BSON("ts" << Timestamp(9, 0) << "t" << 1) << "configVersion" << 2
<< "primaryIndex" << 2 << "term" << 1 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
}
@@ -3591,7 +3594,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
"lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
<< BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion" << 2
<< "primaryIndex" << 2 << "term" << 3 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata.getValue());
+ getReplCoord()->processReplSetMetadata(metadata.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
@@ -3602,7 +3605,7 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
"lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
<< BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2
<< "primaryIndex" << 1 << "term" << 2 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata2.getValue());
+ getReplCoord()->processReplSetMetadata(metadata2.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
@@ -3613,14 +3616,14 @@ TEST_F(ReplCoordTest, UpdateTermWhenTheTermFromMetadataIsNewerButNeverUpdateCurr
"lastOpCommitted" << BSON("ts" << Timestamp(11, 0) << "t" << 3) << "lastOpVisible"
<< BSON("ts" << Timestamp(11, 0) << "t" << 3) << "configVersion" << 2
<< "primaryIndex" << 1 << "term" << 3 << "syncSourceIndex" << 1)));
- getReplCoord()->processReplSetMetadata(metadata3.getValue());
+ getReplCoord()->processReplSetMetadata(metadata3.getValue(), true);
ASSERT_EQUALS(OpTime(Timestamp(11, 0), 3), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(3, getReplCoord()->getTerm());
ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
}
TEST_F(ReplCoordTest,
- TermAndLastCommittedOpTimeUpdateWhenHeartbeatResponseWithMetadataHasFresherValues) {
+ LastCommittedOpTimeNotUpdatedEvenWhenHeartbeatResponseWithMetadataHasFresherValues) {
// Ensure that the metadata is processed if it is contained in a heartbeat response.
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -3640,7 +3643,61 @@ TEST_F(ReplCoordTest,
auto replCoord = getReplCoord();
auto config = replCoord->getConfig();
- // Higher term - should update term and lastCommittedOpTime.
+ // Higher term - should update term but not last committed optime.
+ StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
+ rpc::kReplSetMetadataFieldName
+ << BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
+ << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "configVersion"
+ << config.getConfigVersion() << "primaryIndex" << 1 << "term" << 3
+ << "syncSourceIndex" << 1)));
+ BSONObjBuilder metadataBuilder;
+ ASSERT_OK(metadata.getValue().writeToMetadata(&metadataBuilder));
+ auto metadataObj = metadataBuilder.obj();
+
+ auto net = getNet();
+ net->enterNetwork();
+
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ const auto& request = noi->getRequest();
+ ASSERT_EQUALS(HostAndPort("node2", 12345), request.target);
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setConfigVersion(config.getConfigVersion());
+ hbResp.setSetName(config.getReplSetName());
+ hbResp.setState(MemberState::RS_SECONDARY);
+ net->scheduleResponse(noi, net->now(), makeResponseStatus(hbResp.toBSON(true), metadataObj));
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
+ ASSERT_EQUALS(3, getReplCoord()->getTerm());
+ ASSERT_EQUALS(-1, getTopoCoord().getCurrentPrimaryIndex());
+}
+
+TEST_F(ReplCoordTest, TermAndLastCommittedOpTimeUpdatedFromHeartbeatWhenArbiter) {
+ // Ensure that the metadata is processed if it is contained in a heartbeat response.
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0 << "arbiterOnly" << true)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1)) << "protocolVersion" << 1),
+ HostAndPort("node1", 12345));
+ ASSERT_EQUALS(OpTime(Timestamp(0, 0), 0), getReplCoord()->getLastCommittedOpTime());
+ OperationContextNoop txn;
+ getReplCoord()->updateTerm(&txn, 1);
+ ASSERT_EQUALS(1, getReplCoord()->getTerm());
+
+ auto replCoord = getReplCoord();
+ auto config = replCoord->getConfig();
+
+ // Higher term - should update term and lastCommittedOpTime since arbiters learn of the
+ // commit point via heartbeats.
StatusWith<rpc::ReplSetMetadata> metadata = rpc::ReplSetMetadata::readFromMetadata(BSON(
rpc::kReplSetMetadataFieldName
<< BSON("lastOpCommitted" << BSON("ts" << Timestamp(10, 0) << "t" << 3) << "lastOpVisible"
@@ -3836,7 +3893,7 @@ TEST_F(ReplCoordTest,
}
TEST_F(ReplCoordTest,
- CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimary) {
+ RescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInSameTerm) {
assertStartSuccess(BSON("_id"
<< "mySet"
<< "protocolVersion" << 1 << "version" << 2 << "members"
@@ -3868,6 +3925,8 @@ TEST_F(ReplCoordTest,
ReplSetHeartbeatResponse hbResp;
hbResp.setSetName("mySet");
hbResp.setState(MemberState::RS_PRIMARY);
+ hbResp.setTerm(replCoord->getTerm());
+
// Heartbeat response is scheduled with a delay so that we can be sure that
// the election was rescheduled due to the heartbeat response.
auto heartbeatWhen = net->now() + Seconds(1);
@@ -3882,6 +3941,54 @@ TEST_F(ReplCoordTest,
}
TEST_F(ReplCoordTest,
+ DontRescheduleElectionTimeoutWhenProcessingHeartbeatResponseFromPrimaryInDifferentTerm) {
+ assertStartSuccess(BSON("_id"
+ << "mySet"
+ << "protocolVersion" << 1 << "version" << 2 << "members"
+ << BSON_ARRAY(BSON("host"
+ << "node1:12345"
+ << "_id" << 0)
+ << BSON("host"
+ << "node2:12345"
+ << "_id" << 1))),
+ HostAndPort("node1", 12345));
+
+ ReplicationCoordinatorImpl* replCoord = getReplCoord();
+ ASSERT_TRUE(replCoord->setFollowerMode(MemberState::RS_SECONDARY));
+
+ auto electionTimeoutWhen = replCoord->getElectionTimeout_forTest();
+ ASSERT_NOT_EQUALS(Date_t(), electionTimeoutWhen);
+
+ auto net = getNet();
+ net->enterNetwork();
+ ASSERT_TRUE(net->hasReadyRequests());
+ auto noi = net->getNextReadyRequest();
+ auto&& request = noi->getRequest();
+ log() << "processing " << request.cmdObj;
+ ASSERT_EQUALS(HostAndPort("node2", 12345), request.target);
+
+ ASSERT_EQUALS("replSetHeartbeat", request.cmdObj.firstElement().fieldNameStringData());
+
+ // Respond to node1's heartbeat command to indicate that node2 is PRIMARY.
+ ReplSetHeartbeatResponse hbResp;
+ hbResp.setSetName("mySet");
+ hbResp.setState(MemberState::RS_PRIMARY);
+ hbResp.setTerm(replCoord->getTerm() - 1);
+
+ // Heartbeat response is scheduled with a delay so that we can be sure that
+ // the election timeout was not rescheduled due to the heartbeat response.
+ auto heartbeatWhen = net->now() + Seconds(1);
+ net->scheduleResponse(noi, heartbeatWhen, makeResponseStatus(hbResp.toBSON(true)));
+ net->runUntil(heartbeatWhen);
+ ASSERT_EQUALS(heartbeatWhen, net->now());
+ net->runReadyNetworkOperations();
+ net->exitNetwork();
+
+ ASSERT_GREATER_THAN(heartbeatWhen + replCoord->getConfig().getElectionTimeoutPeriod(),
+ replCoord->getElectionTimeout_forTest());
+}
+
+TEST_F(ReplCoordTest,
CancelAndRescheduleElectionTimeoutWhenProcessingHeartbeatResponseWithoutState) {
assertStartSuccess(BSON("_id"
<< "mySet"
@@ -4241,7 +4348,7 @@ TEST_F(ReplCoordTest, NewStyleUpdatePositionCmdHasMetadata) {
// Set last committed optime via metadata.
rpc::ReplSetMetadata syncSourceMetadata(optime.getTerm(), optime, optime, 1, OID(), -1, 1);
- getReplCoord()->processReplSetMetadata(syncSourceMetadata);
+ getReplCoord()->processReplSetMetadata(syncSourceMetadata, true);
getReplCoord()->onSnapshotCreate(optime, SnapshotName(1));
BSONObjBuilder cmdBuilder;
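
The common thread in these test updates is the new second argument to processReplSetMetadata: metadata carried on heartbeat responses still advances the term, but on data-bearing nodes it no longer advances the commit point; only metadata from oplog queries against the sync source (or heartbeats on an arbiter, which has no oplog) may do that. A simplified sketch of that split, with hypothetical stand-in types rather than the real ReplSetMetadata and OpTime:

#include <algorithm>

struct OpTime {
    long long term = 0;
    unsigned secs = 0;
    bool operator>(const OpTime& rhs) const {
        return term != rhs.term ? term > rhs.term : secs > rhs.secs;
    }
};

struct ReplSetMetadata {
    long long term;
    OpTime lastOpCommitted;
};

class ReplCoordSketch {
public:
    // advanceCommitPoint is true for metadata from oplog queries against the
    // sync source (and for arbiters), false for ordinary heartbeat metadata.
    void processReplSetMetadata(const ReplSetMetadata& md, bool advanceCommitPoint) {
        _term = std::max(_term, md.term);  // terms always propagate
        if (advanceCommitPoint && md.lastOpCommitted > _lastCommitted) {
            _lastCommitted = md.lastOpCommitted;
        }
    }

    long long term() const { return _term; }
    OpTime lastCommittedOpTime() const { return _lastCommitted; }

private:
    long long _term = 0;
    OpTime _lastCommitted;
};

int main() {
    ReplCoordSketch coord;
    coord.processReplSetMetadata({3, {3, 10}}, /*advanceCommitPoint=*/false);
    // term is now 3, but lastCommittedOpTime is still the default (0, 0)
}
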
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index cb763bed4ff..df4a83c6f07 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -236,7 +236,8 @@ void ReplicationCoordinatorMock::processReplSetGetConfig(BSONObjBuilder* result)
// TODO
}
-void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata) {}
+void ReplicationCoordinatorMock::processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) {}
void ReplicationCoordinatorMock::cancelAndRescheduleElectionTimeout() {}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 861799f00d2..24eba706cfe 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -150,7 +150,8 @@ public:
virtual void processReplSetGetConfig(BSONObjBuilder* result);
- virtual void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata);
+ void processReplSetMetadata(const rpc::ReplSetMetadata& replMetadata,
+ bool advanceCommitPoint) override;
virtual void cancelAndRescheduleElectionTimeout() override;
diff --git a/src/mongo/db/repl/replset_commands.cpp b/src/mongo/db/repl/replset_commands.cpp
index e6d90aa82ec..0c202e906a1 100644
--- a/src/mongo/db/repl/replset_commands.cpp
+++ b/src/mongo/db/repl/replset_commands.cpp
@@ -681,7 +681,7 @@ public:
// New style update position command has metadata, which may inform the
// upstream of a higher term.
auto metadata = metadataResult.getValue();
- replCoord->processReplSetMetadata(metadata);
+ replCoord->processReplSetMetadata(metadata, false /*don't advance the commit point*/);
}
// In the case of an update from a member with an invalid replica set config,
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index 3c9086ca39a..9dc04670dc4 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/protocol.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -72,6 +73,8 @@ namespace {
const std::string kLocalDB = "local";
} // namespace
+MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
+
StatusWith<WriteConcernOptions> extractWriteConcern(OperationContext* txn,
const BSONObj& cmdObj,
const std::string& dbName) {
@@ -234,6 +237,8 @@ Status waitForWriteConcern(OperationContext* txn,
// This check does not hold for writes done through dbeval because it runs with a global X lock.
dassert(!txn->lockState()->isLocked() || txn->getClient()->isInDirectClient());
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
+
// Next handle blocking on disk
Timer syncTimer;
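
hangBeforeWaitingForWriteConcern is consumed via MONGO_FAIL_POINT_PAUSE_WHILE_SET, which conceptually just parks the calling thread until the fail point is turned off again (tests typically toggle it with the configureFailPoint command). A rough sketch of that pause-while-set gate, with a plain atomic flag standing in for the fail point machinery:

#include <atomic>
#include <chrono>
#include <thread>

std::atomic<bool> hangBeforeWaitingForWriteConcernFlag{false};  // stand-in for the fail point

// Roughly the behaviour of a pause-while-set gate: poll the flag and sleep
// until it is cleared, then let the caller continue into the real work.
void pauseWhileSet(const std::atomic<bool>& flag) {
    while (flag.load()) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

int main() {
    pauseWhileSet(hangBeforeWaitingForWriteConcernFlag);  // returns immediately while unset
}
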
diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js
index 0d1e225a990..ba9333fb9cf 100644
--- a/src/mongo/shell/assert.js
+++ b/src/mongo/shell/assert.js
@@ -452,16 +452,33 @@ assert.writeOK = function(res, msg) {
};
assert.writeError = function(res, msg) {
+ return assert.writeErrorWithCode(res, null, msg);
+};
+
+assert.writeErrorWithCode = function(res, expectedCode, msg) {
var errMsg = null;
+ var foundCode = null;
if (res instanceof WriteResult) {
- if (!res.hasWriteError() && !res.hasWriteConcernError()) {
+ if (res.hasWriteError()) {
+ foundCode = res.getWriteError().code;
+ } else if (res.hasWriteConcernError()) {
+ foundCode = res.getWriteConcernError().code;
+ } else {
errMsg = "no write error: " + tojson(res);
}
} else if (res instanceof BulkWriteResult) {
// Can only happen with bulk inserts
- if (!res.hasWriteErrors() && !res.hasWriteConcernError()) {
+ if (res.hasWriteErrors()) {
+ if (res.getWriteErrorCount() > 1 && expectedCode != null) {
+ errMsg = "can't check for specific code when there was more than one write error";
+ } else {
+ foundCode = res.getWriteErrorAt(0).code;
+ }
+ } else if (res.hasWriteConcernError()) {
+ foundCode = res.getWriteConcernError().code;
+ } else {
errMsg = "no write errors: " + tojson(res);
}
} else if (res instanceof WriteCommandError) {
@@ -473,6 +490,12 @@ assert.writeError = function(res, msg) {
}
}
+ if (!errMsg && expectedCode) {
+ if (foundCode != expectedCode) {
+ errMsg = "found code " + foundCode + " does not match expected code " + expectedCode;
+ }
+ }
+
if (errMsg) {
if (msg)
errMsg = errMsg + ": " + msg;