author     Mathias Stearn <redbeard0531@gmail.com>    2016-10-07 18:42:20 -0400
committer  Mathias Stearn <redbeard0531@gmail.com>    2016-10-17 14:36:31 -0400
commit     5db0a55a264ee326bff5598249639ef479628f37 (patch)
tree       8a92aadd22d5be199af9b87e1f6388f1bd0ec632
parent     99f8d760848b0be69b8934b00d33552b1295d5d9 (diff)
download   mongo-5db0a55a264ee326bff5598249639ef479628f37.tar.gz
SERVER-7200 Write oplog entries on secondaries before applying
Manual backport of 34c6c691a038eac1ac3ee16e1eedc54aab964774 along with fixes and tests from: b5d2b06f8a08171fd96ef8d128c4f7ecedcb8f93 dc83fb0433fcae6e72f035df7458473b59223eb5 fec839b99f4b9e08016112fe8b9492e327af91b8 bf86770c8a5de97b30bc008ad59e34de99065c60
-rw-r--r--  jstests/noPassthrough/minvalid.js  7
-rw-r--r--  jstests/replsets/clean_shutdown_oplog_state.js  24
-rw-r--r--  jstests/replsets/oplog_replay_on_startup.js  419
-rw-r--r--  jstests/replsets/oplog_truncated_on_recovery.js  45
-rw-r--r--  jstests/replsets/slave_delay_clean_shutdown.js  61
-rw-r--r--  src/mongo/db/instance.cpp  4
-rw-r--r--  src/mongo/db/repl/SConscript  2
-rw-r--r--  src/mongo/db/repl/bgsync.cpp  13
-rw-r--r--  src/mongo/db/repl/minvalid.cpp  195
-rw-r--r--  src/mongo/db/repl/minvalid.h  51
-rw-r--r--  src/mongo/db/repl/oplog.cpp  41
-rw-r--r--  src/mongo/db/repl/oplogreader.cpp  3
-rw-r--r--  src/mongo/db/repl/replication_coordinator.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h  16
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  138
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp  10
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h  4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp  35
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl_test.cpp  3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.cpp  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_mock.h  2
-rw-r--r--  src/mongo/db/repl/replication_coordinator_test_fixture.cpp  3
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations.cpp  19
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations.h  3
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations_test.cpp  12
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp  20
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp  13
-rw-r--r--  src/mongo/db/repl/rs_rollback_test.cpp  7
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp  117
-rw-r--r--  src/mongo/db/repl/sync_tail.h  9
-rw-r--r--  src/mongo/shell/assert.js  3
-rw-r--r--  src/mongo/shell/replsettest.js  6
34 files changed, 960 insertions, 335 deletions
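
All of the changes below revolve around the single document in local.replset.minvalid. A rough sketch of its shape after this patch; the field names are the ones used by the new oplog_replay_on_startup.js test and by minvalid.cpp below, and the values here are illustrative only:

{
    _id: ObjectId(),
    ts: Timestamp(1000, 6),             // minValid: must apply through here before the data is consistent
    t: NumberLong(1),                   // term of the minValid optime
    begin: {                            // appliedThrough: last op known applied; absent means "top of the oplog"
        ts: Timestamp(1000, 3),
        t: NumberLong(1),
    },
    oplogDeleteFromPoint: Timestamp(),  // oplog entries >= this are deleted on startup; the null timestamp means nothing to delete
    doingInitialSync: true,             // only present while an initial sync is in progress
}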
diff --git a/jstests/noPassthrough/minvalid.js b/jstests/noPassthrough/minvalid.js
index 6f22e65e2ca..d31f6d58da7 100644
--- a/jstests/noPassthrough/minvalid.js
+++ b/jstests/noPassthrough/minvalid.js
@@ -21,9 +21,10 @@ var lastOp = local.oplog.rs.find().sort({$natural: -1}).limit(1).next();
printjson(lastOp);
print("3: change minvalid");
-// primaries don't populate minvalid by default
-local.replset.minvalid.insert(
- {ts: new Timestamp(lastOp.ts.t, lastOp.ts.i + 1), h: new NumberLong("1234567890")});
+assert.writeOK(local.replset.minvalid.update(
+ {},
+ {$set: {ts: new Timestamp(lastOp.ts.t, lastOp.ts.i + 1), h: new NumberLong("1234567890")}},
+ {upsert: true}));
printjson(local.replset.minvalid.findOne());
print("4: restart");
diff --git a/jstests/replsets/clean_shutdown_oplog_state.js b/jstests/replsets/clean_shutdown_oplog_state.js
index 51dba43ff98..3a856db1245 100644
--- a/jstests/replsets/clean_shutdown_oplog_state.js
+++ b/jstests/replsets/clean_shutdown_oplog_state.js
@@ -57,9 +57,10 @@
var conn = MongoRunner.runMongod(options);
assert.neq(null, conn, "secondary failed to start");
- // Following a clean shutdown of a 3.2 node, the oplog must exactly match the applied
- // operations. Additionally, the begin field must not be in the minValid document and the ts
- // must match the top of the oplog (SERVER-25353).
+ // Following clean shutdown of a node, the oplog must exactly match the applied operations.
+ // Additionally, the begin field must not be in the minValid document, the ts must match the
+ // top of the oplog (SERVER-25353), and the oplogDeleteFromPoint must be null (SERVER-7200 and
+ // SERVER-25071).
var oplogDoc = conn.getCollection('local.oplog.rs')
.find({ns: 'test.coll'})
.sort({$natural: -1})
@@ -68,9 +69,20 @@
var minValidDoc =
conn.getCollection('local.replset.minvalid').find().sort({$natural: -1}).limit(1)[0];
printjson({oplogDoc: oplogDoc, collDoc: collDoc, minValidDoc: minValidDoc});
- assert.eq(collDoc._id, oplogDoc.o._id);
- assert(!('begin' in minValidDoc), 'begin in minValidDoc');
- assert.eq(minValidDoc.ts, oplogDoc.ts);
+ try {
+ assert.eq(collDoc._id, oplogDoc.o._id);
+ assert(!('begin' in minValidDoc), 'begin in minValidDoc');
+ assert.eq(minValidDoc.ts, oplogDoc.ts);
+ if ('oplogDeleteFromPoint' in minValidDoc) {
+ // If present it must be the null timestamp.
+ assert.eq(minValidDoc.oplogDeleteFromPoint, Timestamp());
+ }
+ } catch (e) {
+ jsTest.log(
+ "Look above and make sure clean shutdown finished without resorting to SIGKILL." +
+ "\nUnfortunately that currently doesn't fail the test.");
+ throw e;
+ }
rst.stopSet();
})();
diff --git a/jstests/replsets/oplog_replay_on_startup.js b/jstests/replsets/oplog_replay_on_startup.js
new file mode 100644
index 00000000000..0864c781d0c
--- /dev/null
+++ b/jstests/replsets/oplog_replay_on_startup.js
@@ -0,0 +1,419 @@
+// SERVER-7200 On startup, replica set nodes delete oplog state past the oplog delete point and
+// apply any remaining unapplied ops before coming up as a secondary.
+//
+// @tags: [requires_persistence]
+(function() {
+ "use strict";
+
+ var ns = "test.coll";
+
+ var rst = new ReplSetTest({
+ nodes: 1,
+ });
+
+ rst.startSet();
+ rst.initiate();
+
+ var conn = rst.getPrimary(); // Waits for PRIMARY state.
+ var term = conn.getCollection('local.oplog.rs').find().sort({$natural: -1}).limit(1).next().t;
+ if (typeof(term) == 'undefined') {
+ term = -1; // Use a dummy term for PV0.
+ }
+
+ function runTest({
+ oplogEntries,
+ collectionContents,
+ deletePoint,
+ begin,
+ minValid,
+ expectedState,
+ expectedApplied,
+ }) {
+ if (term != -1) {
+ term++; // Each test gets a new term on PV1 to ensure OpTimes always move forward.
+ }
+
+ conn = rst.restart(0, {noReplSet: true}); // Restart as a standalone node.
+ assert.neq(null, conn, "failed to restart");
+ var oplog = conn.getCollection('local.oplog.rs');
+ var minValidColl = conn.getCollection('local.replset.minvalid');
+ var coll = conn.getCollection(ns);
+
+ // Reset state to empty.
+ assert.commandWorked(oplog.runCommand('emptycapped'));
+ coll.drop();
+ assert.commandWorked(coll.runCommand('create'));
+
+ var ts = (num) => num === null ? Timestamp() : Timestamp(1000, num);
+
+ oplogEntries.forEach((num) => {
+ assert.writeOK(oplog.insert({
+ ts: ts(num),
+ t: term,
+ h: NumberLong(1),
+ op: 'i',
+ ns: ns,
+ o: {_id: num},
+ }));
+ });
+
+ collectionContents.forEach((num) => {
+ assert.writeOK(coll.insert({_id: num}));
+ });
+
+ var injectedMinValidDoc = {
+ _id: ObjectId(),
+
+ // minvalid:
+ ts: ts(minValid),
+ t: term,
+
+ // appliedThrough
+ begin: {
+ ts: ts(begin),
+ t: term,
+ },
+
+ oplogDeleteFromPoint: ts(deletePoint),
+ };
+
+ // This weird mechanism is the only way to bypass mongod's attempt to fill in null
+ // Timestamps.
+ assert.writeOK(minValidColl.remove({}));
+ assert.writeOK(minValidColl.update({}, {$set: injectedMinValidDoc}, {upsert: true}));
+ assert.eq(minValidColl.findOne(),
+ injectedMinValidDoc,
+ "If the Timestamps differ, the server may be filling in the null timestamps");
+
+ try {
+ conn = rst.restart(0); // Restart in replSet mode again.
+ } catch (e) {
+ assert.eq(expectedState, 'FATAL', 'node failed to restart: ' + e);
+ return;
+ }
+
+ // Wait for the node to go to SECONDARY if it is able.
+ assert.soon(
+ () =>
+ conn.adminCommand('serverStatus').metrics.repl.apply.attemptsToBecomeSecondary > 0,
+ () => conn.adminCommand('serverStatus').metrics.repl.apply.attemptsToBecomeSecondary);
+
+ var isMaster = conn.adminCommand('ismaster');
+ switch (expectedState) {
+ case 'SECONDARY':
+ // Primary is also acceptable since once a node becomes secondary, it will try to
+ // become primary if it is eligible and has enough votes (which this node does).
+ // This is supposed to test that we reach secondary, not that we stay there.
+ assert(isMaster.ismaster || isMaster.secondary,
+ 'not PRIMARY or SECONDARY: ' + tojson(isMaster));
+
+ // Wait for node to become primary. This is necessary to avoid the find below
+ // failing with "NotMasterOrSecondary" errors if it happens to run while the
+ // node is in drain mode while becoming primary.
+ conn = rst.getPrimary();
+ break;
+
+ case 'RECOVERING':
+ assert(!isMaster.ismaster && !isMaster.secondary,
+ 'not in RECOVERING: ' + tojson(isMaster));
+
+ // Restart as a standalone node again so we can read from the collection.
+ conn = rst.restart(0, {noReplSet: true});
+ break;
+
+ case 'FATAL':
+ doassert("server startup didn't fail when it should have");
+ break;
+
+ default:
+ doassert('expectedState ' + expectedState + ' is not supported');
+ }
+
+ // Ensure the oplog has the entries it should have and none that it shouldn't.
+ assert.eq(conn.getCollection('local.oplog.rs')
+ .find({ns: ns, op: 'i'})
+ .sort({$natural: 1})
+ .map((op) => op.o._id),
+ expectedApplied);
+
+ // Ensure that all ops that should have been applied were.
+ conn.setSlaveOk(true);
+ assert.eq(conn.getCollection(ns).find().sort({_id: 1}).map((obj) => obj._id),
+ expectedApplied);
+ }
+
+ //
+ // Normal 3.4 cases
+ //
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: null,
+ minValid: null,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: null,
+ minValid: 2,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: null,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: 3,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: 4,
+ begin: 3,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: 4,
+ begin: 3,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, /*4,*/ 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: 4,
+ begin: 3,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: 3,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3, 4, 5, 6],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3, 4, 5, 6],
+ });
+
+ //
+ // 3.2 -> 3.4 upgrade cases
+ //
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5],
+ collectionContents: [1, 2, 3],
+ deletePoint: null,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3, 4, 5],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5],
+ collectionContents: [1, 2, 3, 4, 5],
+ deletePoint: null,
+ begin: null,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3, 4, 5],
+ });
+
+ //
+ // 3.4 -> 3.2 -> 3.4 downgrade/reupgrade cases
+ //
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: 4,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5],
+ collectionContents: [1, 2, 3],
+ deletePoint: 4,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, /*4,*/ 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: 4,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: 2,
+ begin: null,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3],
+ deletePoint: 2,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5],
+ collectionContents: [1, 2, 3],
+ deletePoint: 2,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'RECOVERING',
+ expectedApplied: [1, 2, 3, 4, 5],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: 2,
+ begin: 3,
+ minValid: 6,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3, 4, 5, 6],
+ });
+
+ //
+ // These states should be impossible to get into.
+ //
+
+ runTest({
+ oplogEntries: [1, 2, 3],
+ collectionContents: [1, 2, 3, 4],
+ deletePoint: null,
+ begin: 4,
+ minValid: null, // doesn't matter.
+ expectedState: 'FATAL',
+ });
+
+ runTest({
+ oplogEntries: [4, 5, 6],
+ collectionContents: [1, 2],
+ deletePoint: 2,
+ begin: 3,
+ minValid: null, // doesn't matter.
+ expectedState: 'FATAL',
+ });
+
+ runTest({
+ oplogEntries: [4, 5, 6],
+ collectionContents: [1, 2],
+ deletePoint: null,
+ begin: 3,
+ minValid: null, // doesn't matter.
+ expectedState: 'FATAL',
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3],
+ deletePoint: 2,
+ begin: 3,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3, 4, 5, 6],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3, 4, 5],
+ deletePoint: null,
+ begin: 5,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3, 4, 5, 6],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5, 6],
+ collectionContents: [1, 2, 3, 4, 5],
+ deletePoint: null,
+ begin: 5,
+ minValid: null,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3, 4, 5, 6],
+ });
+
+ runTest({
+ oplogEntries: [1, 2, 3, 4, 5],
+ collectionContents: [1],
+ deletePoint: 4,
+ begin: 1,
+ minValid: 3,
+ expectedState: 'SECONDARY',
+ expectedApplied: [1, 2, 3],
+ });
+
+ rst.stopSet();
+})();
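
The injected state above can also be inspected by hand on a node restarted with {noReplSet: true}; a minimal shell sketch, assuming conn is a connection to such a standalone node (the variable name is illustrative):

var local = conn.getDB("local");
printjson(local.replset.minvalid.findOne());                            // ts/t, begin, oplogDeleteFromPoint
printjson(local.oplog.rs.find().sort({$natural: -1}).limit(1).next());  // top of the oplog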
diff --git a/jstests/replsets/oplog_truncated_on_recovery.js b/jstests/replsets/oplog_truncated_on_recovery.js
index 96be5865cc3..4d469178691 100644
--- a/jstests/replsets/oplog_truncated_on_recovery.js
+++ b/jstests/replsets/oplog_truncated_on_recovery.js
@@ -1,10 +1,10 @@
/**
- * This test will ensure that a failed a batch apply will remove the any oplog
+ * This test will ensure that recovery from a failed batch application will remove the oplog
* entries from that batch.
*
* To do this we:
* -- Create single node replica set
- * -- Set minvalid manually on primary way ahead (5 minutes)
+ * -- Set minvalid manually on primary way ahead (5 days)
* -- Write some oplog entries newer than minvalid.start
* -- Ensure restarted primary comes up in recovering and truncates the oplog
* -- Success!
@@ -40,35 +40,36 @@
// Write op
log(assert.writeOK(testDB.foo.save({_id: 1, a: 1}, {writeConcern: {w: 1}})));
- // Set minvalid to something far in the future for the current primary, to
- // simulate recovery.
- // Note: This is so far in the future (5 days) that it will never become
- // secondary.
+ // Set minvalid to something far in the future for the current primary, to simulate recovery.
+ // Note: This is so far in the future (5 days) that it will never become secondary.
var farFutureTS = new Timestamp(
Math.floor(new Date().getTime() / 1000) + (60 * 60 * 24 * 5 /* in five days */), 0);
var rsgs = assert.commandWorked(localDB.adminCommand("replSetGetStatus"));
log(rsgs);
var primaryOpTime = rsgs.members[0].optime;
- var primaryLastTS = rsgs.members[0].optime.ts;
- log(primaryLastTS);
+ log(primaryOpTime);
// Set the start of the failed batch
- primaryOpTime.ts = new Timestamp(primaryOpTime.ts.t, primaryOpTime.ts.i + 1);
+ // TODO this test should restart in stand-alone mode to futz with the state rather than trying
+ // to do it on a running primary.
- log(primaryLastTS);
jsTest.log("future TS: " + tojson(farFutureTS) + ", date:" + tsToDate(farFutureTS));
- // We do an update in case there is a minvalid document on the primary
- // already.
- // If the doc doesn't exist then upsert:true will create it, and the
- // writeConcern ensures
- // that update returns details of the write, like whether an update or
- // insert was performed.
- log(assert.writeOK(
- minvalidColl.update({},
- {ts: farFutureTS, t: NumberLong(-1), begin: primaryOpTime},
- {upsert: true, writeConcern: {w: 1}})));
+ var divergedTS = new Timestamp(primaryOpTime.ts.t, primaryOpTime.ts.i + 1);
+ // We do an update in case there is a minvalid document on the primary already.
+ // If the doc doesn't exist then upsert:true will create it, and the writeConcern ensures
+ // that update returns details of the write, like whether an update or insert was performed.
+ log(assert.writeOK(minvalidColl.update({},
+ {
+ ts: farFutureTS,
+ t: NumberLong(-1),
+ begin: primaryOpTime,
+ oplogDeleteFromPoint: divergedTS
+ },
+ {upsert: true, writeConcern: {w: 1}})));
- log(assert.writeOK(localDB.oplog.rs.insert({_id: 0, ts: primaryOpTime.ts, op: "n", term: -1})));
+ // Insert a diverged oplog entry that will be truncated after restart.
+ log(assert.writeOK(localDB.oplog.rs.insert(
+ {_id: 0, ts: divergedTS, op: "n", h: NumberLong(0), t: NumberLong(-1)})));
log(localDB.oplog.rs.find().toArray());
log(assert.commandWorked(localDB.adminCommand("replSetGetStatus")));
log("restart primary");
@@ -88,7 +89,7 @@
var lastTS = localDB.oplog.rs.find().sort({$natural: -1}).limit(-1).next().ts;
log(localDB.oplog.rs.find().toArray());
- assert.eq(primaryLastTS, lastTS);
+ assert.eq(primaryOpTime.ts, lastTS);
return true;
});
diff --git a/jstests/replsets/slave_delay_clean_shutdown.js b/jstests/replsets/slave_delay_clean_shutdown.js
new file mode 100644
index 00000000000..db08dfab228
--- /dev/null
+++ b/jstests/replsets/slave_delay_clean_shutdown.js
@@ -0,0 +1,61 @@
+// SERVER-21118 don't hang at shutdown or apply ops too soon with slaveDelay.
+//
+// @tags: [requires_persistence]
+load('jstests/replsets/rslib.js');
+(function() {
+ "use strict";
+
+ var ns = "test.coll";
+
+ var rst = new ReplSetTest({
+ nodes: 2,
+ });
+
+ var conf = rst.getReplSetConfig();
+ conf.members[1].votes = 0;
+ conf.members[1].priority = 0;
+ conf.members[1].hidden = true;
+ conf.members[1].slaveDelay = 0; // Set later.
+
+ rst.startSet();
+ rst.initiate(conf);
+
+ var master = rst.getPrimary(); // Waits for PRIMARY state.
+
+ // Push some ops through before setting slave delay.
+ assert.writeOK(master.getCollection(ns).insert([{}, {}, {}], {writeConcern: {w: 2}}));
+
+ // Set slaveDelay and wait for secondary to receive the change.
+ conf = rst.getReplSetConfigFromNode();
+ conf.version++;
+ conf.members[1].slaveDelay = 24 * 60 * 60;
+ reconfig(rst, conf);
+ assert.soon(() => rst.getReplSetConfigFromNode(1).members[1].slaveDelay > 0,
+ () => rst.getReplSetConfigFromNode(1));
+
+ sleep(2000); // The secondary apply loop only checks for slaveDelay changes once per second.
+ var secondary = rst.getSecondary();
+ const lastOp = getLatestOp(secondary);
+
+ assert.writeOK(master.getCollection(ns).insert([{}, {}, {}]));
+ assert.soon(() => secondary.adminCommand('serverStatus').metrics.repl.buffer.count > 0,
+ () => secondary.adminCommand('serverStatus').metrics.repl);
+ assert.neq(getLatestOp(master), lastOp);
+ assert.eq(getLatestOp(secondary), lastOp);
+
+ sleep(2000); // Prevent the test from passing by chance.
+ assert.eq(getLatestOp(secondary), lastOp);
+
+ // Make sure shutdown won't take a long time due to I/O.
+ secondary.adminCommand('fsync');
+
+ // Shutting down shouldn't take long.
+ assert.lt(Date.timeFunc(() => rst.stop(1)), 60 * 1000);
+
+ secondary = rst.restart(1);
+ assert.eq(getLatestOp(secondary), lastOp);
+ sleep(2000); // Prevent the test from passing by chance.
+ assert.eq(getLatestOp(secondary), lastOp);
+
+ rst.stopSet();
+})();
diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp
index 86576565532..2bd2d003b49 100644
--- a/src/mongo/db/instance.cpp
+++ b/src/mongo/db/instance.cpp
@@ -1273,8 +1273,6 @@ void exitCleanly(ExitCode code) {
getGlobalServiceContext()->setKillAllOperations();
- repl::getGlobalReplicationCoordinator()->shutdown();
-
Client& client = cc();
ServiceContext::UniqueOperationContext uniqueTxn;
OperationContext* txn = client.getOperationContext();
@@ -1283,6 +1281,8 @@ void exitCleanly(ExitCode code) {
txn = uniqueTxn.get();
}
+
+ repl::getGlobalReplicationCoordinator()->shutdown(txn);
ShardingState::get(txn)->shutDown(txn);
// We should always be able to acquire the global lock at shutdown.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 2563de2d9ea..007a37c5c25 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -710,7 +710,9 @@ env.Library(
'roll_back_local_operations.cpp',
],
LIBDEPS=[
+ 'optime',
'$BUILD_DIR/mongo/base',
+ '$BUILD_DIR/mongo/bson/util/bson_extract',
'$BUILD_DIR/mongo/util/foundation',
],
)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index e34c4f73335..a461d5f2c8c 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -325,6 +325,13 @@ void BackgroundSync::_produce(OperationContext* txn) {
syncSourceReader.resetConnection();
// no more references to oplog reader from here on.
+ // Set the applied point if unset. This is most likely the first time we've established a sync
+ // source since stepping down or otherwise clearing the applied point. We need to set this here,
+ // before the OplogWriter gets a chance to append to the oplog.
+ if (getAppliedThrough(txn).isNull()) {
+ setAppliedThrough(txn, _replCoord->getMyLastAppliedOpTime());
+ }
+
Status fetcherReturnStatus = Status::OK();
auto fetcherCallback = stdx::bind(&BackgroundSync::_fetcherCallback,
this,
@@ -424,13 +431,13 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
// check that we are at minvalid, otherwise we cannot roll back as we may be in an
// inconsistent state
- BatchBoundaries boundaries = getMinValid(txn);
- if (!boundaries.start.isNull() || boundaries.end > lastApplied) {
+ const auto minValid = getMinValid(txn);
+ if (lastApplied < minValid) {
fassertNoTrace(18750,
Status(ErrorCodes::UnrecoverableRollbackError,
str::stream()
<< "need to rollback, but in inconsistent state. "
- << "minvalid: " << boundaries.end.toString()
+ << "minvalid: " << minValid.toString()
<< " > our last optime: " << lastApplied.toString()));
}
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index 90753cff0f4..02a56cfab2f 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -47,121 +47,142 @@ namespace mongo {
namespace repl {
namespace {
-const char initialSyncFlagString[] = "doingInitialSync";
-const BSONObj initialSyncFlag(BSON(initialSyncFlagString << true));
-const char minvalidNS[] = "local.replset.minvalid";
-const char beginFieldName[] = "begin";
-} // namespace
+const char kInitialSyncFlagFieldName[] = "doingInitialSync";
+const BSONObj kInitialSyncFlag(BSON(kInitialSyncFlagFieldName << true));
+NamespaceString minValidNss("local.replset.minvalid");
+const char kBeginFieldName[] = "begin";
+const char kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint";
-// Writes
-void clearInitialSyncFlag(OperationContext* txn) {
+BSONObj getMinValidDocument(OperationContext* txn) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IS);
+ Lock::DBLock dblk(txn->lockState(), minValidNss.db(), MODE_IS);
+ Lock::CollectionLock lk(txn->lockState(), minValidNss.ns(), MODE_IS);
+ BSONObj doc;
+ bool found = Helpers::getSingleton(txn, minValidNss.ns().c_str(), doc);
+ invariant(found || doc.isEmpty());
+ return doc;
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValidDocument", minValidNss.ns());
+
+ MONGO_UNREACHABLE;
+}
+
+void updateMinValidDocument(OperationContext* txn, const BSONObj& updateSpec) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(txn, MODE_IX);
- // TODO: Investigate correctness of taking MODE_IX for DB/Collection locks
- Lock::DBLock dblk(txn->lockState(), "local", MODE_X);
- Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag));
+ // For now this needs to be MODE_X because it sometimes creates the collection.
+ Lock::DBLock dblk(txn->lockState(), minValidNss.db(), MODE_X);
+ Helpers::putSingleton(txn, minValidNss.ns().c_str(), updateSpec);
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "updateMinValidDocument", minValidNss.ns());
+}
+} // namespace
+// Writes
+void clearInitialSyncFlag(OperationContext* txn) {
auto replCoord = repl::ReplicationCoordinator::get(txn);
OpTime time = replCoord->getMyLastAppliedOpTime();
+ updateMinValidDocument(txn,
+ BSON("$unset"
+ << kInitialSyncFlag << "$set"
+ << BSON("ts" << time.getTimestamp() << "t" << time.getTerm()
+ << kBeginFieldName << time.toBSON())));
txn->recoveryUnit()->waitUntilDurable();
replCoord->setMyLastDurableOpTime(time);
LOG(3) << "clearing initial sync flag";
}
void setInitialSyncFlag(OperationContext* txn) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dblk(txn->lockState(), "local", MODE_X);
- Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag));
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS);
-
+ updateMinValidDocument(txn, BSON("$set" << kInitialSyncFlag));
txn->recoveryUnit()->waitUntilDurable();
LOG(3) << "setting initial sync flag";
}
-void setMinValid(OperationContext* txn, const OpTime& endOpTime, const DurableRequirement durReq) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dblk(txn->lockState(), "local", MODE_X);
- Helpers::putSingleton(
- txn,
- minvalidNS,
- BSON("$set" << BSON("ts" << endOpTime.getTimestamp() << "t" << endOpTime.getTerm())
- << "$unset" << BSON(beginFieldName << 1)));
+bool getInitialSyncFlag() {
+ OperationContextImpl txn;
+ return getInitialSyncFlag(&txn);
+}
+bool getInitialSyncFlag(OperationContext* txn) {
+ const BSONObj doc = getMinValidDocument(txn);
+ const auto flag = doc[kInitialSyncFlagFieldName].trueValue();
+ LOG(3) << "returning initial sync flag value of " << flag;
+ return flag;
+}
+
+OpTime getMinValid(OperationContext* txn) {
+ const BSONObj doc = getMinValidDocument(txn);
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(doc);
+ // If any of the keys (fields) are missing from the minvalid document, we return
+ // a null OpTime.
+ if (opTimeStatus == ErrorCodes::NoSuchKey) {
+ return {};
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setMinValid", minvalidNS);
- if (durReq == DurableRequirement::Strong) {
- txn->recoveryUnit()->waitUntilDurable();
+ if (!opTimeStatus.isOK()) {
+ severe() << "Error parsing minvalid entry: " << doc
+ << ", with status:" << opTimeStatus.getStatus();
+ fassertFailedNoTrace(40052);
}
- LOG(3) << "setting minvalid: " << endOpTime.toString() << "(" << endOpTime.toBSON() << ")";
+
+ OpTime minValid = opTimeStatus.getValue();
+ LOG(3) << "returning minvalid: " << minValid.toString() << "(" << minValid.toBSON() << ")";
+
+ return minValid;
+}
+void setMinValid(OperationContext* txn, const OpTime& minValid) {
+ LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON()
+ << ")";
+ updateMinValidDocument(
+ txn, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
}
-void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) {
- const OpTime& start(boundaries.start);
- const OpTime& end(boundaries.end);
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dblk(txn->lockState(), "local", MODE_X);
- Helpers::putSingleton(txn,
- minvalidNS,
- BSON("$set" << BSON("ts" << end.getTimestamp() << "t" << end.getTerm()
- << beginFieldName << start.toBSON())));
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setMinValid", minvalidNS);
- // NOTE: No need to ensure durability here since starting a batch isn't a problem unless
- // writes happen after, in which case this marker (minvalid) will be written already.
- LOG(3) << "setting minvalid: " << boundaries.start.toString() << "("
- << boundaries.start.toBSON() << ") -> " << boundaries.end.toString() << "("
- << boundaries.end.toBSON() << ")";
+void setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) {
+ LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON()
+ << ")";
+ updateMinValidDocument(
+ txn, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
}
-// Reads
-bool getInitialSyncFlag() {
- OperationContextImpl txn;
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(&txn, MODE_IS);
- Lock::DBLock dblk(txn.lockState(), "local", MODE_IS);
- Lock::CollectionLock lk(txn.lockState(), minvalidNS, MODE_IS);
- BSONObj mv;
- bool found = Helpers::getSingleton(&txn, minvalidNS, mv);
-
- if (found) {
- const auto flag = mv[initialSyncFlagString].trueValue();
- LOG(3) << "return initial flag value of " << flag;
- return flag;
- }
- LOG(3) << "return initial flag value of false";
- return false;
+void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) {
+ LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty();
+ updateMinValidDocument(txn, BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp)));
+}
+
+Timestamp getOplogDeleteFromPoint(OperationContext* txn) {
+ const BSONObj doc = getMinValidDocument(txn);
+ Timestamp out = {};
+ if (auto field = doc[kOplogDeleteFromPointFieldName]) {
+ out = field.timestamp();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(&txn, "getInitialSyncFlags", minvalidNS);
- MONGO_UNREACHABLE;
+ LOG(3) << "returning oplog delete from point: " << out;
+ return out;
}
-BatchBoundaries getMinValid(OperationContext* txn) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IS);
- Lock::DBLock dblk(txn->lockState(), "local", MODE_IS);
- Lock::CollectionLock lk(txn->lockState(), minvalidNS, MODE_IS);
- BSONObj mv;
- bool found = Helpers::getSingleton(txn, minvalidNS, mv);
- if (found) {
- auto status = OpTime::parseFromOplogEntry(mv.getObjectField(beginFieldName));
- OpTime start(status.isOK() ? status.getValue() : OpTime{});
- OpTime end(fassertStatusOK(28771, OpTime::parseFromOplogEntry(mv)));
- LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> "
- << end.toString() << "(" << end.toBSON() << ")";
-
- return BatchBoundaries(start, end);
- }
- LOG(3) << "returning empty minvalid";
- return BatchBoundaries{OpTime{}, OpTime{}};
+void setAppliedThrough(OperationContext* txn, const OpTime& optime) {
+ LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
+ if (optime.isNull()) {
+ updateMinValidDocument(txn, BSON("$unset" << BSON(kBeginFieldName << 1)));
+ } else {
+ updateMinValidDocument(txn, BSON("$set" << BSON(kBeginFieldName << optime.toBSON())));
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS);
-}
}
+
+OpTime getAppliedThrough(OperationContext* txn) {
+ const BSONObj doc = getMinValidDocument(txn);
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName));
+ if (!opTimeStatus.isOK()) {
+ // Return null OpTime on any parse failure, including if "begin" is missing.
+ return {};
+ }
+
+ OpTime appliedThrough = opTimeStatus.getValue();
+ LOG(3) << "returning appliedThrough: " << appliedThrough.toString() << "("
+ << appliedThrough.toBSON() << ")";
+
+ return appliedThrough;
}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/minvalid.h b/src/mongo/db/repl/minvalid.h
index d92d5d0c4a6..ba28e0f5927 100644
--- a/src/mongo/db/repl/minvalid.h
+++ b/src/mongo/db/repl/minvalid.h
@@ -36,17 +36,6 @@ class OperationContext;
namespace repl {
-struct BatchBoundaries {
- BatchBoundaries(const OpTime s, const OpTime e) : start(s), end(e) {}
- OpTime start;
- OpTime end;
-};
-
-enum class DurableRequirement {
- None, // Does not require any durability of the write.
- Strong, // Requires journal or checkpoint write.
-};
-
/**
* Helper functions for maintaining a single document in the local.replset.minvalid collection.
*
@@ -79,30 +68,42 @@ void setInitialSyncFlag(OperationContext* txn);
/**
* Returns true if the initial sync flag is set (and presumed active).
*/
+bool getInitialSyncFlag(OperationContext* txn);
bool getInitialSyncFlag();
+/**
+ * The minValid value is the earliest (minimum) Timestamp that must be applied in order to
+ * consider the dataset consistent.
+ */
+void setMinValid(OperationContext* txn, const OpTime& minValid);
+OpTime getMinValid(OperationContext* txn);
/**
- * Returns the bounds of the current apply batch, if active. If start is null/missing, and
- * end is equal to the last oplog entry then we are in a consistent state and ready for reads.
+ * Sets minValid only if it is not already higher than endOpTime.
+ * Warning, this compares the term and timestamp independently. Do not use if the current
+ * minValid could be from the other fork of a rollback.
*/
-BatchBoundaries getMinValid(OperationContext* txn);
+void setMinValidToAtLeast(OperationContext* txn, const OpTime& endOpTime);
/**
- * The minValid value is the earliest (minimum) Timestamp that must be applied in order to
- * consider the dataset consistent.
- *
- * This is called when a batch finishes.
- *
- * Wait for durable writes (which will block on journaling/checkpointing) when specified.
- *
+ * On startup all oplog entries with a value >= the oplog delete from point should be deleted.
+ * If null, no documents should be deleted.
*/
-void setMinValid(OperationContext* ctx, const OpTime& endOpTime, const DurableRequirement durReq);
+void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp);
+Timestamp getOplogDeleteFromPoint(OperationContext* txn);
/**
- * The bounds indicate an apply is active and we are not in a consistent state to allow reads
- * or transition from a non-visible state to primary/secondary.
+ * The applied through point is a persistent record of where we've applied through. If null, the
+ * applied through point is the top of the oplog.
+ */
+void setAppliedThrough(OperationContext* txn, const OpTime& optime);
+
+/**
+ * You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead.
+ *
+ * This reads the value from storage which isn't always updated when the ReplicationCoordinator
+ * is.
*/
-void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries);
+OpTime getAppliedThrough(OperationContext* txn);
}
}
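
To make the setMinValidToAtLeast() warning above concrete: the helper is implemented as a $max update on local.replset.minvalid (see minvalid.cpp above), and $max advances each field independently, so if the stored minValid came from the other fork of a rollback the result can be a ts/t pair that never existed as a real optime. A shell sketch of the equivalent update, with illustrative values:

db.getSiblingDB("local").replset.minvalid.update(
    {}, {$max: {ts: Timestamp(1000, 5), t: NumberLong(2)}}, {upsert: true});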
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 03c2aa06acf..d98db32fd43 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -304,7 +304,7 @@ unique_ptr<OplogDocWriter> _logOpWriter(OperationContext* txn,
}
} // end anon namespace
-// Truncates the oplog to and including the "truncateTimestamp" entry.
+// Truncates the oplog after and including the "truncateTimestamp" entry.
void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
const NamespaceString oplogNss(rsOplogName);
ScopedTransaction transaction(txn, MODE_IX);
@@ -318,36 +318,41 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
}
// Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
- bool foundSomethingToTruncate = false;
- RecordId lastRecordId;
- BSONObj lastOplogEntry;
+ RecordId oldestIDToDelete; // Non-null if there is something to delete.
auto oplogRs = oplogCollection->getRecordStore();
- auto oplogReverseCursor = oplogRs->getCursor(txn, false);
- bool first = true;
+ auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false);
+ size_t count = 0;
while (auto next = oplogReverseCursor->next()) {
- lastOplogEntry = next->data.releaseToBson();
- lastRecordId = next->id;
+ const BSONObj entry = next->data.releaseToBson();
+ const RecordId id = next->id;
+ count++;
- const auto tsElem = lastOplogEntry["ts"];
-
- if (first) {
+ const auto tsElem = entry["ts"];
+ if (count == 1) {
if (tsElem.eoo())
- LOG(2) << "Oplog tail entry: " << lastOplogEntry;
+ LOG(2) << "Oplog tail entry: " << entry;
else
LOG(2) << "Oplog tail entry ts field: " << tsElem;
- first = false;
}
if (tsElem.timestamp() < truncateTimestamp) {
- break;
+ // If count == 1, that means that we have nothing to delete because everything in the
+ // oplog is < truncateTimestamp.
+ if (count != 1) {
+ invariant(!oldestIDToDelete.isNull());
+ oplogCollection->temp_cappedTruncateAfter(
+ txn, oldestIDToDelete, /*inclusive=*/true);
+ }
+ return;
}
- foundSomethingToTruncate = true;
+ oldestIDToDelete = id;
}
- if (foundSomethingToTruncate) {
- oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false);
- }
+ severe() << "Reached end of oplog looking for oplog entry before "
+ << truncateTimestamp.toStringPretty()
+ << " but couldn't find any after looking through " << count << " entries.";
+ fassertFailedNoTrace(40296);
}
/* we write to local.oplog.rs:
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index c1b6e37f82f..f0cb34c7a1e 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -165,10 +165,9 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
log() << "our last optime : " << lastOpTimeFetched;
log() << "oldest available is " << oldestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
- setMinValid(txn, {lastOpTimeFetched, oldestOpTimeSeen});
auto status = replCoord->setMaintenanceMode(true);
if (!status.isOK()) {
- warning() << "Failed to transition into maintenance mode.";
+ warning() << "Failed to transition into maintenance mode: " << status;
}
bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING);
if (!worked) {
diff --git a/src/mongo/db/repl/replication_coordinator.h b/src/mongo/db/repl/replication_coordinator.h
index 630b9277b48..dd55fcca799 100644
--- a/src/mongo/db/repl/replication_coordinator.h
+++ b/src/mongo/db/repl/replication_coordinator.h
@@ -133,7 +133,7 @@ public:
* components of the replication system to shut down and stop any threads they are using,
* blocking until all replication-related shutdown tasks are complete.
*/
- virtual void shutdown() = 0;
+ virtual void shutdown(OperationContext* txn) = 0;
/**
* Returns a pointer to the ReplicationExecutor.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index aba9726fc04..b595f606a61 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -80,7 +80,7 @@ public:
* Performs any necessary external state specific shutdown tasks, such as cleaning up
* the threads it started.
*/
- virtual void shutdown() = 0;
+ virtual void shutdown(OperationContext* txn) = 0;
/**
* Creates the oplog, writes the first entry and stores the replica set config document. Sets
@@ -90,10 +90,20 @@ public:
const BSONObj& config,
bool updateReplOpTime) = 0;
+
/**
- * Writes a message about our transition to primary to the oplog.
+ * Called as part of the process of transitioning to primary and run with the global X lock and
+ * the replication coordinator mutex acquired, so no majority writes are allowed while in this
+ * state. See the call site in ReplicationCoordinatorImpl for details about when and how it is
+ * called.
+ *
+ * Among other things, this writes a message about our transition to primary to the oplog if
+ * isV1 and returns the optime of that message. If !isV1, returns the optime of the last op
+ * in the oplog.
+ *
+ * Throws on errors.
*/
- virtual void logTransitionToPrimaryToOplog(OperationContext* txn) = 0;
+ virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) = 0;
/**
* Simple wrapper around SyncSourceFeedback::forwardSlaveProgress. Signals to the
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 75b08a4fd76..67abc29502e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/op_observer.h"
@@ -70,6 +71,7 @@
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/message_port.h"
#include "mongo/util/net/sock.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -116,7 +118,7 @@ void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext*
repl::startMasterSlave(txn);
}
-void ReplicationCoordinatorExternalStateImpl::shutdown() {
+void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_threadMutex);
if (_startedThreads) {
log() << "Stopping replication applier threads";
@@ -129,6 +131,13 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() {
if (_snapshotThread)
_snapshotThread->shutdown();
+
+ if (getOplogDeleteFromPoint(txn).isNull() &&
+ loadLastOpTime(txn) == getAppliedThrough(txn)) {
+ // Clear the appliedThrough marker to indicate we are consistent with the top of the
+ // oplog.
+ setAppliedThrough(txn, {});
+ }
}
}
@@ -169,24 +178,45 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
+
+ // This initializes the minvalid document with a null "ts" because older versions (<=3.2)
+ // get angry if the minValid document is present but doesn't have a "ts" field.
+ // Consider removing this once we no longer need to support downgrading to 3.2.
+ setMinValidToAtLeast(txn, {});
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
-void ReplicationCoordinatorExternalStateImpl::logTransitionToPrimaryToOplog(OperationContext* txn) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(txn, MODE_X);
+OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn,
+ bool isV1ElectionProtocol) {
+ invariant(txn->lockState()->isW());
+
+ // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
+ // done before we add anything to our oplog.
+ invariant(getOplogDeleteFromPoint(txn).isNull());
+ setAppliedThrough(txn, {});
+
+ if (isV1ElectionProtocol) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction scopedXact(txn, MODE_X);
- WriteUnitOfWork wuow(txn);
- txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(txn,
- BSON("msg"
- << "new primary"));
- wuow.commit();
+ WriteUnitOfWork wuow(txn);
+ txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
+ txn,
+ BSON("msg"
+ << "new primary"));
+ wuow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "logging transition to primary to oplog", "local.oplog.rs");
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "logging transition to primary to oplog", "local.oplog.rs");
+ const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn));
+
+ dropAllTempCollections(txn);
+
+ return opTimeToReturn;
}
void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() {
@@ -301,18 +331,94 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp
}
void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) {
- auto mv = getMinValid(txn);
+ if (getInitialSyncFlag(txn)) {
+ return; // Initial Sync will take over so no cleanup is needed.
+ }
- if (!mv.start.isNull()) {
- // If we are in the middle of a batch, and recoveringm then we need to truncate the oplog.
- LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON();
- truncateOplogTo(txn, mv.start.getTimestamp());
+ const auto deleteFromPoint = getOplogDeleteFromPoint(txn);
+ const auto appliedThrough = getAppliedThrough(txn);
+
+ const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() &&
+ // This version should never have a non-null deleteFromPoint with a null appliedThrough.
+ // This scenario means that we downgraded after unclean shutdown, then the downgraded node
+ // deleted the ragged end of our oplog, then did a clean shutdown.
+ !appliedThrough.isNull() &&
+ // Similarly we should never have an appliedThrough higher than the deleteFromPoint. This
+ // means that the downgraded node deleted our ragged end then applied ahead of our
+ // deleteFromPoint and then had an unclean shutdown before upgrading. We are ok with
+ // applying these ops because older versions wrote to the oplog from a single thread so we
+ // know they are in order.
+ !(appliedThrough.getTimestamp() >= deleteFromPoint);
+ if (needToDeleteEndOfOplog) {
+ log() << "Removing unapplied entries starting at: " << deleteFromPoint;
+ truncateOplogTo(txn, deleteFromPoint);
+ }
+ setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint
+
+ if (appliedThrough.isNull()) {
+ // No follow-up work to do.
+ return;
+ }
+
+ // Check if we have any unapplied ops in our oplog. It is important that this is done after
+ // deleting the ragged end of the oplog.
+ const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn));
+ if (appliedThrough == topOfOplog) {
+ return; // We've applied all the valid oplog we have.
+ } else if (appliedThrough > topOfOplog) {
+ severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog
+ << '.';
+ fassertFailedNoTrace(40313);
+ }
+
+ log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
+ << topOfOplog << " (inclusive).";
+
+ DBDirectClient db(txn);
+ auto cursor = db.query(rsOplogName,
+ QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
+ /*batchSize*/ 0,
+ /*skip*/ 0,
+ /*projection*/ nullptr,
+ QueryOption_OplogReplay);
+
+ // Check that the first document matches our appliedThrough point then skip it since it's
+ // already been applied.
+ if (!cursor->more()) {
+ // This should really be impossible because we check above that the top of the oplog is
+ // strictly > appliedThrough. If this fails it represents a serious bug in either the
+ // storage engine or query's implementation of OplogReplay.
+ severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
+ << " which should be impossible.";
+ fassertFailedNoTrace(40293);
+ }
+ auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
+ if (firstOpTimeFound != appliedThrough) {
+ severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
+ << firstOpTimeFound;
+ fassertFailedNoTrace(40292);
+ }
+
+ // Apply remaining ops one at a time, but don't log them because they are already logged.
+ const bool wereWritesReplicated = txn->writesAreReplicated();
+ ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); });
+ txn->setReplicatedWrites(false);
+
+ while (cursor->more()) {
+ auto entry = cursor->nextSafe();
+ fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true));
+ setAppliedThrough(txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
}
}
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) {
// TODO: handle WriteConflictExceptions below
try {
+ // If we are doing an initial sync do not read from the oplog.
+ if (getInitialSyncFlag(txn)) {
+ return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."};
+ }
+
BSONObj oplogEntry;
if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) {
return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument,
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 4ddc917d129..26667fbba96 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -53,11 +53,11 @@ public:
virtual ~ReplicationCoordinatorExternalStateImpl();
virtual void startThreads(const ReplSettings& settings) override;
virtual void startMasterSlave(OperationContext* txn);
- virtual void shutdown();
+ virtual void shutdown(OperationContext* txn);
virtual Status initializeReplSetStorage(OperationContext* txn,
const BSONObj& config,
bool updateReplOpTime);
- virtual void logTransitionToPrimaryToOplog(OperationContext* txn);
+ OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) override;
virtual void forwardSlaveProgress();
virtual OID ensureMe(OperationContext* txn);
virtual bool isSelf(const HostAndPort& host);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index d1f6f567e72..d174145751c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -69,7 +69,7 @@ Status ReplicationCoordinatorExternalStateMock::initializeReplSetStorage(Operati
return storeLocalConfigDocument(txn, config);
}
-void ReplicationCoordinatorExternalStateMock::shutdown() {}
+void ReplicationCoordinatorExternalStateMock::shutdown(OperationContext*) {}
void ReplicationCoordinatorExternalStateMock::forwardSlaveProgress() {}
OID ReplicationCoordinatorExternalStateMock::ensureMe(OperationContext*) {
@@ -233,8 +233,12 @@ bool ReplicationCoordinatorExternalStateMock::isReadCommittedSupportedByStorageE
return true;
}
-void ReplicationCoordinatorExternalStateMock::logTransitionToPrimaryToOplog(OperationContext* txn) {
- _lastOpTime = OpTime(Timestamp(1, 0), 1);
+OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* txn,
+ bool isV1ElectionProtocol) {
+ if (isV1ElectionProtocol) {
+ _lastOpTime = OpTime(Timestamp(1, 0), 1);
+ }
+ return fassertStatusOK(40297, _lastOpTime);
}
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index c16b0760c0d..4aa05861787 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -54,11 +54,11 @@ public:
virtual ~ReplicationCoordinatorExternalStateMock();
virtual void startThreads(const ReplSettings& settings) override;
virtual void startMasterSlave(OperationContext*);
- virtual void shutdown();
+ virtual void shutdown(OperationContext*);
virtual Status initializeReplSetStorage(OperationContext* txn,
const BSONObj& config,
bool updateReplOpTime);
- virtual void logTransitionToPrimaryToOplog(OperationContext* txn);
+ OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) override;
virtual void forwardSlaveProgress();
virtual OID ensureMe(OperationContext*);
virtual bool isSelf(const HostAndPort& host);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index 45c3c0644d4..8772ee39829 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -351,7 +351,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
fassertFailedNoTrace(28545);
}
- // Returns the last optime from the oplog, possibly truncating first if we need to recover.
+ // Read the last op from the oplog after cleaning up any partially applied batches.
_externalState->cleanUpLastApplyBatch(txn);
auto lastOpTimeStatus = _externalState->loadLastOpTime(txn);
@@ -492,7 +492,7 @@ void ReplicationCoordinatorImpl::startReplication(OperationContext* txn) {
}
}
-void ReplicationCoordinatorImpl::shutdown() {
+void ReplicationCoordinatorImpl::shutdown(OperationContext* txn) {
// Shutdown must:
// * prevent new threads from blocking in awaitReplication
// * wake up all existing threads blocking in awaitReplication
@@ -525,7 +525,7 @@ void ReplicationCoordinatorImpl::shutdown() {
// joining the replication executor is blocking so it must be run outside of the mutex
_replExecutor.shutdown();
_replExecutor.join();
- _externalState->shutdown();
+ _externalState->shutdown(txn);
}
const ReplSettings& ReplicationCoordinatorImpl::getSettings() const {
@@ -675,7 +675,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
// _isWaitingForDrainToComplete, set the flag allowing non-local database writes and
// drop the mutex. At this point, no writes can occur from other threads, due to the
// global exclusive lock.
- // 4.) Drop all temp collections.
+ // 4.) Drop all temp collections, and log the drops to the oplog.
// 5.) Log transition to primary in the oplog and set that OpTime as the floor for what we will
// consider to be committed.
// 6.) Drop the global exclusive lock.
@@ -687,6 +687,9 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
// external writes will be processed. This is important so that a new temp collection isn't
// introduced on the new primary before we drop all the temp collections.
+ // When we go to drop all temp collections, we must replicate the drops.
+ invariant(txn->writesAreReplicated());
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (!_isWaitingForDrainToComplete) {
return;
@@ -697,32 +700,30 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite globalWriteLock(txn->lockState());
-
lk.lock();
+
if (!_isWaitingForDrainToComplete) {
return;
}
_isWaitingForDrainToComplete = false;
- _canAcceptNonLocalWrites = true;
_drainFinishedCond.notify_all();
- lk.unlock();
-
- _externalState->dropAllTempCollections(txn);
- // This is done for compatibility with PV0 replicas wrt how "n" ops are processed.
- if (isV1ElectionProtocol()) {
- _externalState->logTransitionToPrimaryToOplog(txn);
+ if (!_getMemberState_inlock().primary()) {
+ // We must have decided not to transition to primary while waiting for the applier to drain.
+ // Skip the rest of this function since it should only be done when really transitioning.
+ return;
}
- StatusWith<OpTime> lastOpTime = _externalState->loadLastOpTime(txn);
- fassertStatusOK(28665, lastOpTime.getStatus());
- _setFirstOpTimeOfMyTerm(lastOpTime.getValue());
+ invariant(!_canAcceptNonLocalWrites);
+ _canAcceptNonLocalWrites = true;
+ lk.unlock();
+ _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(txn, isV1ElectionProtocol()));
lk.lock();
+
// Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
- // our election in logTransitionToPrimaryToOplog(), above.
+ // our election in onTransitionToPrimary(), above.
_updateLastCommittedOpTime_inlock();
- lk.unlock();
log() << "transition to primary complete; database writes are now permitted" << rsLog;
}
diff --git a/src/mongo/db/repl/replication_coordinator_impl.h b/src/mongo/db/repl/replication_coordinator_impl.h
index 3cc0d6e4995..0e1a72bab9a 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_impl.h
@@ -113,7 +113,7 @@ public:
virtual void startReplication(OperationContext* txn) override;
- virtual void shutdown() override;
+ virtual void shutdown(OperationContext* txn) override;
virtual ReplicationExecutor* getExecutor() override {
return &_replExecutor;
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index deb917413a8..04de8fef35e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -2423,9 +2423,10 @@ TEST_F(ReplCoordTest, IsMaster) {
}
TEST_F(ReplCoordTest, LogAMessageWhenShutDownBeforeReplicationStartUpFinished) {
+ OperationContextNoop txn;
init();
startCapturingLogMessages();
- getReplCoord()->shutdown();
+ getReplCoord()->shutdown(&txn);
stopCapturingLogMessages();
ASSERT_EQUALS(1,
countLogLinesContaining("shutdown() called before startReplication() finished"));
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 3ea4d8eef42..7d3f987878f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -52,7 +52,7 @@ void ReplicationCoordinatorMock::startReplication(OperationContext* txn) {
// TODO
}
-void ReplicationCoordinatorMock::shutdown() {
+void ReplicationCoordinatorMock::shutdown(OperationContext* txn) {
// TODO
}
diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h
index 4a21d9ad705..15315cd5a8f 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_mock.h
@@ -54,7 +54,7 @@ public:
virtual void startReplication(OperationContext* txn);
- virtual void shutdown();
+ virtual void shutdown(OperationContext* txn);
virtual ReplicationExecutor* getExecutor() override {
return nullptr;
diff --git a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
index 412d1165a24..9106d4058b3 100644
--- a/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
+++ b/src/mongo/db/repl/replication_coordinator_test_fixture.cpp
@@ -368,9 +368,10 @@ void ReplCoordTest::simulateSuccessfulElection() {
}
void ReplCoordTest::shutdown() {
+ OperationContextReplMock txn;
invariant(_callShutdown);
_net->exitNetwork();
- _repl->shutdown();
+ _repl->shutdown(&txn);
_callShutdown = false;
}
diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp
index 2117458e9f4..92312d0d21c 100644
--- a/src/mongo/db/repl/roll_back_local_operations.cpp
+++ b/src/mongo/db/repl/roll_back_local_operations.cpp
@@ -41,6 +41,10 @@ namespace repl {
namespace {
+OpTime getOpTime(const OplogInterface::Iterator::Value& oplogValue) {
+ return fassertStatusOK(40298, OpTime::parseFromOplogEntry(oplogValue.first));
+}
+
Timestamp getTimestamp(const BSONObj& operation) {
return operation["ts"].timestamp();
}
@@ -116,7 +120,7 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations
_scanned++;
if (getHash(_localOplogValue) == getHash(operation)) {
return StatusWith<RollbackCommonPoint>(
- std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second));
+ std::make_pair(getOpTime(_localOplogValue), _localOplogValue.second));
}
auto status = _rollbackOperation(_localOplogValue.first);
if (!status.isOK()) {
@@ -139,14 +143,11 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations
"Need to process additional remote operations.");
}
- if (getTimestamp(_localOplogValue) < getTimestamp(operation)) {
- _scanned++;
- return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey,
- "Unable to determine common point. "
- "Need to process additional remote operations.");
- }
-
- return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId());
+ invariant(getTimestamp(_localOplogValue) < getTimestamp(operation));
+ _scanned++;
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey,
+ "Unable to determine common point. "
+ "Need to process additional remote operations.");
}
StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations(
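With the getOpTime() helper above, the rollback common point carries a full OpTime (timestamp plus term) instead of a bare Timestamp. A minimal sketch of what that parsing amounts to, with simplified stand-ins for OpTime and an oplog entry (only OpTime::parseFromOplogEntry and fassertStatusOK are real names from the hunk; everything below is illustrative):

    // Sketch only: simplified stand-ins, not the real OpTime/oplog entry types.
    #include <cstdint>
    #include <tuple>

    struct Timestamp {
        uint32_t secs = 0;
        uint32_t inc = 0;
    };

    struct OpTime {
        Timestamp ts;
        long long term = -1;  // -1 stands in for the uninitialized (PV0) term
        bool operator<(const OpTime& rhs) const {
            return std::tie(ts.secs, ts.inc, term) <
                   std::tie(rhs.ts.secs, rhs.ts.inc, rhs.term);
        }
    };

    struct OplogEntry {
        Timestamp ts;    // the "ts" field
        long long term;  // the "t" field
        long long hash;  // the "h" field
    };

    // Mirrors the intent of the new getOpTime() helper: keep the whole
    // (timestamp, term) pair so later code can log and truncate by OpTime.
    OpTime getOpTime(const OplogEntry& e) {
        return OpTime{e.ts, e.term};
    }
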
diff --git a/src/mongo/db/repl/roll_back_local_operations.h b/src/mongo/db/repl/roll_back_local_operations.h
index 20eb923083d..87a940ce57b 100644
--- a/src/mongo/db/repl/roll_back_local_operations.h
+++ b/src/mongo/db/repl/roll_back_local_operations.h
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/record_id.h"
#include "mongo/db/repl/oplog_interface.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/stdx/functional.h"
namespace mongo {
@@ -49,7 +50,7 @@ public:
*/
using RollbackOperationFn = stdx::function<Status(const BSONObj&)>;
- using RollbackCommonPoint = std::pair<Timestamp, RecordId>;
+ using RollbackCommonPoint = std::pair<OpTime, RecordId>;
/**
* Initializes rollback processor with a valid local oplog.
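Because RollbackCommonPoint is now a std::pair<OpTime, RecordId>, consumers that previously stored a Timestamp keep the full OpTime and pull the timestamp back out only where needed. A small illustrative sketch of that caller-side adjustment (FixUpInfo here is a stand-in echoing the rs_rollback change further down):

    // Sketch only: a consumer adapting to the OpTime-based common point.
    #include <utility>

    struct Timestamp {};
    struct RecordId {};
    struct OpTime {
        Timestamp ts;
        Timestamp getTimestamp() const { return ts; }
    };

    using RollbackCommonPoint = std::pair<OpTime, RecordId>;  // was std::pair<Timestamp, RecordId>

    struct FixUpInfo {
        OpTime commonPoint;  // was Timestamp
        RecordId commonPointOurDiskloc;
    };

    void recordCommonPoint(FixUpInfo& fixUpInfo, const RollbackCommonPoint& cp) {
        fixUpInfo.commonPoint = cp.first;
        fixUpInfo.commonPointOurDiskloc = cp.second;
        // Code that only needs the timestamp (e.g. truncating the oplog after
        // the common point) can still call cp.first.getTimestamp().
    }
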
diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp
index 06af9890571..80334710d57 100644
--- a/src/mongo/db/repl/roll_back_local_operations_test.cpp
+++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp
@@ -107,7 +107,7 @@ TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) {
RollBackLocalOperations finder(localOplog, rollbackOperation);
auto result = finder.onRemoteOperation(commonOperation.first);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_EQUALS(commonOperation.first, i->first);
@@ -164,7 +164,7 @@ TEST(RollBackLocalOperationsTest, SkipRemoteOperations) {
}
auto result = finder.onRemoteOperation(commonOperation.first);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_EQUALS(commonOperation.first, i->first);
@@ -197,7 +197,7 @@ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) {
}
auto result = finder.onRemoteOperation(commonOperation.first);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_EQUALS(commonOperation.first, i->first);
@@ -269,7 +269,7 @@ TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) {
return Status::OK();
});
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_EQUALS(commonOperation.first, i->first);
@@ -288,7 +288,7 @@ TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) {
return Status::OK();
});
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
}
@@ -306,7 +306,7 @@ TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) {
return Status::OK();
});
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_TRUE(called);
}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 23877df274a..a2b4bc1ccf5 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -79,8 +79,8 @@ MONGO_FP_DECLARE(failInitSyncWithBufferedEntriesLeft);
void truncateAndResetOplog(OperationContext* txn,
ReplicationCoordinator* replCoord,
BackgroundSync* bgsync) {
- // Clear minvalid
- setMinValid(txn, OpTime(), DurableRequirement::None);
+ // Add field to minvalid document to tell us to restart initial sync if we crash
+ setInitialSyncFlag(txn);
AutoGetDb autoDb(txn, "local", MODE_X);
massert(28585, "no local database found", autoDb.getDb());
@@ -341,9 +341,6 @@ Status _initialSync() {
return Status(ErrorCodes::InitialSyncFailure, msg);
}
- // Add field to minvalid document to tell us to restart initial sync if we crash
- setInitialSyncFlag(&txn);
-
log() << "initial sync drop all databases";
dropAllDatabasesExceptLocal(&txn);
@@ -457,19 +454,10 @@ Status _initialSync() {
log() << "initial sync finishing up";
- {
- ScopedTransaction scopedXact(&txn, MODE_IX);
- AutoGetDb autodb(&txn, "local", MODE_X);
- OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastAppliedOpTime());
- log() << "set minValid=" << lastOpTimeWritten;
-
- // Initial sync is now complete. Flag this by setting minValid to the last thing we synced.
- setMinValid(&txn, lastOpTimeWritten, DurableRequirement::None);
- BackgroundSync::get()->setInitialSyncRequestedFlag(false);
- }
-
+ // Initial sync is now complete.
// Clear the initial sync flag -- cannot be done under a db lock, or recursive.
clearInitialSyncFlag(&txn);
+ BackgroundSync::get()->setInitialSyncRequestedFlag(false);
// Clear maint. mode.
while (replCoord->getMaintenanceMode()) {
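The net effect of the rs_initialsync.cpp changes is that the initial-sync flag is written before anything is truncated or dropped and cleared only after initial sync has fully finished, so a crash anywhere in between forces initial sync to restart instead of leaving a half-synced node that looks usable. A minimal sketch of that lifecycle, with stand-ins for the real minvalid helpers:

    // Sketch only: the intended ordering; the helper bodies are stand-ins.
    struct OperationContext {};

    void setInitialSyncFlag(OperationContext*) {}    // persist "initial sync in progress"
    void clearInitialSyncFlag(OperationContext*) {}  // persist "initial sync complete"
    void truncateOplog(OperationContext*) {}
    void cloneDatabasesAndApplyOplog(OperationContext*) {}

    void initialSyncSketch(OperationContext* txn) {
        // Flag first: if we crash after this point but before the clear below,
        // startup sees the flag and restarts initial sync from scratch.
        setInitialSyncFlag(txn);
        truncateOplog(txn);
        cloneDatabasesAndApplyOplog(txn);
        // Only once everything has been applied do we declare ourselves consistent.
        clearInitialSyncFlag(txn);
    }
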
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index d52032cb6a4..7b40e510127 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -159,7 +159,7 @@ struct FixUpInfo {
set<string> collectionsToResyncData;
set<string> collectionsToResyncMetadata;
- Timestamp commonPoint;
+ OpTime commonPoint;
RecordId commonPointOurDiskloc;
int rbid; // remote server's current rollback sequence #
@@ -391,9 +391,12 @@ void syncFixUp(OperationContext* txn,
// we have items we are writing that aren't from a point-in-time. thus best not to come
// online until we get to that point in freshness.
+ // TODO this is still wrong because we don't record that we are in rollback, and we can't really
+ // recover.
OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid));
log() << "minvalid=" << minValid;
- setMinValid(txn, {OpTime{}, minValid});
+ setAppliedThrough(txn, {}); // Use top of oplog.
+ setMinValid(txn, minValid);
// any full collection resyncs required?
if (!fixUpInfo.collectionsToResyncData.empty() ||
@@ -497,8 +500,8 @@ void syncFixUp(OperationContext* txn,
} else {
OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid));
log() << "minvalid=" << minValid;
- const OpTime start{fixUpInfo.commonPoint, OpTime::kUninitializedTerm};
- setMinValid(txn, {start, minValid});
+ setMinValid(txn, minValid);
+ setAppliedThrough(txn, fixUpInfo.commonPoint);
}
} catch (const DBException& e) {
err = "can't get/set minvalid: ";
@@ -768,7 +771,7 @@ void syncFixUp(OperationContext* txn,
log() << "rollback 6";
// clean up oplog
- LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toStringPretty();
+ LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint;
{
const NamespaceString oplogNss(rsOplogName);
ScopedTransaction transaction(txn, MODE_IX);
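Where the old code wrote a single (start, end) pair through setMinValid(), rollback now records two separate facts: minValid, the optime the node must reach before it may serve reads again, and appliedThrough, the point oplog application should resume from (the common point, or an empty optime meaning "use the top of the oplog"). A hedged sketch of the split, with stand-ins for the minvalid helpers:

    // Sketch only: the split bookkeeping; the helpers stand in for minvalid.cpp.
    struct OperationContext {};
    struct OpTime {};

    // Assumed semantics, mirroring the calls in the hunks above:
    void setMinValid(OperationContext*, const OpTime& /*mustReachBeforeReads*/) {}
    void setAppliedThrough(OperationContext*, const OpTime& /*resumeApplyingAfter*/) {}

    void recordRollbackProgress(OperationContext* txn,
                                const OpTime& remoteMinValid,
                                const OpTime& commonPoint) {
        setMinValid(txn, remoteMinValid);     // stay in RECOVERING until this is reached
        setAppliedThrough(txn, commonPoint);  // resume batch application from here;
                                              // an empty OpTime means "top of oplog"
    }
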
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 835389c3494..3f768d79f4b 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -159,7 +159,8 @@ void RSRollbackTest::setUp() {
setGlobalReplicationCoordinator(_coordinator);
setOplogCollectionName();
- repl::setMinValid(_txn.get(), {OpTime{}, OpTime{}});
+ repl::setAppliedThrough(_txn.get(), OpTime{});
+ repl::setMinValid(_txn.get(), OpTime{});
}
void RSRollbackTest::tearDown() {
@@ -175,8 +176,8 @@ void RSRollbackTest::tearDown() {
void noSleep(Seconds seconds) {}
TEST_F(RSRollbackTest, InconsistentMinValid) {
- repl::setMinValid(_txn.get(),
- {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)});
+ repl::setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0));
+ repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0));
auto status = syncRollback(_txn.get(),
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 7c929b6eae1..a336c8de68d 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -108,6 +108,11 @@ static Counter64 opsAppliedStats;
// The oplog entries applied
static ServerStatusMetricField<Counter64> displayOpsApplied("repl.apply.ops", &opsAppliedStats);
+// Number of times we tried to go live as a secondary.
+static Counter64 attemptsToBecomeSecondary;
+static ServerStatusMetricField<Counter64> displayAttemptsToBecomeSecondary(
+ "repl.apply.attemptsToBecomeSecondary", &attemptsToBecomeSecondary);
+
MONGO_FP_DECLARE(rsSyncApplyStop);
// Number and time of each ApplyOps worker pool round
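The new counter surfaces under serverStatus() as metrics.repl.apply.attemptsToBecomeSecondary, incremented once per pass through tryToGoLiveAsASecondary(). A rough stand-in sketch of the Counter64/ServerStatusMetricField pattern (the registry and CounterMetric type here are hypothetical; only the metric path comes from the hunk):

    // Sketch only: a stand-in for the Counter64 / ServerStatusMetricField pair.
    #include <atomic>
    #include <cstdint>
    #include <map>
    #include <string>

    // Hypothetical registry keyed by the dotted metric path.
    std::map<std::string, std::atomic<int64_t>*>& metricRegistry() {
        static std::map<std::string, std::atomic<int64_t>*> reg;
        return reg;
    }

    struct CounterMetric {
        std::atomic<int64_t> value{0};
        explicit CounterMetric(const std::string& path) { metricRegistry()[path] = &value; }
        void increment() { value.fetch_add(1); }
    };

    // Mirrors the intent of the new metric: bump it on every attempt so the
    // value is visible in the serverStatus metrics tree.
    static CounterMetric attemptsToBecomeSecondary("repl.apply.attemptsToBecomeSecondary");
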
@@ -495,9 +500,7 @@ void fillWriterVectors(OperationContext* txn,
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
// writes the oplog entries to the local oplog.
-OpTime SyncTail::multiApply(OperationContext* txn,
- const OpQueue& ops,
- boost::optional<BatchBoundaries> boundaries) {
+OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
invariant(_applyFunc);
if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
@@ -528,12 +531,8 @@ OpTime SyncTail::multiApply(OperationContext* txn,
fassertFailed(28527);
}
- if (boundaries) {
- setMinValid(txn, *boundaries); // Mark us as in the middle of a batch.
- }
-
- applyOps(writerVectors, &_writerPool, _applyFunc, this);
-
+ // Since we write the oplog from a single thread in-order, we don't need to use the
+ // oplogDeleteFromPoint.
OpTime lastOpTime;
{
ON_BLOCK_EXIT([&] { _writerPool.join(); });
@@ -545,25 +544,28 @@ OpTime SyncTail::multiApply(OperationContext* txn,
lastOpTime = writeOpsToOplog(txn, raws);
}
+ setMinValidToAtLeast(txn, lastOpTime); // Mark us as in the middle of a batch.
+
+ applyOps(writerVectors, &_writerPool, _applyFunc, this);
+ _writerPool.join();
+
// Due to SERVER-24933 we can't enter inShutdown while holding the PBWM lock.
invariant(!inShutdownStrict());
- if (boundaries) {
- setMinValid(txn, boundaries->end, DurableRequirement::None); // Mark batch as complete.
- }
+ setAppliedThrough(txn, lastOpTime); // Mark batch as complete.
return lastOpTime;
}
namespace {
-void tryToGoLiveAsASecondary(OperationContext* txn,
- ReplicationCoordinator* replCoord,
- const BatchBoundaries& minValidBoundaries,
- const OpTime& lastWriteOpTime) {
+void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* replCoord) {
if (replCoord->isInPrimaryOrSecondaryState()) {
return;
}
+ // This needs to happen after the attempt so readers can be sure we've already tried.
+ ON_BLOCK_EXIT([] { attemptsToBecomeSecondary.increment(); });
+
ScopedTransaction transaction(txn, MODE_S);
Lock::GlobalRead readLock(txn->lockState());
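This is the heart of SERVER-7200: multiApply() now writes the batch to the local oplog first, raises minValid to at least the end of the batch, and only then applies the operations, finishing by advancing appliedThrough. A crash between the oplog write and the apply leaves minValid ahead of appliedThrough, which recovery uses to re-apply the gap. A minimal sketch of that ordering, with stand-in helpers (names mirror the hunk; the types are simplified):

    // Sketch only: the new write-then-apply ordering in multiApply(); all
    // helpers are stand-ins with the same intent as the real calls above.
    #include <vector>

    struct OperationContext {};
    struct OpTime {
        long long ts = 0;
    };
    struct Op {};

    OpTime writeOpsToOplog(OperationContext*, const std::vector<Op>& ops) {
        return OpTime{static_cast<long long>(ops.size())};  // pretend last optime
    }
    void setMinValidToAtLeast(OperationContext*, const OpTime&) {}  // "batch in flight"
    void applyOpsWithWriterPool(const std::vector<Op>&) {}
    void setAppliedThrough(OperationContext*, const OpTime&) {}     // "batch complete"

    OpTime multiApplySketch(OperationContext* txn, const std::vector<Op>& ops) {
        // 1. Write the batch to the local oplog before touching user collections.
        const OpTime lastOpTime = writeOpsToOplog(txn, ops);
        // 2. Raise minValid: anyone who restarts before step 4 must replay at
        //    least up to here before becoming consistent.
        setMinValidToAtLeast(txn, lastOpTime);
        // 3. Apply the operations in parallel; intermediate states stay hidden
        //    because the node remains out of SECONDARY until minValid is reached.
        applyOpsWithWriterPool(ops);
        // 4. Record that everything through lastOpTime has been applied.
        setAppliedThrough(txn, lastOpTime);
        return lastOpTime;
    }
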
@@ -578,15 +580,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn,
return;
}
- // If an apply batch is active then we cannot transition.
- if (!minValidBoundaries.start.isNull()) {
- return;
- }
-
- // Must have applied/written to minvalid, so return if not.
- // -- If 'lastWriteOpTime' is null/uninitialized then we can't transition.
- // -- If 'lastWriteOpTime' is less than the end of the last batch then we can't transition.
- if (lastWriteOpTime.isNull() || minValidBoundaries.end > lastWriteOpTime) {
+ // We can't go to SECONDARY until we reach minvalid.
+ if (replCoord->getMyLastAppliedOpTime() < getMinValid(txn)) {
return;
}
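The transition check collapses to a single rule: the node may leave RECOVERING only once its last applied optime has reached minValid. A hedged sketch of that predicate (stand-in types; getMinValid and getMyLastAppliedOpTime mirror the calls in the hunk):

    // Sketch only: the simplified "may we leave RECOVERING?" predicate.
    struct OperationContext {};
    struct OpTime {
        long long ts = 0;
        bool operator<(const OpTime& rhs) const { return ts < rhs.ts; }
    };

    OpTime getMinValid(OperationContext*) { return OpTime{}; }  // stand-in
    OpTime getMyLastAppliedOpTime() { return OpTime{}; }        // stand-in

    bool readyToBecomeSecondary(OperationContext* txn) {
        // Before this patch the check also tracked in-flight batch boundaries
        // and the last write optime; now reaching minValid is the single condition.
        return !(getMyLastAppliedOpTime() < getMinValid(txn));
    }
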
@@ -693,9 +688,6 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) {
? new ApplyBatchFinalizerForJournal(replCoord)
: new ApplyBatchFinalizer(replCoord)};
- auto minValidBoundaries = getMinValid(&txn);
- OpTime originalEndOpTime(minValidBoundaries.end);
- OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()};
while (!inShutdown()) {
OpQueue ops;
@@ -705,7 +697,7 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) {
return;
}
- tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime);
+ tryToGoLiveAsASecondary(&txn, replCoord);
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
@@ -717,9 +709,7 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) {
invariant(!ops.empty());
- const BSONObj lastOp = ops.back().raw;
-
- if (lastOp.isEmpty()) {
+ if (ops.front().raw.isEmpty()) {
// This means that the network thread has coalesced and we have processed all of its
// data.
invariant(ops.getDeque().size() == 1);
@@ -727,63 +717,40 @@ void SyncTail::oplogApplication(StorageInterface* storageInterface) {
replCoord->signalDrainComplete(&txn);
}
- // Reset some values when triggered in case it was from a rollback.
- minValidBoundaries = getMinValid(&txn);
- lastWriteOpTime = replCoord->getMyLastAppliedOpTime();
- originalEndOpTime = minValidBoundaries.end;
-
continue; // This wasn't a real op. Don't try to apply it.
}
- const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
- // TODO: Make ">=" once SERVER-21988 is fixed.
- if (lastWriteOpTime > lastOpTime) {
- // Error for the oplog to go back in time.
+ // Extract some info from ops that we'll need after releasing the batch below.
+ const auto firstOpTimeInBatch =
+ fassertStatusOK(40299, OpTime::parseFromOplogEntry(ops.front().raw));
+ const auto lastOpTimeInBatch =
+ fassertStatusOK(28773, OpTime::parseFromOplogEntry(ops.back().raw));
+
+ // Make sure the oplog doesn't go back in time or repeat an entry.
+ if (firstOpTimeInBatch <= replCoord->getMyLastAppliedOpTime()) {
fassert(34361,
Status(ErrorCodes::OplogOutOfOrder,
- str::stream() << "Attempted to apply an earlier oplog entry (ts: "
- << lastOpTime.getTimestamp().toStringPretty()
- << ") when our lastWrittenOptime was "
- << lastWriteOpTime.toString()));
+ str::stream() << "Attempted to apply an oplog entry ("
+ << firstOpTimeInBatch.toString()
+ << ") which is not greater than our last applied OpTime ("
+ << replCoord->getMyLastAppliedOpTime().toString()
+ << ")."));
}
- // Set minValid to the last OpTime that needs to be applied, in this batch or from the
- // (last) failed batch, whichever is larger.
- // This will cause this node to go into RECOVERING state
- // if we should crash and restart before updating finishing.
- minValidBoundaries.start = OpTime(getLastSetTimestamp(), OpTime::kUninitializedTerm);
-
-
- // Take the max of the first endOptime (if we recovered) and the end of our batch.
-
- // Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch)
- // ensures that we keep pushing out the point where we can become consistent
- // and allow reads. If we recover and end up doing smaller batches we must pass the
- // originalEndOpTime before we are good.
- //
- // For example:
- // batch apply, 20-40, end = 40
- // batch failure,
- // restart
- // batch apply, 20-25, end = max(25, 40) = 40
- // batch apply, 25-45, end = 45
- minValidBoundaries.end = std::max(originalEndOpTime, lastOpTime);
-
-
- lastWriteOpTime = multiApply(&txn, ops, minValidBoundaries);
- if (lastWriteOpTime.isNull()) {
+ const bool fail = multiApply(&txn, ops).isNull();
+ if (fail) {
// fassert if oplog application failed for any reasons other than shutdown.
error() << "Failed to apply " << ops.getDeque().size()
- << " operations - batch start:" << minValidBoundaries.start
- << " end:" << minValidBoundaries.end;
+ << " operations - batch start:" << firstOpTimeInBatch
+ << " end:" << lastOpTimeInBatch;
fassert(34360, inShutdownStrict());
// Return without setting minvalid in the case of shutdown.
return;
}
- setNewTimestamp(lastWriteOpTime.getTimestamp());
- minValidBoundaries.start = {};
- finalizer->record(lastWriteOpTime);
+ // Update various things that care about our last applied optime.
+ setNewTimestamp(lastOpTimeInBatch.getTimestamp());
+ finalizer->record(lastOpTimeInBatch);
}
}
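The apply loop now derives the batch boundaries directly from the first and last entries, fasserts if the batch would move the oplog backwards or repeat the last applied entry, and records only the last optime with the finalizer. A minimal sketch of the per-batch check and bookkeeping, with stand-in types (the exception here stands in for fassert(34361)):

    // Sketch only: the per-batch sanity check and bookkeeping in the apply loop.
    #include <deque>
    #include <stdexcept>

    struct OpTime {
        long long ts = 0;
        bool operator<=(const OpTime& rhs) const { return ts <= rhs.ts; }
    };
    struct OplogEntry {
        OpTime opTime;
    };

    void applyOneBatch(std::deque<OplogEntry>& batch, OpTime& myLastApplied) {
        if (batch.empty()) {
            return;
        }
        const OpTime firstOpTimeInBatch = batch.front().opTime;
        const OpTime lastOpTimeInBatch = batch.back().opTime;
        // The oplog must never go back in time or repeat an entry.
        if (firstOpTimeInBatch <= myLastApplied) {
            throw std::runtime_error("OplogOutOfOrder");  // fassert(34361) in the real code
        }
        // ... multiApply(): write to oplog, raise minValid, apply, set appliedThrough ...
        myLastApplied = lastOpTimeInBatch;  // what finalizer->record(lastOpTimeInBatch) tracks
    }
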
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 539a00d933d..313a16bf48b 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -143,6 +143,11 @@ public:
return _deque.back();
}
+ const OplogEntry& front() const {
+ invariant(!_deque.empty());
+ return _deque.front();
+ }
+
private:
std::deque<OplogEntry> _deque;
size_t _size;
@@ -194,9 +199,7 @@ protected:
// Apply a batch of operations, using multiple threads.
// If boundries is supplied, will update minValid document at begin and end of batch.
// Returns the last OpTime applied during the apply batch, ops.end["ts"] basically.
- OpTime multiApply(OperationContext* txn,
- const OpQueue& ops,
- boost::optional<BatchBoundaries> boundaries = {});
+ OpTime multiApply(OperationContext* txn, const OpQueue& ops);
private:
class OpQueueBatcher;
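OpQueue gains a front() accessor to match back(), since the apply loop above now inspects the first entry of each batch (for the sentinel check and the out-of-order check) as well as the last. A small sketch of that container shape (simplified; OplogEntry and OpQueueSketch are stand-ins, with assert in place of invariant):

    // Sketch only: the accessor pattern OpQueue follows.
    #include <cassert>
    #include <deque>
    #include <utility>

    struct OplogEntry {};

    class OpQueueSketch {
    public:
        void push_back(OplogEntry op) {
            _deque.push_back(std::move(op));
        }
        const OplogEntry& front() const {
            assert(!_deque.empty());  // invariant(!_deque.empty()) in the real class
            return _deque.front();
        }
        const OplogEntry& back() const {
            assert(!_deque.empty());
            return _deque.back();
        }
        bool empty() const {
            return _deque.empty();
        }

    private:
        std::deque<OplogEntry> _deque;
    };
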
diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js
index b1f2a94a735..0d1e225a990 100644
--- a/src/mongo/shell/assert.js
+++ b/src/mongo/shell/assert.js
@@ -3,6 +3,9 @@ doassert = function(msg, obj) {
if (typeof(msg) == "function")
msg = msg();
+ if (typeof(msg) == "object")
+ msg = tojson(msg);
+
if (typeof(msg) == "string" && msg.indexOf("assert") == 0)
print(msg);
else
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index 8e6d5a05525..b9776fd0de1 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -917,6 +917,12 @@ var ReplSetTest = function(opts) {
printjson(this.nodes);
+ // Clean up after noReplSet to ensure it doesn't affect future restarts.
+ if (options.noReplSet) {
+ this.nodes[n].fullOptions.replSet = defaults.replSet;
+ delete this.nodes[n].fullOptions.noReplSet;
+ }
+
wait = wait || false;
if (!wait.toFixed) {
if (wait)