summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavi Vetriselvan <pvselvan@umich.edu>2019-06-19 14:00:25 -0400
committerPavi Vetriselvan <pvselvan@umich.edu>2019-06-19 14:03:34 -0400
commitd3c0e4ad46fcba5aac61ecec1409e9df6e11f66e (patch)
tree21232be9fb836878507588e11a435d1f9193af7e
parent5f21969e6ca6c7805df0165ff81677aed1199958 (diff)
downloadmongo-d3c0e4ad46fcba5aac61ecec1409e9df6e11f66e.tar.gz
SERVER-41163 apply operations separately during initial sync commit transaction oplog application
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc1.js79
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc2.js82
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc3.js54
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc_with_prepare.js82
-rw-r--r--jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js71
-rw-r--r--jstests/replsets/libs/initial_sync_update_missing_doc.js121
-rw-r--r--src/mongo/db/repl/apply_ops.cpp13
-rw-r--r--src/mongo/db/repl/apply_ops.h7
-rw-r--r--src/mongo/db/repl/data_replicator_external_state_mock.cpp4
-rw-r--r--src/mongo/db/repl/idempotency_test_fixture.cpp3
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp3
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp6
-rw-r--r--src/mongo/db/repl/oplog_applier.h8
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp5
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.h4
-rw-r--r--src/mongo/db/repl/oplog_applier_test.cpp7
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp62
-rw-r--r--src/mongo/db/repl/sync_tail.h10
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp94
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.cpp40
-rw-r--r--src/mongo/db/repl/transaction_oplog_application.h9
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp7
-rw-r--r--src/mongo/shell/replsettest.js4
24 files changed, 537 insertions, 241 deletions
diff --git a/jstests/replsets/initial_sync_update_missing_doc1.js b/jstests/replsets/initial_sync_update_missing_doc1.js
index 6d4fe624892..45579a2fa21 100644
--- a/jstests/replsets/initial_sync_update_missing_doc1.js
+++ b/jstests/replsets/initial_sync_update_missing_doc1.js
@@ -8,85 +8,46 @@
* secondary will initially fail to apply the update operation in phase 3 and subsequently have
* to attempt to check the source for a new copy of the document. The absence of the document on
* the source indicates that the source is free to ignore the failed update operation.
+ *
*/
(function() {
+ load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/libs/check_log.js");
- var name = 'initial_sync_update_missing_doc1';
- var replSet = new ReplSetTest({
+ const name = 'initial_sync_update_missing_doc1';
+ const replSet = new ReplSetTest({
name: name,
- nodes: [{}, {rsConfig: {arbiterOnly: true}}],
+ nodes: 1,
});
replSet.startSet();
replSet.initiate();
- var primary = replSet.getPrimary();
-
- var coll = primary.getDB('test').getCollection(name);
- assert.writeOK(coll.insert({_id: 0, x: 1}));
-
- // Add a secondary node but make it hang after retrieving the last op on the source
- // but before copying databases.
- var secondary = replSet.add();
- secondary.setSlaveOk();
+ const primary = replSet.getPrimary();
+ const dbName = 'test';
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'alwaysOn'}));
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'alwaysOn'}));
- // Skip clearing initial sync progress after a successful initial sync attempt so that we
- // can check initialSyncStatus fields after initial sync is complete.
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'skipClearInitialSyncState', mode: 'alwaysOn'}));
- replSet.reInitiate();
+ var coll = primary.getDB(dbName).getCollection(name);
+ assert.commandWorked(coll.insert({_id: 0, x: 1}));
- // Wait for fail point message to be logged.
- checkLog.contains(secondary,
- 'initial sync - initialSyncHangBeforeCopyingDatabases fail point enabled');
+ // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
+ // it is syncing from the primary.
+ const secondaryConfig = {rsConfig: {votes: 0, priority: 0}};
+ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
- assert.writeOK(coll.update({_id: 0}, {x: 2}, {upsert: false, writeConcern: {w: 1}}));
- assert.writeOK(coll.remove({_id: 0}, {justOne: true, writeConcern: {w: 1}}));
+ // Update and remove document on primary.
+ assert.commandWorked(coll.update({_id: 0}, {x: 2}, {upsert: false}));
+ assert.commandWorked(coll.remove({_id: 0}, {justOne: true}));
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
+ turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
- checkLog.contains(secondary, 'update of non-mod failed');
- checkLog.contains(secondary, 'Fetching missing document');
- checkLog.contains(
- secondary, 'initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled');
var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
- // Insert a document to move forward minValid, even though the document was not found.
- assert.writeOK(primary.getDB('test').getCollection(name + 'b').insert({_id: 1, y: 1}));
-
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
- checkLog.contains(secondary,
- 'Missing document not found on source; presumably deleted later in oplog.');
- checkLog.contains(secondary, 'initial sync done');
-
- replSet.awaitReplication();
- replSet.awaitSecondaryNodes();
-
- assert.eq(0,
- secondary.getDB('test').getCollection(name).find().itcount(),
- 'collection successfully synced to secondary');
+ turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 0 /* numInserted */);
- res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-
- // Fetch count stays at zero because we are unable to get the document from the sync source.
- assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
-
- var finalOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
- assert(friendlyEqual(firstOplogEnd, finalOplogEnd),
- "minValid was moved forward when missing document was not fetched");
-
- assert.eq(0,
- secondary.getDB('local')['temp_oplog_buffer'].find().itcount(),
- "Oplog buffer was not dropped after initial sync");
+ finishAndValidate(replSet, name, firstOplogEnd, 0 /* numInserted */, 0 /* numCollections */);
replSet.stopSet();
+
})();
diff --git a/jstests/replsets/initial_sync_update_missing_doc2.js b/jstests/replsets/initial_sync_update_missing_doc2.js
index e47f9b7e385..512abbf8e9d 100644
--- a/jstests/replsets/initial_sync_update_missing_doc2.js
+++ b/jstests/replsets/initial_sync_update_missing_doc2.js
@@ -6,89 +6,57 @@
*
* This test updates and deletes a document on the source between phases 1 and 2. The
* secondary will initially fail to apply the update operation in phase 3 and subsequently have
- * to attempt to check the source for a new copy of the document. Before the secondary checks the
- * source, we insert a new copy of the document on the source so that the secondary can fetch it.
+ * to attempt to check the source for a new copy of the document. Before the secondary checks
+ * the source, we insert a new copy of the document on the source so that the secondary can fetch
+ * it.
+ *
*/
(function() {
+ load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/libs/check_log.js");
var name = 'initial_sync_update_missing_doc2';
var replSet = new ReplSetTest({
name: name,
- nodes: [{}, {rsConfig: {arbiterOnly: true}}],
+ nodes: 1,
});
replSet.startSet();
replSet.initiate();
- var primary = replSet.getPrimary();
-
- var coll = primary.getDB('test').getCollection(name);
- assert.writeOK(coll.insert({_id: 0, x: 1}));
-
- // Add a secondary node but make it hang after retrieving the last op on the source
- // but before copying databases.
- var secondary = replSet.add();
- secondary.setSlaveOk();
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'alwaysOn'}));
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'alwaysOn'}));
- // Skip clearing initial sync progress after a successful initial sync attempt so that we
- // can check initialSyncStatus fields after initial sync is complete.
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'skipClearInitialSyncState', mode: 'alwaysOn'}));
- replSet.reInitiate();
+ const primary = replSet.getPrimary();
+ const dbName = 'test';
- // Wait for fail point message to be logged.
- checkLog.contains(secondary,
- 'initial sync - initialSyncHangBeforeCopyingDatabases fail point enabled');
+ var coll = primary.getDB(dbName).getCollection(name);
+ assert.commandWorked(coll.insert({_id: 0, x: 1}));
- assert.writeOK(coll.update({_id: 0}, {x: 2}, {upsert: false, writeConcern: {w: 1}}));
- assert.writeOK(coll.remove({_id: 0}, {justOne: true, writeConcern: {w: 1}}));
+ // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
+ // it is syncing from the primary.
+ const secondaryConfig = {rsConfig: {votes: 0, priority: 0}};
+ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
+ // Update and remove document on primary.
+ assert.commandWorked(coll.update({_id: 0}, {x: 2}, {upsert: false}));
+ assert.commandWorked(coll.remove({_id: 0}, {justOne: true}));
- checkLog.contains(secondary, 'update of non-mod failed');
- checkLog.contains(secondary, 'Fetching missing document');
+ turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
- checkLog.contains(
- secondary, 'initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled');
- var doc = {_id: 0, x: 3};
- // Re-insert deleted document.
- assert.writeOK(coll.insert(doc, {writeConcern: {w: 1}}));
+ // Re-insert deleted document on the sync source. The secondary should be able to fetch and
+ // insert this document after failing to apply the update.
+ assert.commandWorked(coll.insert({_id: 0, x: 3}));
var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
+ // Temporarily increase log levels so that we can see the 'Inserted missing document' log line.
secondary.getDB('test').setLogLevel(1, 'replication');
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
-
- checkLog.contains(secondary, 'Inserted missing document');
+ turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numInserted */);
secondary.getDB('test').setLogLevel(0, 'replication');
- checkLog.contains(secondary, 'initial sync done');
-
- replSet.awaitReplication();
- replSet.awaitSecondaryNodes();
-
- var coll = secondary.getDB('test').getCollection(name);
- assert.eq(1, coll.find().itcount(), 'collection successfully synced to secondary');
- assert.eq(doc, coll.findOne(), 'document on secondary matches primary');
-
- res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
- assert.eq(res.initialSyncStatus.fetchedMissingDocs, 1);
- var finalOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
- assert(!friendlyEqual(firstOplogEnd, finalOplogEnd),
- "minValid was not moved forward when missing document was fetched");
-
- assert.eq(0,
- secondary.getDB('local')['temp_oplog_buffer'].find().itcount(),
- "Oplog buffer was not dropped after initial sync");
+ finishAndValidate(replSet, name, firstOplogEnd, 1 /* numInserted */, 1 /* numCollections */);
replSet.stopSet();
+
})();
diff --git a/jstests/replsets/initial_sync_update_missing_doc3.js b/jstests/replsets/initial_sync_update_missing_doc3.js
index 0aaf5d3b8b4..20060521fd8 100644
--- a/jstests/replsets/initial_sync_update_missing_doc3.js
+++ b/jstests/replsets/initial_sync_update_missing_doc3.js
@@ -15,17 +15,19 @@
(function() {
load("jstests/libs/check_log.js");
+ load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/replsets/libs/two_phase_drops.js"); // For TwoPhaseDropCollectionTest.
var name = 'initial_sync_update_missing_doc3';
var replSet = new ReplSetTest({
name: name,
- nodes: [{}, {rsConfig: {arbiterOnly: true}}],
+ nodes: 1,
});
replSet.startSet();
replSet.initiate();
- var primary = replSet.getPrimary();
+ const primary = replSet.getPrimary();
+ const dbName = 'test';
// Check for 'system.drop' two phase drop support.
if (!TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replSet)) {
@@ -34,38 +36,25 @@
return;
}
- var coll = primary.getDB('test').getCollection(name);
- assert.writeOK(coll.insert({_id: 0, x: 1}));
+ var coll = primary.getDB(dbName).getCollection(name);
+ assert.commandWorked(coll.insert({_id: 0, x: 1}));
- // Add a secondary node but make it hang after retrieving the last op on the source
- // but before copying databases.
- var secondary = replSet.add();
- secondary.setSlaveOk();
+ // Add a secondary node with priority: 0 so that we prevent elections while it is syncing
+ // from the primary.
+ // We cannot give the secondary votes: 0 because then it will not be able to acknowledge
+ // majority writes. That means the sync source can immediately drop its collection
+ // because it alone determines the majority commit point.
+ const secondaryConfig = {rsConfig: {priority: 0}};
+ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'alwaysOn'}));
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'alwaysOn'}));
- replSet.reInitiate();
+ // Update and remove document on primary.
+ assert.commandWorked(coll.update({_id: 0}, {x: 2}, {upsert: false}));
+ assert.commandWorked(coll.remove({_id: 0}, {justOne: true}));
- // Wait for fail point message to be logged.
- checkLog.contains(secondary,
- 'initial sync - initialSyncHangBeforeCopyingDatabases fail point enabled');
+ turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
- assert.writeOK(coll.update({_id: 0}, {x: 2}, {upsert: false, writeConcern: {w: 1}}));
- assert.writeOK(coll.remove({_id: 0}, {justOne: true, writeConcern: {w: 1}}));
-
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
-
- checkLog.contains(secondary, 'update of non-mod failed');
- checkLog.contains(secondary, 'Fetching missing document');
-
- checkLog.contains(
- secondary, 'initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled');
- var doc = {_id: 0, x: 3};
// Re-insert deleted document.
- assert.writeOK(coll.insert(doc, {writeConcern: {w: 1}}));
+ assert.commandWorked(coll.insert({_id: 0, x: 3}));
// Mark the collection as drop pending so it gets renamed, but retains the UUID.
assert.commandWorked(primary.getDB('test').runCommand({"drop": name}));
@@ -74,14 +63,9 @@
var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
secondary.getDB('test').setLogLevel(1, 'replication');
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
-
- checkLog.contains(secondary, 'Inserted missing document');
+ turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1);
secondary.getDB('test').setLogLevel(0, 'replication');
- checkLog.contains(secondary, 'initial sync done');
-
replSet.awaitReplication();
replSet.awaitSecondaryNodes();
diff --git a/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js b/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js
new file mode 100644
index 00000000000..a294bf665f9
--- /dev/null
+++ b/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js
@@ -0,0 +1,82 @@
+/**
+ * Initial sync runs in several phases - the first 3 are as follows:
+ * 1) fetches the last oplog entry (op_start1) on the source;
+ * 2) copies all non-local databases from the source; and
+ * 3) fetches and applies operations from the source after op_start1.
+ *
+ * This test updates and deletes a document on the source between phases 1 and 2 in a prepared
+ * transaction. The secondary will initially fail to apply the update operation in phase 3 and
+ * subsequently have to attempt to check the source for a new copy of the document. The absence of
+ * the document on the source indicates that the source is free to ignore the failed update
+ * operation.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+ load("jstests/core/txns/libs/prepare_helpers.js");
+ load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
+ load("jstests/libs/check_log.js");
+
+ const name = 'initial_sync_update_missing_doc_with_prepare';
+ const replSet = new ReplSetTest({
+ name: name,
+ nodes: 1,
+ });
+
+ replSet.startSet();
+ replSet.initiate();
+ const primary = replSet.getPrimary();
+ const dbName = 'test';
+
+ var coll = primary.getDB(dbName).getCollection(name);
+ assert.commandWorked(coll.insert({_id: 0, x: 1}));
+ assert.commandWorked(coll.insert({_id: 1, x: 1}));
+
+ // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
+ // it is syncing from the primary.
+ const secondaryConfig = {rsConfig: {votes: 0, priority: 0}};
+ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
+
+ const session = primary.startSession();
+ const sessionDB = session.getDatabase(dbName);
+ const sessionColl = sessionDB.getCollection(name);
+ // Update and remove document on primary in a prepared transaction. Once we receive the
+ // commit for this transaction, we should apply each operation separately (one update, and
+ // one delete).
+ session.startTransaction();
+ assert.commandWorked(sessionColl.update({_id: 0}, {x: 2}, {upsert: false}));
+ assert.commandWorked(sessionColl.remove({_id: 0}, {justOne: true}));
+ const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+ assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
+
+ // This transaction is eventually aborted, so this document should exist on the secondary
+ // after initial sync.
+ session.startTransaction();
+ assert.commandWorked(sessionColl.update({_id: 1}, {x: 2}, {upsert: false}));
+ assert.commandWorked(sessionColl.remove({_id: 1}, {justOne: true}));
+ PrepareHelpers.prepareTransaction(session);
+ assert.commandWorked(session.abortTransaction_forTesting());
+
+ turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
+
+ var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+ assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
+ var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
+
+ turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 0 /* numInserted */);
+
+ // Since we aborted the second transaction, we expect this collection to still exist after
+ // initial sync.
+ finishAndValidate(replSet, name, firstOplogEnd, 0 /* numInserted */, 1 /* numCollections */);
+
+ // Make sure the secondary has the correct documents after syncing from the primary. The
+ // second document was deleted in the prepared transaction that was aborted. Therefore, it
+ // should have been properly replicated.
+ coll = secondary.getDB(dbName).getCollection(name);
+ assert.docEq(null, coll.findOne({_id: 0}), 'document on secondary matches primary');
+ assert.docEq({_id: 1, x: 1}, coll.findOne({_id: 1}), 'document on secondary matches primary');
+
+ replSet.stopSet();
+
+})(); \ No newline at end of file
diff --git a/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js b/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js
new file mode 100644
index 00000000000..7f7ac48340f
--- /dev/null
+++ b/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js
@@ -0,0 +1,71 @@
+/**
+ * Initial sync runs in several phases - the first 3 are as follows:
+ * 1) fetches the last oplog entry (op_start1) on the source;
+ * 2) copies all non-local databases from the source; and
+ * 3) fetches and applies operations from the source after op_start1.
+ *
+ * This test updates and deletes a document on the source between phases 1 and 2 in a prepared
+ * transaction. The secondary will initially fail to apply the update operation in phase 3 and
+ * subsequently have to attempt to check the source for a new copy of the document. Before the
+ * secondary checks the source, we insert a new copy of the document on the source so that the
+ * secondary can fetch it.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+ load("jstests/core/txns/libs/prepare_helpers.js");
+ load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
+ load("jstests/libs/check_log.js");
+
+ const name = 'initial_sync_update_missing_doc_with_prepare';
+ const replSet = new ReplSetTest({
+ name: name,
+ nodes: 1,
+ });
+
+ replSet.startSet();
+ replSet.initiate();
+ const primary = replSet.getPrimary();
+ const dbName = 'test';
+
+ const coll = primary.getDB(dbName).getCollection(name);
+ assert.commandWorked(coll.insert({_id: 0, x: 1}));
+
+ // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
+ // it is syncing from the primary.
+ const secondaryConfig = {rsConfig: {votes: 0, priority: 0}};
+ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
+
+ const session = primary.startSession();
+ const sessionDB = session.getDatabase(dbName);
+ const sessionColl = sessionDB.getCollection(name);
+ // Update and remove document on primary in a prepared transaction. Once we receive the
+ // commit for this transaction, we should apply each operation separately (one update, and
+ // one delete).
+ session.startTransaction();
+ assert.commandWorked(sessionColl.update({_id: 0}, {x: 2}, {upsert: false}));
+ assert.commandWorked(sessionColl.remove({_id: 0}, {justOne: true}));
+ const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+ assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
+
+ turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
+
+ // Re-insert deleted document on the sync source. The secondary should be able to fetch and
+ // insert this document after failing to apply the update.
+ assert.commandWorked(coll.insert({_id: 0, x: 3}));
+
+ const res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+ assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
+ const firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
+
+ // Temporarily increase log levels so that we can see the 'Inserted missing document' log line.
+ secondary.getDB('test').setLogLevel(1, 'replication');
+ turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numInserted */);
+ secondary.getDB('test').setLogLevel(0, 'replication');
+
+ finishAndValidate(replSet, name, firstOplogEnd, 1 /* numInserted */, 1 /* numCollections */);
+
+ replSet.stopSet();
+
+})(); \ No newline at end of file
diff --git a/jstests/replsets/libs/initial_sync_update_missing_doc.js b/jstests/replsets/libs/initial_sync_update_missing_doc.js
new file mode 100644
index 00000000000..df1bba79fd5
--- /dev/null
+++ b/jstests/replsets/libs/initial_sync_update_missing_doc.js
@@ -0,0 +1,121 @@
+"use strict";
+/**
+ * Initial sync runs in several phases - the first 3 are as follows:
+ * 1) fetches the last oplog entry (op_start1) on the source;
+ * 2) copies all non-local databases from the source; and
+ * 3) fetches and applies operations from the source after op_start1.
+ *
+ * This library is used to delete documents on the sync source between the first two phases so
+ * that the secondary will fail to apply the update operation in phase three.
+ */
+
+// reInitiate the replica set with a secondary node, which will go through initial sync. This
+// function will hang the secondary in initial sync. turnOffHangBeforeCopyingDatabasesFailPoint
+// must be called after reInitiateSetWithSecondary, followed by
+// turnOffHangBeforeGettingMissingDocFailPoint.
+var reInitiateSetWithSecondary = function(replSet, secondaryConfig) {
+
+ const secondary = replSet.add(secondaryConfig);
+ secondary.setSlaveOk();
+
+ // Make the secondary hang after retrieving the last op on the sync source but before
+ // copying databases.
+ assert.commandWorked(secondary.getDB('admin').runCommand(
+ {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'alwaysOn'}));
+ assert.commandWorked(secondary.getDB('admin').runCommand(
+ {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'alwaysOn'}));
+
+ // Skip clearing initial sync progress after a successful initial sync attempt so that we
+ // can check initialSyncStatus fields after initial sync is complete.
+ assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: 'skipClearInitialSyncState', mode: 'alwaysOn'}));
+
+ replSet.reInitiate();
+
+ // Wait for fail point message to be logged.
+ checkLog.contains(secondary,
+ 'initial sync - initialSyncHangBeforeCopyingDatabases fail point enabled');
+
+ return secondary;
+
+};
+
+// Must be called after reInitiateSetWithSecondary. Turns off the
+// initialSyncHangBeforeCopyingDatabases fail point so that the secondary will start copying all
+// non-local databases.
+var turnOffHangBeforeCopyingDatabasesFailPoint = function(secondary) {
+
+ assert.commandWorked(secondary.getDB('admin').runCommand(
+ {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
+
+ // The following checks assume that we have updated and deleted a document on the sync source
+ // that the secondary will try to update in phase 3.
+ checkLog.contains(secondary, 'update of non-mod failed');
+ checkLog.contains(secondary, 'Fetching missing document');
+ checkLog.contains(
+ secondary, 'initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled');
+};
+
+// Must be called after turnOffHangBeforeCopyingDatabasesFailPoint. Turns off the
+// initialSyncHangBeforeGettingMissingDocument fail point so that the secondary can check if the
+// sync source has the missing document.
+var turnOffHangBeforeGettingMissingDocFailPoint = function(primary, secondary, name, numInserted) {
+
+ if (numInserted === 0) {
+ // If we did not re-insert the missing document, insert an arbitrary document to move
+ // forward minValid even though the document was not found.
+ assert.commandWorked(
+ primary.getDB('test').getCollection(name + 'b').insert({_id: 1, y: 1}));
+ }
+
+ assert.commandWorked(secondary.getDB('admin').runCommand(
+ {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
+
+ // If we've re-inserted the missing document between secondaryHangsBeforeGettingMissingDoc and
+ // this function, the secondary will insert the missing document after it fails the update.
+ // Otherwise, it will fail to fetch anything from the sync source because the document was
+ // deleted.
+ if (numInserted > 0) {
+ checkLog.contains(secondary, 'Inserted missing document');
+ } else {
+ checkLog.contains(
+ secondary, 'Missing document not found on source; presumably deleted later in oplog.');
+ }
+ checkLog.contains(secondary, 'initial sync done');
+
+};
+
+var finishAndValidate = function(replSet, name, firstOplogEnd, numInserted, numCollections) {
+
+ replSet.awaitReplication();
+ replSet.awaitSecondaryNodes();
+ const dbName = 'test';
+ const secondary = replSet.getSecondary();
+
+ assert.eq(numCollections,
+ secondary.getDB(dbName).getCollection(name).find().itcount(),
+ 'collection successfully synced to secondary');
+
+ const res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+
+ // If we haven't re-inserted any documents after deleting them, the fetch count is 0 because we
+ // are unable to get the document from the sync source.
+ assert.eq(res.initialSyncStatus.fetchedMissingDocs, numInserted);
+
+ const finalOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
+
+ if (numInserted > 0) {
+ assert.neq(firstOplogEnd,
+ finalOplogEnd,
+ "minValid was not moved forward when missing document was fetched");
+ } else {
+ assert.eq(firstOplogEnd,
+ finalOplogEnd,
+ "minValid was moved forward when missing document was not fetched");
+ }
+
+ assert.eq(0,
+ secondary.getDB('local')['temp_oplog_buffer'].find().itcount(),
+ "Oplog buffer was not dropped after initial sync");
+
+};
diff --git a/src/mongo/db/repl/apply_ops.cpp b/src/mongo/db/repl/apply_ops.cpp
index 223568e52a0..54971bbdee8 100644
--- a/src/mongo/db/repl/apply_ops.cpp
+++ b/src/mongo/db/repl/apply_ops.cpp
@@ -486,13 +486,14 @@ Status applyOps(OperationContext* opCtx,
// static
MultiApplier::Operations ApplyOps::extractOperations(const OplogEntry& applyOpsOplogEntry) {
MultiApplier::Operations result;
- extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result);
+ extractOperationsTo(applyOpsOplogEntry, applyOpsOplogEntry.toBSON(), &result, boost::none);
return result;
}
void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations) {
+ MultiApplier::Operations* operations,
+ boost::optional<Timestamp> commitOplogEntryTS) {
uassert(ErrorCodes::TypeMismatch,
str::stream() << "ApplyOps::extractOperations(): not a command: "
<< redact(applyOpsOplogEntry.toBSON()),
@@ -513,8 +514,16 @@ void ApplyOps::extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
for (const auto& elem : operationDocs) {
auto operationDoc = elem.Obj();
BSONObjBuilder builder(operationDoc);
+
+ // Apply the ts field first if we have a commitOplogEntryTS so that appendElementsUnique
+ // will not overwrite this value.
+ if (commitOplogEntryTS) {
+ builder.append("ts", *commitOplogEntryTS);
+ }
+
builder.appendElementsUnique(topLevelDoc);
auto operation = builder.obj();
+
operations->emplace_back(operation);
}
}
diff --git a/src/mongo/db/repl/apply_ops.h b/src/mongo/db/repl/apply_ops.h
index 0672ce6d05f..c5cca31569f 100644
--- a/src/mongo/db/repl/apply_ops.h
+++ b/src/mongo/db/repl/apply_ops.h
@@ -56,10 +56,15 @@ public:
* This variant allows optimization for extracting multiple applyOps operations. The entry for
* the non-DurableReplOperation fields of the extracted operation must be specified as
* 'topLevelDoc', and need not be any of the applyOps operations.
+ *
+ * If a commitOplogEntryTS Timestamp is passed in, then we are extracting applyOps operations
+ * from a prepare oplog entry during initial sync. These operations must be timestamped at the
+ * commit oplog entry timestamp instead of the prepareTimestamp.
*/
static void extractOperationsTo(const OplogEntry& applyOpsOplogEntry,
const BSONObj& topLevelDoc,
- MultiApplier::Operations* operations);
+ MultiApplier::Operations* operations,
+ boost::optional<Timestamp> commitOplogEntryTS);
};
/**
diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
index 322f91a29dc..cbee6637c67 100644
--- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp
+++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp
@@ -56,7 +56,9 @@ public:
private:
void _run(OplogBuffer* oplogBuffer) final {}
void _shutdown() final {}
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final {
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) final {
return _externalState->multiApplyFn(opCtx, ops, _observer);
}
diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp
index 6a5fe64ba4a..ac0a2510718 100644
--- a/src/mongo/db/repl/idempotency_test_fixture.cpp
+++ b/src/mongo/db/repl/idempotency_test_fixture.cpp
@@ -389,7 +389,8 @@ void IdempotencyTest::testOpsAreIdempotent(std::vector<OplogEntry> ops, Sequence
std::vector<MultiApplier::OperationPtrs> writerVectors(1);
std::vector<MultiApplier::Operations> derivedOps;
// Derive ops for transactions if necessary.
- syncTail.fillWriterVectors(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+ syncTail.fillWriterVectors(
+ _opCtx.get(), &ops, &writerVectors, &derivedOps, OplogApplication::Mode::kInitialSync);
const auto& opPtrs = writerVectors[0];
ASSERT_OK(runOpPtrsInitialSync(opPtrs));
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 784a5cfba2c..2038989393e 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -1190,7 +1190,8 @@ void InitialSyncer::_getNextApplierBatchCallback(
_fetchCount.store(0);
MultiApplier::MultiApplyFn applyBatchOfOperationsFn = [this](OperationContext* opCtx,
MultiApplier::Operations ops) {
- return _oplogApplier->multiApply(opCtx, std::move(ops));
+ return _oplogApplier->multiApply(
+ opCtx, std::move(ops), repl::OplogApplication::Mode::kInitialSync);
};
OpTime lastApplied = ops.back().getOpTime();
invariant(ops.back().getWallClockTime());
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 61b5d790bf5..24a484f1362 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -289,9 +289,11 @@ StatusWith<OplogApplier::Operations> OplogApplier::getNextApplierBatch(
return std::move(ops);
}
-StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx, Operations ops) {
+StatusWith<OpTime> OplogApplier::multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) {
_observer->onBatchBegin(ops);
- auto lastApplied = _multiApply(opCtx, std::move(ops));
+ auto lastApplied = _multiApply(opCtx, std::move(ops), mode);
_observer->onBatchEnd(lastApplied, {});
return lastApplied;
}
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index 71ea92b4f3f..9650036b496 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -195,7 +195,9 @@ public:
*
* TODO: remove when enqueue() is implemented.
*/
- StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops);
+ StatusWith<OpTime> multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode);
private:
/**
@@ -221,7 +223,9 @@ private:
* Called from multiApply() to apply a batch of operations in parallel.
* Implemented in subclasses but not visible otherwise.
*/
- virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) = 0;
+ virtual StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) = 0;
// Used to schedule task for oplog application loop.
// Not owned by us.
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 8c293a1bbba..9b3126cb781 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -61,8 +61,9 @@ void OplogApplierImpl::_shutdown() {
_syncTail.shutdown();
}
-StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx, Operations ops) {
- return _syncTail.multiApply(opCtx, std::move(ops));
+StatusWith<OpTime> OplogApplierImpl::_multiApply(
+ OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) {
+ return _syncTail.multiApply(opCtx, std::move(ops), mode);
}
} // namespace repl
diff --git a/src/mongo/db/repl/oplog_applier_impl.h b/src/mongo/db/repl/oplog_applier_impl.h
index d6e0e523220..741cfd4dcd3 100644
--- a/src/mongo/db/repl/oplog_applier_impl.h
+++ b/src/mongo/db/repl/oplog_applier_impl.h
@@ -67,7 +67,9 @@ private:
void _shutdown() override;
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) override;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) override;
// Not owned by us.
ReplicationCoordinator* const _replCoord;
diff --git a/src/mongo/db/repl/oplog_applier_test.cpp b/src/mongo/db/repl/oplog_applier_test.cpp
index 352359bae58..05040963774 100644
--- a/src/mongo/db/repl/oplog_applier_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_test.cpp
@@ -56,7 +56,9 @@ public:
void _run(OplogBuffer* oplogBuffer) final;
void _shutdown() final;
- StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final;
+ StatusWith<OpTime> _multiApply(OperationContext* opCtx,
+ Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) final;
};
OplogApplierMock::OplogApplierMock(OplogBuffer* oplogBuffer)
@@ -66,7 +68,8 @@ void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {}
void OplogApplierMock::_shutdown() {}
-StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) {
+StatusWith<OpTime> OplogApplierMock::_multiApply(
+ OperationContext* opCtx, Operations ops, boost::optional<repl::OplogApplication::Mode> mode) {
return OpTime();
}
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 2e8890ddaa4..4f03e19b894 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -402,7 +402,8 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
OplogApplier::Operations batch;
while (
!(batch = fassert(50763, oplogApplier.getNextApplierBatch(opCtx, batchLimits))).empty()) {
- applyThroughOpTime = uassertStatusOK(oplogApplier.multiApply(opCtx, std::move(batch)));
+ applyThroughOpTime = uassertStatusOK(
+ oplogApplier.multiApply(opCtx, std::move(batch), OplogApplication::Mode::kRecovering));
}
stats.complete(applyThroughOpTime);
invariant(oplogBuffer.isEmpty(),
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6b33953ca12..2c3e6317dc0 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -796,7 +796,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Apply the operations in this batch. 'multiApply' returns the optime of the last op that
// was applied, which should be the last optime in the batch.
auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
+ fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none));
invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
// In order to provide resilience in the event of a crash in the middle of batch
@@ -847,6 +847,12 @@ inline bool isCommitApplyOps(const OplogEntry& entry) {
!entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare");
}
+// Returns whether a commitTransaction oplog entry is part of a prepared transaction.
+inline bool isPreparedCommit(const OplogEntry& entry) {
+ return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction;
+}
+
+
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
@@ -1109,7 +1115,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) {
+ SessionUpdateTracker* sessionUpdateTracker,
+ boost::optional<repl::OplogApplication::Mode> mode) {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
@@ -1137,7 +1144,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ _fillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
}
@@ -1199,13 +1207,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// messing up the state of the opCtx. In particular we do not want to
// set the ReadSource to kLastApplied.
ReadSourceScope readSourceScope(opCtx);
- derivedOps->emplace_back(
- readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
+ opCtx, op, partialTxnList, boost::none));
partialTxnList.clear();
}
// Transaction entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
@@ -1213,7 +1221,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Nested entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
} catch (...) {
fassertFailedWithStatusNoTrace(
@@ -1225,6 +1233,32 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
continue;
}
+ // If we see a commitTransaction command that is a part of a prepared transaction during
+ // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers
+ // with the extracted operations.
+ if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) {
+ auto logicalSessionId = op.getSessionId();
+ auto& partialTxnList = partialTxnOps[*logicalSessionId];
+
+ {
+ // Traverse the oplog chain with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
+
+ // Get the previous oplog entry, which should be a prepare oplog entry.
+ const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op);
+ invariant(prevOplogEntry.shouldPrepare());
+
+ // Extract the operations from the applyOps entry.
+ auto commitOplogEntryOpTime = op.getOpTime();
+ derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
+ opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp()));
+ }
+
+ _fillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ continue;
+ }
+
auto& writer = (*writerVectors)[hash % numWriters];
if (writer.empty()) {
writer.reserve(8); // Skip a few growth rounds
@@ -1236,14 +1270,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
+ std::vector<MultiApplier::Operations>* derivedOps,
+ boost::optional<repl::OplogApplication::Mode> mode) {
SessionUpdateTracker sessionUpdateTracker;
- _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
+ _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
}
@@ -1275,10 +1310,13 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
}
}
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
+StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx,
+ MultiApplier::Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) {
invariant(!ops.empty());
LOG(2) << "replication batch size is " << ops.size();
+
// Stop all readers until we're done. This also prevents doc-locking engines from deleting old
// entries from the oplog until we finish writing.
Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
@@ -1318,7 +1356,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
+ fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index fd314fd3564..5f9961c8b22 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -228,12 +228,15 @@ public:
* to at least the last optime of the batch. If 'minValid' is already greater than or equal
* to the last optime of this batch, it will not be updated.
*/
- StatusWith<OpTime> multiApply(OperationContext* opCtx, MultiApplier::Operations ops);
+ StatusWith<OpTime> multiApply(OperationContext* opCtx,
+ MultiApplier::Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode);
void fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps);
+ std::vector<MultiApplier::Operations>* derivedOps,
+ boost::optional<repl::OplogApplication::Mode> mode);
private:
class OpQueueBatcher;
@@ -244,7 +247,8 @@ private:
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker);
+ SessionUpdateTracker* sessionUpdateTracker,
+ boost::optional<repl::OplogApplication::Mode> mode);
/**
* Doles out all the work to the writer pool threads. Does not modify writerVectors, but passes
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 514192587ba..9ec52ed34e3 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -408,7 +408,7 @@ DEATH_TEST_F(SyncTailTest, MultiApplyAbortsWhenNoOperationsAreGiven, "!ops.empty
getStorageInterface(),
noopApplyOperationFn,
writerPool.get());
- syncTail.multiApply(_opCtx.get(), {}).getStatus().ignore();
+ syncTail.multiApply(_opCtx.get(), {}, boost::none).getStatus().ignore();
}
bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
@@ -434,7 +434,7 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* opCtx,
SyncTail syncTail(
nullptr, consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}));
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(opCtx, {op}, boost::none));
ASSERT_EQUALS(op.getOpTime(), lastOpTime);
ASSERT_EQUALS(1U, operationsApplied.size());
@@ -598,7 +598,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp1->getRaw());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -613,7 +613,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// Apply a batch with only the second operation. This should result in the second oplog entry
// being put in the oplog, but with no effect because the operation is part of a pending
// transaction.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp2}, boost::none));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(oplogDocs().back(), _insertOp2->getRaw());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -629,7 +629,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionSeparate) {
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the two previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitOp}, boost::none));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -656,7 +656,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionAllAtOnce) {
// Apply both inserts and the commit in a single batch. We expect no oplog entries to
// be inserted (because we've set skipWritesToOplog), and both entries to be committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}));
+ ASSERT_OK(
+ syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2, *_commitOp}, boost::none));
ASSERT_EQ(0U, oplogDocs().size());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -710,7 +711,7 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
// Insert the first entry in its own batch. This should result in the oplog entry being written
// but the entry should not be applied as it is part of a pending transaction.
const auto expectedStartOpTime = insertOps[0].getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOps[0]}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_EQ(0U, _insertedDocs[_nss1].size());
ASSERT_EQ(0U, _insertedDocs[_nss2].size());
@@ -723,8 +724,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyUnpreparedTransactionTwoBatches) {
// Insert the rest of the entries, including the commit. These entries should be added to the
// oplog, and all the entries including the first should be applied.
- ASSERT_OK(
- syncTail.multiApply(_opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {insertOps[1], insertOps[2], insertOps[3], commitOp}, boost::none));
ASSERT_EQ(5U, oplogDocs().size());
ASSERT_EQ(3U, _insertedDocs[_nss1].size());
ASSERT_EQ(1U, _insertedDocs[_nss2].size());
@@ -847,7 +848,8 @@ TEST_F(MultiOplogEntrySyncTailTest, MultiApplyTwoTransactionsOneBatch) {
// once.
ASSERT_OK(syncTail.multiApply(
_opCtx.get(),
- {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2}));
+ {insertOps1[0], insertOps1[1], commitOp1, insertOps2[0], insertOps2[1], commitOp2},
+ boost::none));
ASSERT_EQ(6U, oplogDocs().size());
ASSERT_EQ(4, replOpCounters.getInsert()->load() - insertsBefore);
ASSERT_EQ(4U, _insertedDocs[_nss1].size());
@@ -956,7 +958,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
@@ -972,7 +974,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction) along with the
// nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
@@ -986,7 +988,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionStea
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the three previous entries being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none));
ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1006,7 +1008,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
checkTxnTable(_lsid,
_txnNum,
_insertOp1->getOpTime(),
@@ -1017,7 +1019,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// Apply a batch with only the prepare. This should result in the prepare being put in the
// oplog, and the two previous entries being applied (but in a transaction) along with the
// nested insert in the prepare oplog entry.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
checkTxnTable(_lsid,
_txnNum,
_prepareWithPrevOp->getOpTime(),
@@ -1027,7 +1029,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortPreparedTransactio
// Apply a batch with only the abort. This should result in the abort being put in the
// oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortPrepareWithPrevOp}, boost::none));
ASSERT_BSONOBJ_EQ(_abortPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1051,7 +1053,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// being put in the oplog and updating the transaction table, but not actually being applied
// because they are part of a pending transaction.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_insertOp1, *_insertOp2}, OplogApplication::Mode::kInitialSync));
ASSERT_EQ(2U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_insertOp1->getRaw(), oplogDocs()[0]);
ASSERT_BSONOBJ_EQ(_insertOp2->getRaw(), oplogDocs()[1]);
@@ -1066,7 +1069,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_prepareWithPrevOp}, OplogApplication::Mode::kInitialSync));
ASSERT_EQ(3U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_prepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1080,7 +1084,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionInit
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the three previous entries being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_commitPrepareWithPrevOp}, OplogApplication::Mode::kInitialSync));
ASSERT_BSONOBJ_EQ(_commitPrepareWithPrevOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1115,7 +1120,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with the insert operations. This should have no effect, because this is
// recovery.
const auto expectedStartOpTime = _insertOp1->getOpTime();
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_insertOp1, *_insertOp2}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1128,7 +1133,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with only the prepare applyOps. This should have no effect, since this is
// recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_prepareWithPrevOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1141,7 +1146,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyPreparedTransactionReco
// Apply a batch with only the commit. This should result in the the three previous entries
// being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitPrepareWithPrevOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_EQ(2U, _insertedDocs[_nss2].size());
@@ -1161,7 +1166,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
@@ -1174,7 +1179,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplySingleApplyOpsPreparedT
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1202,7 +1207,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {emptyPrepareApplyOp}, boost::none));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(emptyPrepareApplyOp.getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1215,7 +1220,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyEmptyApplyOpsPreparedTr
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and prepared insert being committed.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1234,7 +1239,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep
const auto expectedStartOpTime = _singlePrepareApplyOp->getOpTime();
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, and the nested insert being applied (but in a transaction).
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
checkTxnTable(_lsid,
_txnNum,
_singlePrepareApplyOp->getOpTime(),
@@ -1244,7 +1249,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest, MultiApplyAbortSingleApplyOpsPrep
// Apply a batch with only the abort. This should result in the abort being put in the
// oplog and the transaction table being updated accordingly.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_abortSinglePrepareApplyOp}, boost::none));
ASSERT_BSONOBJ_EQ(_abortSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1269,7 +1274,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the prepare applyOps. This should result in the prepare being put in
// the oplog, but, since this is initial sync, nothing else.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_singlePrepareApplyOp}, OplogApplication::Mode::kInitialSync));
ASSERT_EQ(1U, oplogDocs().size());
ASSERT_BSONOBJ_EQ(_singlePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
@@ -1283,7 +1289,8 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the commit. This should result in the commit being put in the
// oplog, and the previous entry being applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {*_commitSinglePrepareApplyOp}, OplogApplication::Mode::kInitialSync));
ASSERT_BSONOBJ_EQ(_commitSinglePrepareApplyOp->getRaw(), oplogDocs().back());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1319,7 +1326,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the prepare applyOps. This should have no effect, since this is
// recovery.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_singlePrepareApplyOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_TRUE(_insertedDocs[_nss1].empty());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -1332,7 +1339,7 @@ TEST_F(MultiOplogEntryPreparedTransactionTest,
// Apply a batch with only the commit. This should result in the previous entry being
// applied.
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {*_commitSinglePrepareApplyOp}, boost::none));
ASSERT_TRUE(oplogDocs().empty());
ASSERT_EQ(1U, _insertedDocs[_nss1].size());
ASSERT_TRUE(_insertedDocs[_nss2].empty());
@@ -2551,7 +2558,7 @@ TEST_F(SyncTailTxnTableTest, SimpleWriteWithTxn) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp}, boost::none));
checkTxnTable(sessionInfo, {Timestamp(1, 0), 1}, date);
}
@@ -2582,7 +2589,7 @@ TEST_F(SyncTailTxnTableTest, WriteWithTxnMixedWithDirectWriteToTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp}, boost::none));
ASSERT_FALSE(docExists(
_opCtx.get(),
@@ -2626,7 +2633,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectDeleteToTxnTa
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, deleteOp, insertOp2}, boost::none));
checkTxnTable(sessionInfo, {Timestamp(3, 0), 2}, date);
}
@@ -2658,7 +2665,7 @@ TEST_F(SyncTailTxnTableTest, InterleavedWriteWithTxnMixedWithDirectUpdateToTxnTa
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOp, updateOp}, boost::none));
checkTxnTable(sessionInfo, newWriteOpTime, date);
}
@@ -2719,7 +2726,8 @@ TEST_F(SyncTailTxnTableTest, RetryableWriteThenMultiStatementTxnWriteOnSameSessi
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {retryableInsertOp, txnInsertOp, txnCommitOp}, boost::none));
repl::checkTxnTable(_opCtx.get(),
*sessionInfo.getSessionId(),
@@ -2785,7 +2793,8 @@ TEST_F(SyncTailTxnTableTest, MultiStatementTxnWriteThenRetryableWriteOnSameSessi
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}));
+ ASSERT_OK(syncTail.multiApply(
+ _opCtx.get(), {txnInsertOp, txnCommitOp, retryableInsertOp}, boost::none));
repl::checkTxnTable(_opCtx.get(),
*sessionInfo.getSessionId(),
@@ -2850,7 +2859,8 @@ TEST_F(SyncTailTxnTableTest, MultiApplyUpdatesTheTransactionTable) {
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
ASSERT_OK(syncTail.multiApply(
_opCtx.get(),
- {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn}));
+ {opSingle, opDiffTxnSmaller, opDiffTxnLarger, opSameTxnSooner, opSameTxnLater, opNoTxn},
+ boost::none));
// The txnNum and optime of the only write were saved.
auto resultSingleDoc =
@@ -2922,7 +2932,7 @@ TEST_F(SyncTailTxnTableTest, SessionMigrationNoOpEntriesShouldUpdateTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {insertOplog}, boost::none));
checkTxnTable(insertSessionInfo, {Timestamp(40, 0), 1}, outerInsertDate);
}
@@ -2945,7 +2955,7 @@ TEST_F(SyncTailTxnTableTest, PreImageNoOpEntriesShouldNotUpdateTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {preImageOplog}, boost::none));
ASSERT_FALSE(docExists(_opCtx.get(),
NamespaceString::kSessionTransactionsTableNamespace,
@@ -2970,7 +2980,7 @@ TEST_F(SyncTailTxnTableTest, NonMigrateNoOpEntriesShouldNotUpdateTxnTable) {
auto writerPool = OplogApplier::makeWriterPool();
SyncTail syncTail(
nullptr, getConsistencyMarkers(), getStorageInterface(), multiSyncApply, writerPool.get());
- ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}));
+ ASSERT_OK(syncTail.multiApply(_opCtx.get(), {oplog}, boost::none));
ASSERT_FALSE(docExists(
_opCtx.get(),
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index cf0a9d49c94..8f376b44d18 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -100,12 +100,8 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
ReadSourceScope readSourceScope(opCtx);
// Get the corresponding prepare applyOps oplog entry.
- const auto prepareOpTime = entry.getPrevWriteOpTimeInTransaction();
- invariant(prepareOpTime);
- TransactionHistoryIterator iter(prepareOpTime.get());
- invariant(iter.hasNext());
- const auto prepareOplogEntry = iter.nextFatalOnErrors(opCtx);
- ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {});
+ const auto prepareOplogEntry = getPreviousOplogEntry(opCtx, entry);
+ ops = readTransactionOperationsFromOplogChain(opCtx, prepareOplogEntry, {}, boost::none);
}
const auto dbName = entry.getNss().db().toString();
@@ -137,6 +133,21 @@ Status _applyTransactionFromOplogChain(OperationContext* opCtx,
}
} // namespace
+/**
+ * Helper used to get previous oplog entry from the same transaction.
+ */
+const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
+ const repl::OplogEntry& entry) {
+ const auto prevOpTime = entry.getPrevWriteOpTimeInTransaction();
+ invariant(prevOpTime);
+ TransactionHistoryIterator iter(prevOpTime.get());
+ invariant(iter.hasNext());
+ const auto prevOplogEntry = iter.next(opCtx);
+
+ return prevOplogEntry;
+}
+
+
Status applyCommitTransaction(OperationContext* opCtx,
const OplogEntry& entry,
repl::OplogApplication::Mode mode) {
@@ -150,8 +161,7 @@ Status applyCommitTransaction(OperationContext* opCtx,
auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject());
invariant(commitCommand.getCommitTimestamp());
- if (mode == repl::OplogApplication::Mode::kRecovering ||
- mode == repl::OplogApplication::Mode::kInitialSync) {
+ if (mode == repl::OplogApplication::Mode::kRecovering) {
return _applyTransactionFromOplogChain(opCtx,
entry,
mode,
@@ -216,7 +226,8 @@ Status applyAbortTransaction(OperationContext* opCtx,
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const OplogEntry& commitOrPrepare,
- const std::vector<OplogEntry*>& cachedOps) {
+ const std::vector<OplogEntry*>& cachedOps,
+ boost::optional<Timestamp> commitOplogEntryTS) {
repl::MultiApplier::Operations ops;
// Get the previous oplog entry.
@@ -245,7 +256,8 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
const auto& operationEntry = iter.nextFatalOnErrors(opCtx);
invariant(operationEntry.isPartialTransaction());
auto prevOpsEnd = ops.size();
- repl::ApplyOps::extractOperationsTo(operationEntry, commitOrPrepareObj, &ops);
+ repl::ApplyOps::extractOperationsTo(
+ operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
// Because BSONArrays do not have fast way of determining size without iterating through
// them, and we also have no way of knowing how many oplog entries are in a transaction
// without iterating, reversing each applyOps and then reversing the whole array is
@@ -260,12 +272,14 @@ repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
for (auto* cachedOp : cachedOps) {
const auto& operationEntry = *cachedOp;
invariant(operationEntry.isPartialTransaction());
- repl::ApplyOps::extractOperationsTo(operationEntry, commitOrPrepareObj, &ops);
+ repl::ApplyOps::extractOperationsTo(
+ operationEntry, commitOrPrepareObj, &ops, commitOplogEntryTS);
}
// Reconstruct the operations from the commit or prepare oplog entry.
if (commitOrPrepare.getCommandType() == OplogEntry::CommandType::kApplyOps) {
- repl::ApplyOps::extractOperationsTo(commitOrPrepare, commitOrPrepareObj, &ops);
+ repl::ApplyOps::extractOperationsTo(
+ commitOrPrepare, commitOrPrepareObj, &ops, commitOplogEntryTS);
}
return ops;
}
@@ -284,7 +298,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
// The prepare time of the transaction is set explicitly below.
auto ops = [&] {
ReadSourceScope readSourceScope(opCtx);
- return readTransactionOperationsFromOplogChain(opCtx, entry, {});
+ return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none);
}();
if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
diff --git a/src/mongo/db/repl/transaction_oplog_application.h b/src/mongo/db/repl/transaction_oplog_application.h
index dc145d2c701..5ad20f48875 100644
--- a/src/mongo/db/repl/transaction_oplog_application.h
+++ b/src/mongo/db/repl/transaction_oplog_application.h
@@ -51,13 +51,20 @@ Status applyAbortTransaction(OperationContext* opCtx,
repl::OplogApplication::Mode mode);
/**
+ * Helper used to get previous oplog entry from the same transaction.
+ */
+const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
+ const repl::OplogEntry& entry);
+
+/**
* Follow an oplog chain and copy the operations to destination. Operations will be copied in
* forward oplog order (increasing optimes).
*/
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
OperationContext* opCtx,
const repl::OplogEntry& entry,
- const std::vector<repl::OplogEntry*>& cachedOps);
+ const std::vector<repl::OplogEntry*>& cachedOps,
+ boost::optional<Timestamp> commitOplogEntryTS);
/**
* Apply `prepareTransaction` oplog entry.
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index 10a7c57f407..6208fe8944d 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1386,7 +1386,8 @@ public:
storageInterface,
{},
writerPool.get());
- ASSERT_EQUALS(op2.getOpTime(), unittest::assertGet(oplogApplier.multiApply(_opCtx, ops)));
+ ASSERT_EQUALS(op2.getOpTime(),
+ unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none)));
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX);
assertMultikeyPaths(
@@ -1495,7 +1496,7 @@ public:
storageInterface,
options,
writerPool.get());
- auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops));
+ auto lastTime = unittest::assertGet(oplogApplier.multiApply(_opCtx, ops, boost::none));
ASSERT_EQ(lastTime.getTimestamp(), insertTime2.asTimestamp());
// Wait for the index build to finish before making any assertions.
@@ -2478,7 +2479,7 @@ public:
auto writerPool = repl::OplogApplier::makeWriterPool(1);
repl::SyncTail syncTail(
nullptr, _consistencyMarkers, storageInterface, applyOperationFn, writerPool.get());
- auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}));
+ auto lastOpTime = unittest::assertGet(syncTail.multiApply(_opCtx, {insertOp}, boost::none));
ASSERT_EQ(insertOp.getOpTime(), lastOpTime);
joinGuard.dismiss();
diff --git a/src/mongo/shell/replsettest.js b/src/mongo/shell/replsettest.js
index 8d41cacaef4..a9d13d29672 100644
--- a/src/mongo/shell/replsettest.js
+++ b/src/mongo/shell/replsettest.js
@@ -2293,6 +2293,10 @@ var ReplSetTest = function(opts) {
}
baseOptions = Object.merge(baseOptions, this.nodeOptions["n" + n]);
options = Object.merge(baseOptions, options);
+ if (options.hasOwnProperty("rsConfig")) {
+ this.nodeOptions["n" + n] =
+ Object.merge(this.nodeOptions["n" + n], {rsConfig: options.rsConfig});
+ }
delete options.rsConfig;
options.restart = options.restart || restart;