summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorA. Jesse Jiryu Davis <jesse@mongodb.com>2019-08-06 23:00:28 -0400
committerA. Jesse Jiryu Davis <jesse@mongodb.com>2019-08-21 11:55:02 -0400
commit609e2697c2619925073d4ac0f57f33e789d05100 (patch)
tree0c94a13d998f40b9cd16f4da272e98744d6bb98b
parent08f836390d61726235e583f46012f43995695c85 (diff)
downloadmongo-609e2697c2619925073d4ac0f57f33e789d05100.tar.gz
SERVER-42022 Remove missing-document fetcher
-rw-r--r--jstests/noPassthrough/apply_ops_mode.js70
-rw-r--r--jstests/replsets/initial_sync_replSetGetStatus.js1
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc.js (renamed from jstests/replsets/initial_sync_update_missing_doc1.js)29
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc2.js62
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc3.js73
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc_upsert.js32
-rw-r--r--jstests/replsets/initial_sync_update_missing_doc_with_prepare.js28
-rw-r--r--jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js85
-rw-r--r--jstests/replsets/libs/initial_sync_test.js2
-rw-r--r--jstests/replsets/libs/initial_sync_update_missing_doc.js61
-rw-r--r--src/mongo/db/repl/SConscript18
-rw-r--r--src/mongo/db/repl/collection_cloner.cpp2
-rw-r--r--src/mongo/db/repl/initial_sync_state.h1
-rw-r--r--src/mongo/db/repl/initial_syncer.cpp85
-rw-r--r--src/mongo/db/repl/initial_syncer.h54
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp124
-rw-r--r--src/mongo/db/repl/oplog_applier.cpp2
-rw-r--r--src/mongo/db/repl/oplog_applier.h21
-rw-r--r--src/mongo/db/repl/oplogreader.h115
-rw-r--r--src/mongo/db/repl/replication_auth.cpp (renamed from src/mongo/db/repl/oplogreader.cpp)53
-rw-r--r--src/mongo/db/repl/replication_auth.h48
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp10
-rw-r--r--src/mongo/db/repl/replication_info.cpp2
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp1
-rw-r--r--src/mongo/db/repl/rollback_source_impl.cpp3
-rw-r--r--src/mongo/db/repl/sync_tail.cpp160
-rw-r--r--src/mongo/db/repl/sync_tail.h17
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp89
-rw-r--r--src/mongo/db/repl/sync_tail_test_fixture.cpp1
-rw-r--r--src/mongo/dbtests/repltests.cpp91
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp2
31 files changed, 178 insertions, 1164 deletions
diff --git a/jstests/noPassthrough/apply_ops_mode.js b/jstests/noPassthrough/apply_ops_mode.js
index 2376ed1c30f..b03edf2c855 100644
--- a/jstests/noPassthrough/apply_ops_mode.js
+++ b/jstests/noPassthrough/apply_ops_mode.js
@@ -12,48 +12,54 @@ var standalone = MongoRunner.runMongod();
var db = standalone.getDB("test");
var coll = db.getCollection("apply_ops_mode1");
-coll.drop();
-assert.commandWorked(coll.insert({_id: 1}));
// ------------ Testing normal updates ---------------
var id = ObjectId();
-var updateOp = {op: 'u', ns: coll.getFullName(), o: {_id: id, x: 1}, o2: {_id: id}};
-assert.commandFailed(db.adminCommand({applyOps: [updateOp], alwaysUpsert: false}));
-assert.eq(coll.count({x: 1}), 0);
-
-// Test that 'InitialSync' does not override 'alwaysUpsert: false'.
-assert.commandFailed(db.adminCommand(
- {applyOps: [updateOp], alwaysUpsert: false, oplogApplicationMode: "InitialSync"}));
-assert.eq(coll.count({x: 1}), 0);
-
-// Test parsing failure.
-assert.commandFailedWithCode(
- db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "BadMode"}),
- ErrorCodes.FailedToParse);
-assert.commandFailedWithCode(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: 5}),
- ErrorCodes.TypeMismatch);
-
-// Test default succeeds.
-assert.commandWorked(db.adminCommand({applyOps: [updateOp]}));
-assert.eq(coll.count({x: 1}), 1);
-
-// Use new collection to make logs cleaner.
-coll = db.getCollection("apply_ops_mode2");
-coll.drop();
-updateOp.ns = coll.getFullName();
-assert.commandWorked(coll.insert({_id: 1}));
-
-// Test default succeeds in 'InitialSync' mode.
-assert.commandWorked(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "InitialSync"}));
-assert.eq(coll.count({x: 1}), 1);
+for (let updateOp of [
+ // An update with a modifier.
+ {op: 'u', ns: coll.getFullName(), o: {$set: {x: 1}}, o2: {_id: id}},
+ // A full-document replace.
+ {op: 'u', ns: coll.getFullName(), o: {_id: id, x: 1}, o2: {_id: id}},
+]) {
+ coll.drop();
+ assert.writeOK(coll.insert({_id: 1}));
+
+ jsTestLog(`Test applyOps with the following op:\n${tojson(updateOp)}`);
+ assert.commandFailed(db.adminCommand({applyOps: [updateOp], alwaysUpsert: false}));
+ assert.eq(coll.count({x: 1}), 0);
+
+ // Test that 'InitialSync' does not override 'alwaysUpsert: false'.
+ assert.commandFailed(db.adminCommand(
+ {applyOps: [updateOp], alwaysUpsert: false, oplogApplicationMode: "InitialSync"}));
+ assert.eq(coll.count({x: 1}), 0);
+
+ // Test parsing failure.
+ assert.commandFailedWithCode(
+ db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "BadMode"}),
+ ErrorCodes.FailedToParse);
+ assert.commandFailedWithCode(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: 5}),
+ ErrorCodes.TypeMismatch);
+
+ // Test default succeeds.
+ assert.commandWorked(db.adminCommand({applyOps: [updateOp]}));
+ assert.eq(coll.count({x: 1}), 1);
+
+ coll.drop();
+ assert.commandWorked(coll.insert({_id: 1}));
+
+ // Test default succeeds in 'InitialSync' mode.
+ assert.commandWorked(
+ db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "InitialSync"}));
+ assert.eq(coll.count({x: 1}), 1);
+}
// ------------ Testing fCV updates ---------------
var adminDB = db.getSiblingDB("admin");
const systemVersionColl = adminDB.getCollection("system.version");
-updateOp = {
+var updateOp = {
op: 'u',
ns: systemVersionColl.getFullName(),
o: {_id: "featureCompatibilityVersion", version: lastStableFCV},
diff --git a/jstests/replsets/initial_sync_replSetGetStatus.js b/jstests/replsets/initial_sync_replSetGetStatus.js
index 7d325997328..96397750f44 100644
--- a/jstests/replsets/initial_sync_replSetGetStatus.js
+++ b/jstests/replsets/initial_sync_replSetGetStatus.js
@@ -61,7 +61,6 @@ checkLog.contains(secondary, 'initial sync - initialSyncHangBeforeFinish fail po
res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
assert(res.initialSyncStatus,
() => "Response should have an 'initialSyncStatus' field: " + tojson(res));
-assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
assert.eq(res.initialSyncStatus.appliedOps, 3);
assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 0);
assert.eq(res.initialSyncStatus.maxFailedInitialSyncAttempts, 10);
diff --git a/jstests/replsets/initial_sync_update_missing_doc1.js b/jstests/replsets/initial_sync_update_missing_doc.js
index 2c1fdc1eea4..532e82ab3bd 100644
--- a/jstests/replsets/initial_sync_update_missing_doc1.js
+++ b/jstests/replsets/initial_sync_update_missing_doc.js
@@ -4,29 +4,24 @@
* 2) copies all non-local databases from the source; and
* 3) fetches and applies operations from the source after op_start1.
*
- * This test updates and deletes a document on the source between phases 1 and 2. The
- * secondary will initially fail to apply the update operation in phase 3 and subsequently have
- * to attempt to check the source for a new copy of the document. The absence of the document on
- * the source indicates that the source is free to ignore the failed update operation.
- *
+ * This test updates and deletes a document on the source between phases 1 and 2. The secondary will
+ * fail to apply the update operation in phase 3 but initial sync completes nevertheless. The
+ * absence of the document on the source indicates that the source is free to ignore the failed
+ * update operation.
*/
(function() {
load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/libs/check_log.js");
-const name = 'initial_sync_update_missing_doc1';
-const replSet = new ReplSetTest({
- name: name,
- nodes: 1,
-});
+const replSet = new ReplSetTest({nodes: 1});
replSet.startSet();
replSet.initiate();
const primary = replSet.getPrimary();
const dbName = 'test';
-
-var coll = primary.getDB(dbName).getCollection(name);
+const collectionName = jsTestName();
+const coll = primary.getDB(dbName).getCollection(collectionName);
assert.commandWorked(coll.insert({_id: 0, x: 1}));
// Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
@@ -38,16 +33,8 @@ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
// Update and remove document on primary.
updateRemove(coll, {_id: 0});
-
turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
-
-var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
-var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
-
-turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 0 /* numFetched */);
-
-finishAndValidate(replSet, name, firstOplogEnd, 0 /* numFetched */, 0 /* numDocuments */);
+finishAndValidate(replSet, collectionName, 0 /* numDocuments */);
replSet.stopSet();
})();
diff --git a/jstests/replsets/initial_sync_update_missing_doc2.js b/jstests/replsets/initial_sync_update_missing_doc2.js
deleted file mode 100644
index f2c51e85c9e..00000000000
--- a/jstests/replsets/initial_sync_update_missing_doc2.js
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Initial sync runs in several phases - the first 3 are as follows:
- * 1) fetches the last oplog entry (op_start1) on the source;
- * 2) copies all non-local databases from the source; and
- * 3) fetches and applies operations from the source after op_start1.
- *
- * This test updates and deletes a document on the source between phases 1 and 2. The
- * secondary will initially fail to apply the update operation in phase 3 and subsequently have
- * to attempt to check the source for a new copy of the document. Before the secondary checks
- * the source, we insert a new copy of the document on the source so that the secondary can fetch
- * it.
- *
- */
-
-(function() {
-load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
-load("jstests/libs/check_log.js");
-
-var name = 'initial_sync_update_missing_doc2';
-var replSet = new ReplSetTest({
- name: name,
- nodes: 1,
-});
-
-replSet.startSet();
-replSet.initiate();
-
-const primary = replSet.getPrimary();
-const dbName = 'test';
-
-var coll = primary.getDB(dbName).getCollection(name);
-assert.commandWorked(coll.insert({_id: 0, x: 1}));
-
-// Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
-// it is syncing from the primary.
-const secondaryConfig = {
- rsConfig: {votes: 0, priority: 0}
-};
-const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
-
-// Update and remove document on primary.
-updateRemove(coll, {_id: 0});
-
-turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
-
-// Re-insert deleted document on the sync source. The secondary should be able to fetch and
-// insert this document after failing to apply the udpate.
-assert.commandWorked(coll.insert({_id: 0, x: 3}));
-
-var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
-var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
-
-// Temporarily increase log levels so that we can see the 'Inserted missing document' log line.
-secondary.getDB('test').setLogLevel(1, 'replication');
-turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numFetched */);
-secondary.getDB('test').setLogLevel(0, 'replication');
-
-finishAndValidate(replSet, name, firstOplogEnd, 1 /* numFetched */, 1 /* numDocuments */);
-
-replSet.stopSet();
-})();
diff --git a/jstests/replsets/initial_sync_update_missing_doc3.js b/jstests/replsets/initial_sync_update_missing_doc3.js
deleted file mode 100644
index d03e672ebec..00000000000
--- a/jstests/replsets/initial_sync_update_missing_doc3.js
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Initial sync runs in several phases - the first 3 are as follows:
- * 1) fetches the last oplog entry (op_start1) on the source;
- * 2) copies all non-local databases from the source; and
- * 3) fetches and applies operations from the source after op_start1.
- *
- * This test updates and deletes a document on the source between phases 1 and 2. The
- * secondary will initially fail to apply the update operation in phase 3 and subsequently have
- * to attempt to check the source for a new copy of the document. Before the secondary checks the
- * source, we insert a new copy of the document into this collection on the source and mark the
- * collection on which the document to be fetched resides as drop pending, thus effectively
- * renaming the collection but preserving the UUID. This ensures the secondary fetches the
- * document by UUID rather than namespace.
- */
-
-(function() {
-load("jstests/libs/check_log.js");
-load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
-load("jstests/replsets/libs/two_phase_drops.js"); // For TwoPhaseDropCollectionTest.
-
-var name = 'initial_sync_update_missing_doc3';
-var replSet = new ReplSetTest({
- name: name,
- nodes: 1,
-});
-
-replSet.startSet();
-replSet.initiate();
-const primary = replSet.getPrimary();
-const dbName = 'test';
-
-// Check for 'system.drop' two phase drop support.
-if (!TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replSet)) {
- jsTestLog('Drop pending namespaces not supported by storage engine. Skipping test.');
- replSet.stopSet();
- return;
-}
-
-var coll = primary.getDB(dbName).getCollection(name);
-assert.commandWorked(coll.insert({_id: 0, x: 1}));
-
-// Add a secondary node with priority: 0 so that we prevent elections while it is syncing
-// from the primary.
-// We cannot give the secondary votes: 0 because then it will not be able to acknowledge
-// majority writes. That means the sync source can immediately drop it's collection
-// because it alone determines the majority commit point.
-const secondaryConfig = {
- rsConfig: {priority: 0}
-};
-const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
-
-// Update and remove document on primary.
-updateRemove(coll, {_id: 0});
-
-turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
-
-// Re-insert deleted document.
-assert.commandWorked(coll.insert({_id: 0, x: 3}));
-// Mark the collection as drop pending so it gets renamed, but retains the UUID.
-assert.commandWorked(primary.getDB('test').runCommand({"drop": name}));
-
-var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
-
-secondary.getDB('test').setLogLevel(1, 'replication');
-turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numFetched */);
-secondary.getDB('test').setLogLevel(0, 'replication');
-
-replSet.awaitReplication();
-replSet.awaitSecondaryNodes();
-
-replSet.stopSet();
-})();
diff --git a/jstests/replsets/initial_sync_update_missing_doc_upsert.js b/jstests/replsets/initial_sync_update_missing_doc_upsert.js
index 636244965dd..9fe203fef67 100644
--- a/jstests/replsets/initial_sync_update_missing_doc_upsert.js
+++ b/jstests/replsets/initial_sync_update_missing_doc_upsert.js
@@ -4,29 +4,26 @@
* 2) copies all non-local databases from the source; and
* 3) fetches and applies operations from the source after op_start1.
*
- * This test upserts documents with both the "update" and "applyOps"
- * commands on the source between phases 2 and 3; these should be treated
- * as inserts on the syncing node.
+ * This test upserts documents with both the "update" and "applyOps" commands on the source between
+ * phases 2 and 3; these operations should be treated as inserts on the syncing node and applied
+ * successfully.
*/
(function() {
load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/libs/check_log.js");
-var name = 'initial_sync_update_missing_doc_upsert';
-var replSet = new ReplSetTest({
- name: name,
- nodes: 1,
-});
+const replSet = new ReplSetTest({nodes: 1});
replSet.startSet();
replSet.initiate();
const primary = replSet.getPrimary();
const dbName = 'test';
+const collectionName = jsTestName();
-assert.commandWorked(primary.getDB(dbName).createCollection(name));
-const coll = primary.getDB(dbName).getCollection(name);
+assert.commandWorked(primary.getDB(dbName).createCollection(collectionName));
+const coll = primary.getDB(dbName).getCollection(collectionName);
// Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
// it is syncing from the primary.
@@ -85,8 +82,7 @@ for (let alwaysUpsert of [null, true]) {
/* The interesting scenario for alwaysUpsert: false is if the document is deleted on the primary
* after updating. When the secondary attempts to apply the oplog entry during initial sync,
- * it will fail to update, and fail to fetch the missing document. Ensure that initial sync
- * proceeds anyway.
+ * it will fail to update. Ensure that initial sync proceeds anyway.
*/
for (let allowAtomic of [null, true, false]) {
coll.insertOne({_id: documentIdCounter});
@@ -99,19 +95,9 @@ for (let allowAtomic of [null, true, false]) {
jsTestLog("Allow initial sync to finish fetching and replaying oplog");
assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
-assert.commandWorked(secondary.getDB('admin').runCommand(
{configureFailPoint: 'initialSyncHangAfterDataCloning', mode: 'off'}));
-assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
-
-checkLog.contains(secondary, 'initial sync done');
-
-const replStatus = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-const firstOplogEnd = replStatus.initialSyncStatus.initialSyncOplogEnd;
-// *No* missing documents were fetched.
-finishAndValidate(replSet, name, firstOplogEnd, 0 /* numFetched */, numDocuments);
+finishAndValidate(replSet, collectionName, numDocuments);
replSet.stopSet();
})();
diff --git a/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js b/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js
index d4143b50148..2f86c8d5454 100644
--- a/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js
+++ b/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js
@@ -5,10 +5,8 @@
* 3) fetches and applies operations from the source after op_start1.
*
* This test updates and deletes a document on the source between phases 1 and 2 in a prepared
- * transaction. The secondary will initially fail to apply the update operation in phase 3 and
- * subsequently have to attempt to check the source for a new copy of the document. The absence of
- * the document on the source indicates that the source is free to ignore the failed update
- * operation.
+ * transaction. The secondary will fail to apply the update operation in phase 3 but initial sync
+ * completes nevertheless.
*
* @tags: [uses_transactions, uses_prepare_transaction]
*/
@@ -19,18 +17,14 @@ load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/libs/check_log.js");
function doTest(doTransactionWork, numDocuments) {
- const name = 'initial_sync_update_missing_doc_with_prepare';
- const replSet = new ReplSetTest({
- name: name,
- nodes: 1,
- });
+ const replSet = new ReplSetTest({nodes: 1});
replSet.startSet();
replSet.initiate();
const primary = replSet.getPrimary();
const dbName = 'test';
-
- var coll = primary.getDB(dbName).getCollection(name);
+ const collectionName = jsTestName();
+ let coll = primary.getDB(dbName).getCollection(collectionName);
assert.commandWorked(coll.insert({_id: 0, x: 1}));
assert.commandWorked(coll.insert({_id: 1, x: 1}));
@@ -41,7 +35,7 @@ function doTest(doTransactionWork, numDocuments) {
const session = primary.startSession();
const sessionDB = session.getDatabase(dbName);
- const sessionColl = sessionDB.getCollection(name);
+ const sessionColl = sessionDB.getCollection(collectionName);
session.startTransaction();
doTransactionWork(sessionColl, {_id: 0});
@@ -57,20 +51,14 @@ function doTest(doTransactionWork, numDocuments) {
turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
- var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
- assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
- var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
-
- turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 0 /* numInserted */);
-
// Since we aborted the second transaction, we expect this collection to still exist after
// initial sync.
- finishAndValidate(replSet, name, firstOplogEnd, 0 /* numInserted */, numDocuments);
+ finishAndValidate(replSet, collectionName, numDocuments);
    // Make sure the secondary has the correct documents after syncing from the primary. The
    // second document was deleted in the prepared transaction that was aborted. Therefore, it
    // should have been properly replicated.
- coll = secondary.getDB(dbName).getCollection(name);
+ coll = secondary.getDB(dbName).getCollection(collectionName);
assert.docEq(null, coll.findOne({_id: 0}), 'document on secondary matches primary');
assert.docEq({_id: 1, x: 1}, coll.findOne({_id: 1}), 'document on secondary matches primary');
diff --git a/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js b/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js
deleted file mode 100644
index 4bb87729af4..00000000000
--- a/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Initial sync runs in several phases - the first 3 are as follows:
- * 1) fetches the last oplog entry (op_start1) on the source;
- * 2) copies all non-local databases from the source; and
- * 3) fetches and applies operations from the source after op_start1.
- *
- * This test updates and deletes a document on the source between phases 1 and 2 in a prepared
- * transaction. The secondary will initially fail to apply the update operation in phase 3 and
- * subsequently have to attempt to check the source for a new copy of the document. Before the
- * secondary checks the source, we insert a new copy of the document on the source so that the
- * secondary can fetch it.
- *
- * @tags: [uses_transactions, uses_prepare_transaction]
- */
-
-(function() {
-load("jstests/core/txns/libs/prepare_helpers.js");
-load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
-load("jstests/libs/check_log.js");
-
-function doTest(doTransactionWork, numDocuments) {
- const name = 'initial_sync_update_missing_doc_with_prepare';
- const replSet = new ReplSetTest({
- name: name,
- nodes: 1,
- });
-
- replSet.startSet();
- replSet.initiate();
- const primary = replSet.getPrimary();
- const dbName = 'test';
-
- const coll = primary.getDB(dbName).getCollection(name);
- assert.commandWorked(coll.insert({_id: 0, x: 1}));
-
- // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while
- // it is syncing from the primary.
- const secondaryConfig = {rsConfig: {votes: 0, priority: 0}};
- const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig);
-
- const session = primary.startSession();
- const sessionDB = session.getDatabase(dbName);
- const sessionColl = sessionDB.getCollection(name);
-
- session.startTransaction();
- doTransactionWork(sessionColl, {_id: 0});
- const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
- assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
-
- turnOffHangBeforeCopyingDatabasesFailPoint(secondary);
-
- // Re-insert deleted document on the sync source. The secondary should be able to fetch and
- // insert this document after failing to apply the udpate.
- assert.commandWorked(coll.insert({_id: 0, x: 3}));
-
- const res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
- assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0);
- const firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
-
- // Temporarily increase log levels so that we can see the 'Inserted missing document' log
- // line.
- secondary.getDB('test').setLogLevel(1, 'replication');
- turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numFetched */);
- secondary.getDB('test').setLogLevel(0, 'replication');
-
- finishAndValidate(replSet, name, firstOplogEnd, 1 /* numFetched */, numDocuments);
- assert.docEq({_id: 0, x: 3}, coll.findOne({_id: 0}), 'document on secondary matches primary');
-
- replSet.stopSet();
-}
-
-jsTestLog("Testing with prepared transaction");
-// Passing in a function to update and remove document on primary in a prepared transaction
-// between phrase 1 and 2. Once the secondary receives the commit for the transaction, the
-// secondary should apply each operation separately (one update, and one delete) during initial
-// sync.
-doTest(updateRemove, 1 /* numDocuments after initial sync */);
-
-jsTestLog("Testing with large prepared transaction");
-// Passing in a function to insert, update and remove large documents on primary in a large
-// prepared transaction. Once the secondary receives the commit for the transaction, the
-// secondary should apply each operation separately (one insert, one update, and one delete)
-// during initial sync.
-doTest(insertUpdateRemoveLarge, 2 /* numDocuments after initial sync */);
-})();
diff --git a/jstests/replsets/libs/initial_sync_test.js b/jstests/replsets/libs/initial_sync_test.js
index 7ec45729173..5a5b6a7bc56 100644
--- a/jstests/replsets/libs/initial_sync_test.js
+++ b/jstests/replsets/libs/initial_sync_test.js
@@ -280,4 +280,4 @@ function InitialSyncTest(name = "InitialSyncTest", replSet, timeout) {
checkDataConsistency();
return replSet.stopSet();
};
-} \ No newline at end of file
+}
diff --git a/jstests/replsets/libs/initial_sync_update_missing_doc.js b/jstests/replsets/libs/initial_sync_update_missing_doc.js
index 91556719bad..666d463ad23 100644
--- a/jstests/replsets/libs/initial_sync_update_missing_doc.js
+++ b/jstests/replsets/libs/initial_sync_update_missing_doc.js
@@ -10,9 +10,8 @@
*/
// reInitiate the replica set with a secondary node, which will go through initial sync. This
-// function will hand the secondary in initial sync. turnOffHangBeforeCopyingDatabasesFailPoint
-// must be called after reInitiateSetWithSecondary, followed by
-// turnOffHangBeforeGettingMissingDocFailPoint.
+// function will hang the secondary in initial sync. turnOffHangBeforeCopyingDatabasesFailPoint
+// must be called after reInitiateSetWithSecondary.
var reInitiateSetWithSecondary = function(replSet, secondaryConfig) {
const secondary = replSet.add(secondaryConfig);
secondary.setSlaveOk();
@@ -21,8 +20,6 @@ var reInitiateSetWithSecondary = function(replSet, secondaryConfig) {
// copying databases.
assert.commandWorked(secondary.getDB('admin').runCommand(
{configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'alwaysOn'}));
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'alwaysOn'}));
// Skip clearing initial sync progress after a successful initial sync attempt so that we
// can check initialSyncStatus fields after initial sync is complete.
@@ -44,43 +41,9 @@ var reInitiateSetWithSecondary = function(replSet, secondaryConfig) {
var turnOffHangBeforeCopyingDatabasesFailPoint = function(secondary) {
assert.commandWorked(secondary.getDB('admin').runCommand(
{configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'}));
-
- // The following checks assume that we have updated and deleted a document on the sync source
- // that the secondary will try to update in phase 3.
- checkLog.contains(secondary, 'update of non-mod failed');
- checkLog.contains(secondary, 'Fetching missing document');
- checkLog.contains(
- secondary, 'initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled');
-};
-
-// Must be called after turnOffHangBeforeCopyingDatabasesFailPoint. Turns off the
-// initialSyncHangBeforeGettingMissingDocument fail point so that the secondary can check if the
-// sync source has the missing document.
-var turnOffHangBeforeGettingMissingDocFailPoint = function(primary, secondary, name, numFetched) {
- if (numFetched === 0) {
- // If we did not re-insert the missing document, insert an arbitrary document to move
- // forward minValid even though the document was not found.
- assert.commandWorked(
- primary.getDB('test').getCollection(name + 'b').insert({_id: 1, y: 1}));
- }
-
- assert.commandWorked(secondary.getDB('admin').runCommand(
- {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'}));
-
- // If we've re-inserted the missing document between secondaryHangsBeforeGettingMissingDoc and
- // this function, the secondary will insert the missing document after it fails the update.
- // Otherwise, it will fail to fetch anything from the sync source because the document was
- // deleted.
- if (numFetched > 0) {
- checkLog.contains(secondary, 'Inserted missing document');
- } else {
- checkLog.contains(
- secondary, 'Missing document not found on source; presumably deleted later in oplog.');
- }
- checkLog.contains(secondary, 'initial sync done');
};
-var finishAndValidate = function(replSet, name, firstOplogEnd, numFetched, numDocuments) {
+var finishAndValidate = function(replSet, name, numDocuments) {
replSet.awaitReplication();
replSet.awaitSecondaryNodes();
const dbName = 'test';
@@ -95,24 +58,6 @@ secondary collection: ${tojson(secondaryCollection.find().toArray())}`);
throw new Error('Did not sync collection');
}
- const res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
-
- // If we haven't re-inserted any documents after deleting them, the fetch count is 0 because we
- // are unable to get the document from the sync source.
- assert.eq(res.initialSyncStatus.fetchedMissingDocs, numFetched);
-
- const finalOplogEnd = res.initialSyncStatus.initialSyncOplogEnd;
-
- if (numFetched > 0) {
- assert.neq(firstOplogEnd,
- finalOplogEnd,
- "minValid was not moved forward when missing document was fetched");
- } else {
- assert.eq(firstOplogEnd,
- finalOplogEnd,
- "minValid was moved forward when missing document was not fetched");
- }
-
assert.eq(0,
secondary.getDB('local')['temp_oplog_buffer'].find().itcount(),
"Oplog buffer was not dropped after initial sync");
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5074ce190ec..ebd878cc05c 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -61,15 +61,13 @@ env.Library(
)
env.Library(
- target='oplogreader',
+ target='replication_auth',
source=[
- 'oplogreader.cpp',
+ 'replication_auth.cpp',
],
LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/client/clientdriver_network',
'$BUILD_DIR/mongo/db/auth/authorization_manager_global',
- '$BUILD_DIR/mongo/util/net/network',
+ '$BUILD_DIR/mongo/client/authentication',
],
)
@@ -92,7 +90,6 @@ env.Library(
LIBDEPS=[
'oplog',
'oplog_interface_remote',
- 'oplogreader',
'repl_coordinator_interface',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/background',
@@ -104,6 +101,9 @@ env.Library(
'$BUILD_DIR/mongo/db/query_exec',
'$BUILD_DIR/mongo/util/fail_point',
],
+ LIBDEPS_PRIVATE=[
+ 'replication_auth',
+ ]
)
env.Library(
@@ -528,13 +528,13 @@ env.Library(
'oplog',
'oplog_application_interface',
'oplog_entry',
- 'oplogreader',
'repl_coordinator_interface',
'repl_settings',
'storage_interface',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/commands/mongod_fsync',
+ 'replication_auth',
],
)
@@ -927,7 +927,6 @@ env.Library(
],
LIBDEPS=[
'task_runner',
- 'oplogreader',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/client/fetcher',
'$BUILD_DIR/mongo/client/remote_command_retry_scheduler',
@@ -940,6 +939,7 @@ env.Library(
],
LIBDEPS_PRIVATE=[
'repl_server_parameters',
+ 'replication_auth',
],
)
@@ -1118,7 +1118,6 @@ env.Library(
'$BUILD_DIR/mongo/db/query_exec',
"$BUILD_DIR/mongo/util/fail_point",
'oplog',
- 'oplogreader',
'repl_coordinator_interface',
'repl_settings',
'replica_set_messages',
@@ -1127,6 +1126,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/stats/counters',
'$BUILD_DIR/mongo/transport/message_compressor',
+ 'replication_auth',
'split_horizon',
],
)
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index a163be9a084..9bdf4acb8ff 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -42,8 +42,8 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/cursor_response.h"
-#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/replication_auth.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/rpc/get_status_from_command_result.h"
diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h
index 09673458a67..d5957bf2c42 100644
--- a/src/mongo/db/repl/initial_sync_state.h
+++ b/src/mongo/db/repl/initial_sync_state.h
@@ -59,7 +59,6 @@ struct InitialSyncState {
// oplog entry.
Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states.
Timer timer; // Timer for timing how long each initial sync attempt takes.
- size_t fetchedMissingDocs = 0;
size_t appliedOps = 0;
};
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index d4696f7f92a..a0c250ff58c 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -88,10 +88,6 @@ MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCopyingDatabases);
// Failpoint which causes the initial sync function to hang before finishing.
MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeFinish);
-// Failpoint which causes the initial sync function to hang before calling shouldRetry on a failed
-// operation.
-MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeGettingMissingDocument);
-
// Failpoint which causes the initial sync function to hang before creating the oplog.
MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCreatingOplog);
@@ -175,24 +171,6 @@ StatusWith<OpTimeAndWallTime> parseOpTimeAndWallTime(const QueryResponseStatus&
return result;
}
-/**
- * OplogApplier observer that updates 'fetchCount' when applying operations for each writer thread.
- */
-class InitialSyncApplyObserver : public OplogApplier::Observer {
-public:
- explicit InitialSyncApplyObserver(AtomicWord<unsigned>* fetchCount) : _fetchCount(fetchCount) {}
-
- // OplogApplier::Observer functions
- void onBatchBegin(const OplogApplier::Operations&) final {}
- void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {}
- void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>& docs) final {
- _fetchCount->fetchAndAdd(docs.size());
- }
-
-private:
- AtomicWord<unsigned>* const _fetchCount;
-};
-
} // namespace
InitialSyncer::InitialSyncer(
@@ -209,8 +187,7 @@ InitialSyncer::InitialSyncer(
_writerPool(writerPool),
_storage(storage),
_replicationProcess(replicationProcess),
- _onCompletion(onCompletion),
- _observer(std::make_unique<InitialSyncApplyObserver>(&_fetchCount)) {
+ _onCompletion(onCompletion) {
uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec);
uassert(ErrorCodes::BadValue, "invalid storage interface", _storage);
uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess);
@@ -370,7 +347,6 @@ void InitialSyncer::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob
if (!_initialSyncState) {
return;
}
- bob->appendNumber("fetchedMissingDocs", _initialSyncState->fetchedMissingDocs);
bob->appendNumber("appliedOps", _initialSyncState->appliedOps);
if (!_initialSyncState->beginApplyingTimestamp.isNull()) {
bob->append("initialSyncOplogStart", _initialSyncState->beginApplyingTimestamp);
@@ -908,10 +884,13 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>
auto consistencyMarkers = _replicationProcess->getConsistencyMarkers();
OplogApplier::Options options(OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
- options.missingDocumentSourceForInitialSync = _syncSource;
options.beginApplyingOpTime = lastOpTime;
- _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(
- _oplogBuffer.get(), _observer.get(), consistencyMarkers, _storage, options, _writerPool);
+ _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(_oplogBuffer.get(),
+ &noopOplogApplierObserver,
+ consistencyMarkers,
+ _storage,
+ options,
+ _writerPool);
const auto beginApplyingTimestamp = lastOpTime.getTimestamp();
_initialSyncState->beginApplyingTimestamp = beginApplyingTimestamp;
@@ -1289,56 +1268,6 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus,
auto opCtx = makeOpCtx();
const bool orderedCommit = true;
_storage->oplogDiskLocRegister(opCtx.get(), lastAppliedOpTime.getTimestamp(), orderedCommit);
-
- auto fetchCount = _fetchCount.load();
- if (fetchCount > 0) {
- _initialSyncState->fetchedMissingDocs += fetchCount;
- _fetchCount.store(0);
- status = _scheduleLastOplogEntryFetcher_inlock(
- [=](const StatusWith<mongo::Fetcher::QueryResponse>& response,
- mongo::Fetcher::NextAction*,
- mongo::BSONObjBuilder*) {
- return _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
- response, onCompletionGuard);
- });
- if (!status.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- }
- return;
- }
-
- _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
-}
-
-void InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
- const StatusWith<Fetcher::QueryResponse>& result,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- auto status = _checkForShutdownAndConvertStatus_inlock(
- result.getStatus(), "error getting last oplog entry after fetching missing documents");
- if (!status.isOK()) {
- error() << "Failed to get new minValid from source " << _syncSource << " due to '"
- << redact(status) << "'";
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
- return;
- }
-
- auto&& optimeStatus = parseOpTimeAndWallTime(result);
- if (!optimeStatus.isOK()) {
- error() << "Failed to parse new minValid from source " << _syncSource << " due to '"
- << redact(optimeStatus.getStatus()) << "'";
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, optimeStatus.getStatus());
- return;
- }
- auto&& optime = optimeStatus.getValue().opTime;
-
- const auto newOplogEnd = optime.getTimestamp();
- LOG(2) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to "
- << newOplogEnd;
- _initialSyncState->stopTimestamp = newOplogEnd;
-
- // Get another batch to apply.
_checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard);
}
diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h
index 9021970cfcb..414de99a576 100644
--- a/src/mongo/db/repl/initial_syncer.h
+++ b/src/mongo/db/repl/initial_syncer.h
@@ -68,10 +68,6 @@ MONGO_FAIL_POINT_DECLARE(failInitSyncWithBufferedEntriesLeft);
// Failpoint which causes the initial sync function to hang before copying databases.
MONGO_FAIL_POINT_DECLARE(initialSyncHangBeforeCopyingDatabases);
-// Failpoint which causes the initial sync function to hang before calling shouldRetry on a failed
-// operation.
-MONGO_FAIL_POINT_DECLARE(initialSyncHangBeforeGettingMissingDocument);
-
// Failpoint which stops the applier.
MONGO_FAIL_POINT_DECLARE(rsSyncApplyStop);
@@ -94,11 +90,9 @@ struct InitialSyncerOptions {
/** Function to sets this node into a specific follower mode. */
using SetFollowerModeFn = std::function<bool(const MemberState&)>;
- // Error and retry values
+ // Retry values
Milliseconds syncSourceRetryWait{1000};
Milliseconds initialSyncRetryWait{1000};
- Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10};
- Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10};
// InitialSyncer waits this long before retrying getApplierBatchCallback() if there are
// currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active.
@@ -326,28 +320,19 @@ private:
* | (no ops to apply) | | (have ops to apply)
* | | |
* | | V
- * | | _getNextApplierBatchCallback()<-----+
- * | | | ^ |
- * | | | | |
- * | | | (no docs fetched | |
- * | | | and end ts not | |
- * | | | reached) | |
- * | | | | |
- * | | V | |
- * | | _multiApplierCallback()-----+ |
- * | | | | |
- * | | | | |
- * | | | | (docs fetched) | (end ts not
- * | | | | | reached)
- * | | | V |
- * | | | _lastOplogEntryFetcherCallbackAfter-
- * | | | FetchingMissingDocuments()
- * | | | |
- * | | | |
- * | (reached end timestamp)
- * | | | |
- * | V V V
- * | _rollbackCheckerCheckForRollbackCallback()
+ * | | _getNextApplierBatchCallback()
+ * | | | ^
+ * | | | |
+ * | | | (end ts not reached)
+ * | | | |
+ * | | V |
+ * | | _multiApplierCallback()-----+
+ * | | |
+ * | | |
+ * | (reached end timestamp)
+ * | | |
+ * | V V
+ * | _rollbackCheckerCheckForRollbackCallback()
* | |
* | |
* +------------------------------+
@@ -474,16 +459,6 @@ private:
std::shared_ptr<OnCompletionGuard> onCompletionGuard);
/**
- * Callback for third '_lastOplogEntryFetcher' callback. This is scheduled after MultiApplier
- * completed successfully and missing documents were fetched from the sync source while
- * DataReplicatorExternalState::_multiApply() was processing operations.
- * This callback will update InitialSyncState::stopTimestamp on success.
- */
- void _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments(
- const StatusWith<Fetcher::QueryResponse>& result,
- std::shared_ptr<OnCompletionGuard> onCompletionGuard);
-
- /**
* Callback for rollback checker's last replSetGetRBID command after cloning data and applying
* operations.
*/
@@ -650,7 +625,6 @@ private:
OpTime _lastFetched; // (MX)
OpTimeAndWallTime _lastApplied; // (MX)
std::unique_ptr<OplogBuffer> _oplogBuffer; // (M)
- std::unique_ptr<OplogApplier::Observer> _observer; // (S)
std::unique_ptr<OplogApplier> _oplogApplier; // (M)
// Used to signal changes in _state.
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 820c4147fa4..47d4283b03b 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -3948,118 +3948,6 @@ TEST_F(InitialSyncerTest,
ASSERT_EQUALS(lastOp.getWallClockTime().get(), _lastApplied.getValue().wallTime);
}
-TEST_F(
- InitialSyncerTest,
- InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) {
- // Skip reconstructing prepared transactions at the end of initial sync because
- // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault
- // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint.
- FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions");
-
- // Skip clearing initial sync progress so that we can check if missing documents have been
- // fetched after the initial sync attempt.
- FailPointEnableBlock skipClearInitialSyncState("skipClearInitialSyncState");
-
- auto initialSyncer = &getInitialSyncer();
- auto opCtx = makeOpCtx();
-
- // Override DataReplicatorExternalState::_multiApply() so that it will also fetch a missing
- // document.
- // This forces InitialSyncer to evaluate its end timestamp for applying operations after each
- // batch.
- bool fetchCountIncremented = false;
- getExternalState()->multiApplyFn = [&fetchCountIncremented](OperationContext* opCtx,
- const MultiApplier::Operations& ops,
- OplogApplier::Observer* observer) {
- if (!fetchCountIncremented) {
- auto entry = makeOplogEntry(1);
- observer->onMissingDocumentsFetchedAndInserted({std::make_pair(entry, BSONObj())});
- fetchCountIncremented = true;
- }
- return ops.back().getOpTime();
- };
-
- _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345));
- ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts));
-
- // Use command for third and last operation to ensure we have two batches to apply.
- auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand);
-
- auto net = getNet();
- int baseRollbackId = 1;
- {
- executor::NetworkInterfaceMock::InNetworkGuard guard(net);
-
- // Base rollback ID.
- net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
-
- // Send an empty optime as the response to the beginFetchingOptime find request, which will
- // cause the beginFetchingTimestamp to be the same as the beginApplyingTimestamp.
- auto request = net->scheduleSuccessfulResponse(
- makeCursorResponse(0LL, NamespaceString::kSessionTransactionsTableNamespace, {}, true));
- assertRemoteCommandNameEquals("find", request);
- net->runReadyNetworkOperations();
-
- // Last oplog entry.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)});
-
- // Feature Compatibility Version.
- processSuccessfulFCVFetcherResponse40();
-
- // Quickest path to a successful DatabasesCloner completion is to respond to the
- // listDatabases with an empty list of database names.
- assertRemoteCommandNameEquals(
- "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({})));
- net->runReadyNetworkOperations();
-
- // OplogFetcher's oplog tailing query. Response has enough operations to reach
- // end timestamp.
- request = net->scheduleSuccessfulResponse(
- makeCursorResponse(1LL,
- _options.localOplogNS,
- {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()}));
- assertRemoteCommandNameEquals("find", request);
- ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay"));
- net->runReadyNetworkOperations();
-
- // Second last oplog entry fetcher.
- // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after
- // applying the first batch.
- processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)});
-
- // Black hole OplogFetcher's getMore request.
- auto noi = net->getNextReadyRequest();
- request = noi->getRequest();
- assertRemoteCommandNameEquals("getMore", request);
- net->blackHole(noi);
-
- // Third last oplog entry fetcher.
- processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()});
-
- // Last rollback ID.
- request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId));
- assertRemoteCommandNameEquals("replSetGetRBID", request);
- net->runReadyNetworkOperations();
-
- // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting
- // the completion status.
- // We call runReadyNetworkOperations() again to deliver the cancellation status to
- // _oplogFetcherCallback().
- net->runReadyNetworkOperations();
- }
-
- initialSyncer->join();
- ASSERT_OK(_lastApplied.getStatus());
- ASSERT_EQUALS(lastOp.getOpTime(), _lastApplied.getValue().opTime);
- ASSERT_EQUALS(lastOp.getWallClockTime().get(), _lastApplied.getValue().wallTime);
-
- ASSERT_TRUE(fetchCountIncremented);
-
- auto progress = initialSyncer->getInitialSyncProgress();
- log() << "Progress after initial sync attempt: " << progress;
- ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress;
-}
-
TEST_F(InitialSyncerTest,
InitialSyncerReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) {
auto initialSyncer = &getInitialSyncer();
@@ -4183,13 +4071,12 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
auto progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after first failed response: " << progress;
- ASSERT_EQUALS(progress.nFields(), 8) << progress;
+ ASSERT_EQUALS(progress.nFields(), 7) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj());
- ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
@@ -4239,12 +4126,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after failure: " << progress;
- ASSERT_EQUALS(progress.nFields(), 8) << progress;
+ ASSERT_EQUALS(progress.nFields(), 7) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
- ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress;
ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0));
@@ -4327,13 +4213,12 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress after all but last successful response: " << progress;
- ASSERT_EQUALS(progress.nFields(), 9) << progress;
+ ASSERT_EQUALS(progress.nFields(), 8) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
- ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
// Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
auto databasesProgress = progress.getObjectField("databases");
@@ -4389,7 +4274,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
progress = initialSyncer->getInitialSyncProgress();
log() << "Progress at end: " << progress;
- ASSERT_EQUALS(progress.nFields(), 11) << progress;
+ ASSERT_EQUALS(progress.nFields(), 10) << progress;
ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress;
ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress;
ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress;
@@ -4397,7 +4282,6 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) {
ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress;
ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress;
- ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress;
// Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1).
ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress;
diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp
index 63f883e8e31..3a2a3f0fe97 100644
--- a/src/mongo/db/repl/oplog_applier.cpp
+++ b/src/mongo/db/repl/oplog_applier.cpp
@@ -44,6 +44,8 @@
namespace mongo {
namespace repl {
+NoopOplogApplierObserver noopOplogApplierObserver;
+
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
// static
diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h
index e9a7daf26fc..7f5c492803f 100644
--- a/src/mongo/db/repl/oplog_applier.h
+++ b/src/mongo/db/repl/oplog_applier.h
@@ -71,10 +71,6 @@ public:
bool relaxUniqueIndexConstraints = false;
bool skipWritesToOplog = false;
- // For initial sync only. If an update fails, the missing document is fetched from
- // this sync source to insert into the local collection.
- boost::optional<HostAndPort> missingDocumentSourceForInitialSync;
-
// Used to determine which operations should be applied. Only initial sync will set this to
// be something other than the null optime.
OpTime beginApplyingOpTime = OpTime();
@@ -273,18 +269,15 @@ public:
*/
virtual void onBatchEnd(const StatusWith<OpTime>& lastOpTimeApplied,
const OplogApplier::Operations& operations) = 0;
+};
- /**
- * Called when documents are fetched and inserted into the collection in order to
- * apply an update operation.
- * Applies to initial sync only.
- *
- * TODO: Delegate fetching behavior to OplogApplier owner.
- */
- using FetchInfo = std::pair<OplogEntry, BSONObj>;
- virtual void onMissingDocumentsFetchedAndInserted(
- const std::vector<FetchInfo>& documentsFetchedAndInserted) = 0;
+class NoopOplogApplierObserver : public repl::OplogApplier::Observer {
+public:
+ void onBatchBegin(const repl::OplogApplier::Operations&) final {}
+ void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {}
};
+extern NoopOplogApplierObserver noopOplogApplierObserver;
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h
deleted file mode 100644
index dca256bf871..00000000000
--- a/src/mongo/db/repl/oplogreader.h
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-
-#pragma once
-
-#include "mongo/client/dbclient_connection.h"
-#include "mongo/client/dbclient_cursor.h"
-#include "mongo/util/net/hostandport.h"
-
-namespace mongo {
-namespace repl {
-
-/**
- * Authenticates conn using the server's cluster-membership credentials.
- *
- * Returns true on successful authentication.
- */
-bool replAuthenticate(DBClientBase* conn);
-
-/* started abstracting out the querying of the primary/master's oplog
- still fairly awkward but a start.
-*/
-
-class OplogReader {
-private:
- std::shared_ptr<DBClientConnection> _conn;
- std::shared_ptr<DBClientCursor> cursor;
- int _tailingQueryOptions;
-
- // If _conn was actively connected, _host represents the current HostAndPort of the
- // connection.
- HostAndPort _host;
-
-public:
- OplogReader();
- ~OplogReader() {}
- void resetCursor() {
- cursor.reset();
- }
- void resetConnection() {
- cursor.reset();
- _conn.reset();
- _host = HostAndPort();
- }
- DBClientConnection* conn() {
- return _conn.get();
- }
- BSONObj findOne(const char* ns, const Query& q) {
- return conn()->findOne(ns, q, nullptr, QueryOption_SlaveOk);
- }
- BSONObj findOneByUUID(const std::string& db, UUID uuid, const BSONObj& filter) {
- // Note that the findOneByUUID() function of DBClient passes SlaveOK to the client.
- BSONObj foundDoc;
- std::tie(foundDoc, std::ignore) = conn()->findOneByUUID(db, uuid, filter);
- return foundDoc;
- }
-
- /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */
- static const Seconds kSocketTimeout;
-
- /* ok to call if already connected */
- bool connect(const HostAndPort& host);
-
- void tailCheck();
-
- bool haveCursor() {
- return cursor.get() != nullptr;
- }
-
- void tailingQuery(const char* ns, const BSONObj& query);
-
- bool more() {
- uassert(15910, "Doesn't have cursor for reading oplog", cursor.get());
- return cursor->more();
- }
-
- bool moreInCurrentBatch() {
- uassert(15911, "Doesn't have cursor for reading oplog", cursor.get());
- return cursor->moreInCurrentBatch();
- }
-
- BSONObj nextSafe() {
- return cursor->nextSafe();
- }
-};
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/replication_auth.cpp
index 17811adcf9a..515dbf665f8 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/replication_auth.cpp
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -27,27 +27,20 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/oplogreader.h"
+#include "mongo/db/repl/replication_auth.h"
#include <string>
#include "mongo/client/authenticate.h"
#include "mongo/db/auth/authorization_manager.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/executor/network_interface.h"
-#include "mongo/util/log.h"
namespace mongo {
namespace repl {
namespace {
// Gets the singleton AuthorizationManager object for this server process
-//
-// TODO (SERVER-37563): Pass the service context instead of calling getGlobalServiceContext.
AuthorizationManager* getGlobalAuthorizationManager() {
AuthorizationManager* globalAuthManager = AuthorizationManager::get(getGlobalServiceContext());
fassert(16842, globalAuthManager != nullptr);
@@ -64,47 +57,5 @@ bool replAuthenticate(DBClientBase* conn) {
return true;
}
-const Seconds OplogReader::kSocketTimeout(30);
-
-OplogReader::OplogReader() {
- _tailingQueryOptions = QueryOption_SlaveOk;
- _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay;
-
- /* TODO: slaveOk maybe shouldn't use? */
- _tailingQueryOptions |= QueryOption_AwaitData;
-}
-
-bool OplogReader::connect(const HostAndPort& host) {
- if (conn() == nullptr || _host != host) {
- resetConnection();
- _conn = std::shared_ptr<DBClientConnection>(
- new DBClientConnection(false, durationCount<Seconds>(kSocketTimeout)));
-
- std::string errmsg;
- if (!_conn->connect(host, StringData(), errmsg) || !replAuthenticate(_conn.get())) {
- resetConnection();
- error() << errmsg;
- return false;
- }
- _conn->setTags(transport::Session::kKeepOpen);
- _host = host;
- }
- return true;
-}
-
-void OplogReader::tailCheck() {
- if (cursor.get() && cursor->isDead()) {
- log() << "old cursor isDead, will initiate a new one" << std::endl;
- resetCursor();
- }
-}
-
-void OplogReader::tailingQuery(const char* ns, const BSONObj& query) {
- verify(!haveCursor());
- LOG(2) << ns << ".find(" << redact(query) << ')';
- cursor.reset(
- _conn->query(NamespaceString(ns), query, 0, 0, nullptr, _tailingQueryOptions).release());
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/replication_auth.h b/src/mongo/db/repl/replication_auth.h
new file mode 100644
index 00000000000..6866f6f51a1
--- /dev/null
+++ b/src/mongo/db/repl/replication_auth.h
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#pragma once
+
+#include "mongo/client/dbclient_connection.h"
+#include "mongo/client/dbclient_cursor.h"
+#include "mongo/util/net/hostandport.h"
+
+namespace mongo {
+namespace repl {
+
+/**
+ * Authenticates conn using the server's cluster-membership credentials.
+ *
+ * Returns true on successful authentication.
+ */
+bool replAuthenticate(DBClientBase* conn);
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index a6106bd0370..69fd30eac81 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -133,13 +133,6 @@ ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes", &b
ServerStatusMetricField<Counter64> displayBufferMaxSize("repl.buffer.maxSizeBytes",
&bufferGauge.maxSize);
-class NoopOplogApplierObserver : public repl::OplogApplier::Observer {
-public:
- void onBatchBegin(const repl::OplogApplier::Operations&) final {}
- void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {}
- void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {}
-} noopOplogApplierObserver;
-
/**
* Returns new thread pool for thread pool task executor.
*/
@@ -222,8 +215,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication(
// Using noop observer now that BackgroundSync no longer implements the OplogApplier::Observer
// interface. During steady state replication, there is no need to log details on every batch
- // we apply (recovery); or track missing documents that are fetched from the sync source
- // (initial sync).
+ // we apply.
_oplogApplier = std::make_unique<OplogApplierImpl>(
_oplogApplierTaskExecutor.get(),
_oplogBuffer.get(),
diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp
index c20aa0c87d4..6e93691e17b 100644
--- a/src/mongo/db/repl/replication_info.cpp
+++ b/src/mongo/db/repl/replication_info.cpp
@@ -49,7 +49,7 @@
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/repl/is_master_response.h"
#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/oplogreader.h"
+#include "mongo/db/repl/replication_auth.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_process.h"
#include "mongo/db/repl/storage_interface.h"
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 81aede4f6b0..999c79fb9af 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -87,7 +87,6 @@ public:
}
void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {}
- void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {}
void complete(const OpTime& applyThroughOpTime) const {
log() << "Applied " << _numOpsApplied << " operations in " << _numBatches
diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp
index 7d893f0c4fb..329a75d8064 100644
--- a/src/mongo/db/repl/rollback_source_impl.cpp
+++ b/src/mongo/db/repl/rollback_source_impl.cpp
@@ -31,10 +31,11 @@
#include "mongo/db/repl/rollback_source_impl.h"
+#include "mongo/client/dbclient_connection.h"
#include "mongo/db/cloner.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/oplogreader.h"
+#include "mongo/db/repl/replication_auth.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/str.h"
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 17614f16d25..6d3fb15810b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -65,9 +65,9 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/initial_syncer.h"
#include "mongo/db/repl/multiapplier.h"
-#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/repl_set_config.h"
+#include "mongo/db/repl/replication_auth.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/transaction_oplog_application.h"
#include "mongo/db/session.h"
@@ -837,154 +837,6 @@ bool SyncTail::inShutdown() const {
return _inShutdown;
}
-BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) {
- OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query?
-
- if (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) {
- log() << "initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled. "
- "Blocking until fail point is disabled.";
- while (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) {
- mongo::sleepsecs(1);
- }
- }
-
- auto source = _options.missingDocumentSourceForInitialSync;
- invariant(source);
-
- const int retryMax = 3;
- for (int retryCount = 1; retryCount <= retryMax; ++retryCount) {
- if (retryCount != 1) {
- // if we are retrying, sleep a bit to let the network possibly recover
- sleepsecs(retryCount * retryCount);
- }
- try {
- bool ok = missingObjReader.connect(*source);
- if (!ok) {
- warning() << "network problem detected while connecting to the "
- << "sync source, attempt " << retryCount << " of " << retryMax;
- continue; // try again
- }
- } catch (const NetworkException&) {
- warning() << "network problem detected while connecting to the "
- << "sync source, attempt " << retryCount << " of " << retryMax;
- continue; // try again
- }
-
- // get _id from oplog entry to create query to fetch document.
- const auto idElem = oplogEntry.getIdElement();
-
- if (idElem.eoo()) {
- severe() << "cannot fetch missing document without _id field: "
- << redact(oplogEntry.toBSON());
- fassertFailedNoTrace(28742);
- }
-
- BSONObj query = BSONObjBuilder().append(idElem).obj();
- BSONObj missingObj;
- auto nss = oplogEntry.getNss();
- try {
- auto uuid = oplogEntry.getUuid();
- if (!uuid) {
- missingObj = missingObjReader.findOne(nss.ns().c_str(), query);
- } else {
- auto dbname = nss.db();
- // If a UUID exists for the command object, find the document by UUID.
- missingObj = missingObjReader.findOneByUUID(dbname.toString(), *uuid, query);
- }
- } catch (const NetworkException&) {
- warning() << "network problem detected while fetching a missing document from the "
- << "sync source, attempt " << retryCount << " of " << retryMax;
- continue; // try again
- } catch (DBException& e) {
- error() << "assertion fetching missing object: " << redact(e);
- throw;
- }
-
- // success!
- return missingObj;
- }
- // retry count exceeded
- msgasserted(15916,
- str::stream() << "Can no longer connect to initial sync source: "
- << source->toString());
-}
-
-void SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx,
- const OplogEntry& oplogEntry) {
- // Note that using the local UUID/NamespaceString mapping is sufficient for checking
- // whether the collection is capped on the remote because convertToCapped creates a
- // new collection with a different UUID.
- const NamespaceString nss(parseUUIDOrNs(opCtx, oplogEntry));
-
- {
- // If the document is in a capped collection then it's okay for it to be missing.
- AutoGetCollectionForRead autoColl(opCtx, nss);
- Collection* const collection = autoColl.getCollection();
- if (collection && collection->isCapped()) {
- log() << "Not fetching missing document in capped collection (" << nss << ")";
- return;
- }
- }
-
- log() << "Fetching missing document: " << redact(oplogEntry.toBSON());
- BSONObj missingObj = getMissingDoc(opCtx, oplogEntry);
-
- if (missingObj.isEmpty()) {
- BSONObj object2;
- if (auto optionalObject2 = oplogEntry.getObject2()) {
- object2 = *optionalObject2;
- }
- log() << "Missing document not found on source; presumably deleted later in oplog. o first "
- "field: "
- << redact(oplogEntry.getObject()) << ", o2: " << redact(object2);
-
- return;
- }
-
- return writeConflictRetry(opCtx, "fetchAndInsertMissingDocument", nss.ns(), [&] {
- // Take an X lock on the database in order to preclude other modifications.
- AutoGetDb autoDb(opCtx, nss.db(), MODE_X);
- Database* const db = autoDb.getDb();
-
- WriteUnitOfWork wunit(opCtx);
-
- Collection* coll = nullptr;
- auto uuid = oplogEntry.getUuid();
- if (!uuid) {
- if (!db) {
- return;
- }
- coll = db->getOrCreateCollection(opCtx, nss);
- } else {
- // If the oplog entry has a UUID, use it to find the collection in which to insert the
- // missing document.
- auto& catalog = CollectionCatalog::get(opCtx);
- coll = catalog.lookupCollectionByUUID(*uuid);
- if (!coll) {
- // TODO(SERVER-30819) insert this UUID into the missing UUIDs set.
- return;
- }
- }
-
- invariant(coll);
-
- OpDebug* const nullOpDebug = nullptr;
- Status status = coll->insertDocument(opCtx, InsertStatement(missingObj), nullOpDebug, true);
- uassert(15917,
- str::stream() << "Failed to insert missing document: " << status.toString(),
- status.isOK());
-
- LOG(1) << "Inserted missing document: " << redact(missingObj);
-
- wunit.commit();
-
- if (_observer) {
- const OplogApplier::Observer::FetchInfo fetchInfo(oplogEntry, missingObj);
- _observer->onMissingDocumentsFetchedAndInserted({fetchInfo});
- }
- });
-}
-
// This free function is used by the writer threads to apply each op
Status multiSyncApply(OperationContext* opCtx,
MultiApplier::OperationPtrs* ops,
@@ -1038,14 +890,10 @@ Status multiSyncApply(OperationContext* opCtx,
opCtx, &entry, oplogApplicationMode, stableTimestampForRecovery);
if (!status.isOK()) {
- // In initial sync, update operations can cause documents to be missed during
- // collection cloning. As a result, it is possible that a document that we
- // need to update is not present locally. In that case we fetch the document
- // from the sync source.
+            // Tried to apply an update operation but the document is missing; there must be
+ // a delete operation for the document later in the oplog.
if (status == ErrorCodes::UpdateOperationFailed &&
- st->getOptions().missingDocumentSourceForInitialSync) {
- // We might need to fetch the missing docs from the sync source.
- st->fetchAndInsertMissingDocument(opCtx, entry);
+ oplogApplicationMode == OplogApplication::Mode::kInitialSync) {
continue;
}
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index a2ab581d75c..65d7639e5cb 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -192,23 +192,6 @@ public:
using BatchLimits = OplogApplier::BatchLimits;
/**
- * Fetch a single document referenced in the operation from the sync source.
- *
- * The sync source is specified at construction in
- * OplogApplier::Options::missingDocumentSourceForInitialSync.
- */
- virtual BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry);
-
- /**
- * If an update fails, fetches the missing document and inserts it into the local collection.
- *
- * Calls OplogApplier::Observer::onMissingDocumentsFetchedAndInserted() if the document was
- * fetched and inserted successfully.
- */
- virtual void fetchAndInsertMissingDocument(OperationContext* opCtx,
- const OplogEntry& oplogEntry);
-
- /**
* Applies a batch of oplog entries by writing the oplog entries to the local oplog and then
* using a set of threads to apply the operations. It will only apply (but will
* still write to the oplog) oplog entries with a timestamp greater than or equal to the
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index d6a194ba29d..1e544e731fb 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -101,51 +101,14 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollec
}
/**
- * Testing-only SyncTail that returns user-provided "document" for getMissingDoc().
+ * Testing-only SyncTail.
*/
-class SyncTailWithLocalDocumentFetcher : public SyncTail, OplogApplier::Observer {
+class SyncTailForTest : public SyncTail {
public:
- SyncTailWithLocalDocumentFetcher(const BSONObj& document);
- BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override;
-
- // OplogApplier::Observer functions
- void onBatchBegin(const OplogApplier::Operations&) final {}
- void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {}
- void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>& docs) final {
- numFetched += docs.size();
- }
-
- std::size_t numFetched = 0U;
-
-private:
- BSONObj _document;
-};
-
-/**
- * Testing-only SyncTail that checks the operation context in fetchAndInsertMissingDocument().
- */
-class SyncTailWithOperationContextChecker : public SyncTail {
-public:
- SyncTailWithOperationContextChecker();
- void fetchAndInsertMissingDocument(OperationContext* opCtx,
- const OplogEntry& oplogEntry) override;
- bool called = false;
+ SyncTailForTest();
};
-SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document)
- : SyncTail(this, // observer
- nullptr, // consistency markers
- nullptr, // storage interface
- SyncTail::MultiSyncApplyFunc(),
- nullptr, // writer pool
- SyncTailTest::makeInitialSyncOptions()),
- _document(document) {}
-
-BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const OplogEntry&) {
- return _document;
-}
-
-SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker()
+SyncTailForTest::SyncTailForTest()
: SyncTail(nullptr, // observer
nullptr, // consistency markers
nullptr, // storage interface
@@ -153,14 +116,6 @@ SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker()
nullptr, // writer pool
SyncTailTest::makeInitialSyncOptions()) {}
-void SyncTailWithOperationContextChecker::fetchAndInsertMissingDocument(OperationContext* opCtx,
- const OplogEntry&) {
- ASSERT_FALSE(opCtx->writesAreReplicated());
- ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
- ASSERT_TRUE(documentValidationDisabled(opCtx));
- called = true;
-}
-
/**
* Creates collection options suitable for oplog.
*/
@@ -1769,8 +1724,7 @@ TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGro
}
TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) {
- BSONObj emptyDoc;
- SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
+ SyncTailForTest syncTail;
NamespaceString nss("test.t");
{
Lock::GlobalWrite globalLock(_opCtx.get());
@@ -1786,17 +1740,14 @@ TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFrom
WorkerMultikeyPathInfo pathInfo;
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
- // Since the missing document is not found on the sync source, the collection referenced by
- // the failed operation should not be automatically created.
+ // Since the document was missing when we cloned data from the sync source, the collection
+ // referenced by the failed operation should not be automatically created.
ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection());
-
- // Fetch count should remain zero if we failed to copy the missing document.
- ASSERT_EQUALS(syncTail.numFetched, 0U);
}
TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitialSync) {
BSONObj emptyDoc;
- SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
+ SyncTailForTest syncTail;
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad");
auto doc1 = BSON("_id" << 1);
@@ -1809,7 +1760,6 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitial
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
- ASSERT_EQUALS(syncTail.numFetched, 0U);
CollectionReader collectionReader(_opCtx.get(), nss);
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next()));
@@ -1819,7 +1769,7 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitial
TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) {
BSONObj emptyDoc;
- SyncTailWithLocalDocumentFetcher syncTail(emptyDoc);
+ SyncTailForTest syncTail;
NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName());
NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad");
auto doc1 = BSON("_id" << 1);
@@ -1834,7 +1784,6 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringIn
MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3};
WorkerMultikeyPathInfo pathInfo;
ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
- ASSERT_EQUALS(syncTail.numFetched, 0U);
CollectionReader collectionReader(_opCtx.get(), nss);
ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next()));
@@ -1845,26 +1794,6 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringIn
ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection());
}
-TEST_F(SyncTailTest, MultiSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) {
- SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1));
- NamespaceString nss("test.t");
- createCollection(_opCtx.get(), nss, {});
- auto updatedDocument = BSON("_id" << 0 << "x" << 1);
- auto op = makeUpdateDocumentOplogEntry(
- {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument);
- MultiApplier::OperationPtrs ops = {&op};
- WorkerMultikeyPathInfo pathInfo;
- ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo));
- ASSERT_EQUALS(syncTail.numFetched, 1U);
-
- // The collection referenced by "ns" in the failed operation is automatically created to hold
- // the missing document fetched from the sync source. We verify the contents of the collection
- // with the CollectionReader class.
- CollectionReader collectionReader(_opCtx.get(), nss);
- ASSERT_BSONOBJ_EQ(updatedDocument, unittest::assertGet(collectionReader.next()));
- ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, collectionReader.next().getStatus());
-}
-
namespace {
class ReplicationCoordinatorSignalDrainCompleteThrows : public ReplicationCoordinatorMock {
diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp
index 59860e000c2..7c85abeabf3 100644
--- a/src/mongo/db/repl/sync_tail_test_fixture.cpp
+++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp
@@ -94,7 +94,6 @@ void SyncTailOpObserver::onCreateCollection(OperationContext* opCtx,
OplogApplier::Options SyncTailTest::makeInitialSyncOptions() {
OplogApplier::Options options(OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
- options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
return options;
}
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index 95d016a523d..0712c4e7b88 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -1356,96 +1356,6 @@ public:
}
};
-class SyncTest : public SyncTail {
-public:
- bool returnEmpty;
- explicit SyncTest(OplogApplier::Observer* observer)
- : SyncTail(observer,
- nullptr,
- nullptr,
- SyncTail::MultiSyncApplyFunc(),
- nullptr,
- OplogApplier::Options(OplogApplication::Mode::kInitialSync)),
- returnEmpty(false) {}
- virtual ~SyncTest() {}
- BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override {
- if (returnEmpty) {
- BSONObj o;
- return o;
- }
- return BSON("_id"
- << "on remote"
- << "foo"
- << "baz");
- }
-};
-
-class FetchAndInsertMissingDocumentObserver : public OplogApplier::Observer {
-public:
- void onBatchBegin(const OplogApplier::Operations&) final {}
- void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {}
- void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {
- fetched = true;
- }
- bool fetched = false;
-};
-
-class FetchAndInsertMissingDocument : public Base {
-public:
- void run() {
- // Replication is not supported by mobile SE.
- if (mongo::storageGlobalParams.engine == "mobile") {
- return;
- }
- bool threw = false;
- auto oplogEntry = makeOplogEntry(OpTime(Timestamp(100, 1), 1LL), // optime
- OpTypeEnum::kUpdate, // op type
- NamespaceString(ns()), // namespace
- BSON("foo"
- << "bar"), // o
- BSON("_id"
- << "in oplog"
- << "foo"
- << "bar")); // o2
-
- Lock::GlobalWrite lk(&_opCtx);
-
- // this should fail because we can't connect
- try {
- OplogApplier::Options options(OplogApplication::Mode::kInitialSync);
- options.allowNamespaceNotFoundErrorsOnCrudOps = true;
- options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
- SyncTail badSource(
- nullptr, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr, options);
-
- OldClientContext ctx(&_opCtx, ns());
- badSource.getMissingDoc(&_opCtx, oplogEntry);
- } catch (DBException&) {
- threw = true;
- }
- verify(threw);
-
- // now this should succeed
- FetchAndInsertMissingDocumentObserver observer;
- SyncTest t(&observer);
- t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry);
- ASSERT(observer.fetched);
- verify(!_client
- .findOne(ns(),
- BSON("_id"
- << "on remote"))
- .isEmpty());
-
- // Reset flag in observer before next test case.
- observer.fetched = false;
-
- // force it not to find an obj
- t.returnEmpty = true;
- t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry);
- ASSERT_FALSE(observer.fetched);
- }
-};
-
class All : public Suite {
public:
All() : Suite("repl") {}
@@ -1503,7 +1413,6 @@ public:
add<Idempotence::ReplaySetPreexistingNoOpPull>();
add<Idempotence::ReplayArrayFieldNotAppended>();
add<DeleteOpIsIdBased>();
- add<FetchAndInsertMissingDocument>();
}
};
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index ea4dc1e148b..5d2ac4a9e25 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -145,7 +145,6 @@ class DoNothingOplogApplierObserver : public repl::OplogApplier::Observer {
public:
void onBatchBegin(const repl::OplogApplier::Operations&) final {}
void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {}
- void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {}
};
class StorageTimestampTest {
@@ -1374,7 +1373,6 @@ public:
auto writerPool = repl::OplogApplier::makeWriterPool();
repl::OplogApplier::Options options(repl::OplogApplication::Mode::kInitialSync);
options.allowNamespaceNotFoundErrorsOnCrudOps = true;
- options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123);
repl::OplogApplierImpl oplogApplier(
nullptr, // task executor. not required for multiApply().