author    | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-08-06 23:00:28 -0400
committer | A. Jesse Jiryu Davis <jesse@mongodb.com> | 2019-08-21 11:55:02 -0400
commit    | 609e2697c2619925073d4ac0f57f33e789d05100 (patch)
tree      | 0c94a13d998f40b9cd16f4da272e98744d6bb98b
parent    | 08f836390d61726235e583f46012f43995695c85 (diff)
download  | mongo-609e2697c2619925073d4ac0f57f33e789d05100.tar.gz
SERVER-42022 Remove missing-document fetcher
31 files changed, 178 insertions, 1164 deletions
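The diff below removes the missing-document fetcher from initial sync: when a failed update is encountered during oplog application, the syncing node no longer fetches the document from the sync source and simply lets initial sync proceed. For orientation before the diff, here is a condensed sketch of the updated test shape, taken from jstests/replsets/initial_sync_update_missing_doc.js as it appears after this commit; setup and helper details are abbreviated, so treat it as illustrative rather than a verbatim copy.

```javascript
// Illustrative sketch condensed from the post-commit
// jstests/replsets/initial_sync_update_missing_doc.js; helper functions come from
// jstests/replsets/libs/initial_sync_update_missing_doc.js.
(function() {
load("jstests/replsets/libs/initial_sync_update_missing_doc.js");
load("jstests/libs/check_log.js");

const replSet = new ReplSetTest({nodes: 1});
replSet.startSet();
replSet.initiate();

const primary = replSet.getPrimary();
const coll = primary.getDB('test').getCollection(jsTestName());
assert.commandWorked(coll.insert({_id: 0, x: 1}));

// Add a priority: 0, votes: 0 secondary that hangs before copying databases,
// so the update/remove below happens between initial sync phases 1 and 2.
const secondary = reInitiateSetWithSecondary(replSet, {rsConfig: {votes: 0, priority: 0}});

// Update and then delete the document on the primary; the secondary will fail
// to apply the update in phase 3.
updateRemove(coll, {_id: 0});
turnOffHangBeforeCopyingDatabasesFailPoint(secondary);

// With the missing-document fetcher removed, initial sync completes anyway and
// the document is simply absent (0 documents expected).
finishAndValidate(replSet, jsTestName(), 0 /* numDocuments */);

replSet.stopSet();
})();
```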
diff --git a/jstests/noPassthrough/apply_ops_mode.js b/jstests/noPassthrough/apply_ops_mode.js index 2376ed1c30f..b03edf2c855 100644 --- a/jstests/noPassthrough/apply_ops_mode.js +++ b/jstests/noPassthrough/apply_ops_mode.js @@ -12,48 +12,54 @@ var standalone = MongoRunner.runMongod(); var db = standalone.getDB("test"); var coll = db.getCollection("apply_ops_mode1"); -coll.drop(); -assert.commandWorked(coll.insert({_id: 1})); // ------------ Testing normal updates --------------- var id = ObjectId(); -var updateOp = {op: 'u', ns: coll.getFullName(), o: {_id: id, x: 1}, o2: {_id: id}}; -assert.commandFailed(db.adminCommand({applyOps: [updateOp], alwaysUpsert: false})); -assert.eq(coll.count({x: 1}), 0); - -// Test that 'InitialSync' does not override 'alwaysUpsert: false'. -assert.commandFailed(db.adminCommand( - {applyOps: [updateOp], alwaysUpsert: false, oplogApplicationMode: "InitialSync"})); -assert.eq(coll.count({x: 1}), 0); - -// Test parsing failure. -assert.commandFailedWithCode( - db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "BadMode"}), - ErrorCodes.FailedToParse); -assert.commandFailedWithCode(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: 5}), - ErrorCodes.TypeMismatch); - -// Test default succeeds. -assert.commandWorked(db.adminCommand({applyOps: [updateOp]})); -assert.eq(coll.count({x: 1}), 1); - -// Use new collection to make logs cleaner. -coll = db.getCollection("apply_ops_mode2"); -coll.drop(); -updateOp.ns = coll.getFullName(); -assert.commandWorked(coll.insert({_id: 1})); - -// Test default succeeds in 'InitialSync' mode. -assert.commandWorked(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "InitialSync"})); -assert.eq(coll.count({x: 1}), 1); +for (let updateOp of [ + // An update with a modifier. + {op: 'u', ns: coll.getFullName(), o: {$set: {x: 1}}, o2: {_id: id}}, + // A full-document replace. + {op: 'u', ns: coll.getFullName(), o: {_id: id, x: 1}, o2: {_id: id}}, +]) { + coll.drop(); + assert.writeOK(coll.insert({_id: 1})); + + jsTestLog(`Test applyOps with the following op:\n${tojson(updateOp)}`); + assert.commandFailed(db.adminCommand({applyOps: [updateOp], alwaysUpsert: false})); + assert.eq(coll.count({x: 1}), 0); + + // Test that 'InitialSync' does not override 'alwaysUpsert: false'. + assert.commandFailed(db.adminCommand( + {applyOps: [updateOp], alwaysUpsert: false, oplogApplicationMode: "InitialSync"})); + assert.eq(coll.count({x: 1}), 0); + + // Test parsing failure. + assert.commandFailedWithCode( + db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "BadMode"}), + ErrorCodes.FailedToParse); + assert.commandFailedWithCode(db.adminCommand({applyOps: [updateOp], oplogApplicationMode: 5}), + ErrorCodes.TypeMismatch); + + // Test default succeeds. + assert.commandWorked(db.adminCommand({applyOps: [updateOp]})); + assert.eq(coll.count({x: 1}), 1); + + coll.drop(); + assert.commandWorked(coll.insert({_id: 1})); + + // Test default succeeds in 'InitialSync' mode. 
+ assert.commandWorked( + db.adminCommand({applyOps: [updateOp], oplogApplicationMode: "InitialSync"})); + assert.eq(coll.count({x: 1}), 1); +} // ------------ Testing fCV updates --------------- var adminDB = db.getSiblingDB("admin"); const systemVersionColl = adminDB.getCollection("system.version"); -updateOp = { +var updateOp = { op: 'u', ns: systemVersionColl.getFullName(), o: {_id: "featureCompatibilityVersion", version: lastStableFCV}, diff --git a/jstests/replsets/initial_sync_replSetGetStatus.js b/jstests/replsets/initial_sync_replSetGetStatus.js index 7d325997328..96397750f44 100644 --- a/jstests/replsets/initial_sync_replSetGetStatus.js +++ b/jstests/replsets/initial_sync_replSetGetStatus.js @@ -61,7 +61,6 @@ checkLog.contains(secondary, 'initial sync - initialSyncHangBeforeFinish fail po res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); assert(res.initialSyncStatus, () => "Response should have an 'initialSyncStatus' field: " + tojson(res)); -assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0); assert.eq(res.initialSyncStatus.appliedOps, 3); assert.eq(res.initialSyncStatus.failedInitialSyncAttempts, 0); assert.eq(res.initialSyncStatus.maxFailedInitialSyncAttempts, 10); diff --git a/jstests/replsets/initial_sync_update_missing_doc1.js b/jstests/replsets/initial_sync_update_missing_doc.js index 2c1fdc1eea4..532e82ab3bd 100644 --- a/jstests/replsets/initial_sync_update_missing_doc1.js +++ b/jstests/replsets/initial_sync_update_missing_doc.js @@ -4,29 +4,24 @@ * 2) copies all non-local databases from the source; and * 3) fetches and applies operations from the source after op_start1. * - * This test updates and deletes a document on the source between phases 1 and 2. The - * secondary will initially fail to apply the update operation in phase 3 and subsequently have - * to attempt to check the source for a new copy of the document. The absence of the document on - * the source indicates that the source is free to ignore the failed update operation. - * + * This test updates and deletes a document on the source between phases 1 and 2. The secondary will + * fail to apply the update operation in phase 3 but initial sync completes nevertheless. The + * absence of the document on the source indicates that the source is free to ignore the failed + * update operation. */ (function() { load("jstests/replsets/libs/initial_sync_update_missing_doc.js"); load("jstests/libs/check_log.js"); -const name = 'initial_sync_update_missing_doc1'; -const replSet = new ReplSetTest({ - name: name, - nodes: 1, -}); +const replSet = new ReplSetTest({nodes: 1}); replSet.startSet(); replSet.initiate(); const primary = replSet.getPrimary(); const dbName = 'test'; - -var coll = primary.getDB(dbName).getCollection(name); +const collectionName = jsTestName(); +const coll = primary.getDB(dbName).getCollection(collectionName); assert.commandWorked(coll.insert({_id: 0, x: 1})); // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while @@ -38,16 +33,8 @@ const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig); // Update and remove document on primary. 
updateRemove(coll, {_id: 0}); - turnOffHangBeforeCopyingDatabasesFailPoint(secondary); - -var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); -assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0); -var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd; - -turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 0 /* numFetched */); - -finishAndValidate(replSet, name, firstOplogEnd, 0 /* numFetched */, 0 /* numDocuments */); +finishAndValidate(replSet, collectionName, 0 /* numDocuments */); replSet.stopSet(); })(); diff --git a/jstests/replsets/initial_sync_update_missing_doc2.js b/jstests/replsets/initial_sync_update_missing_doc2.js deleted file mode 100644 index f2c51e85c9e..00000000000 --- a/jstests/replsets/initial_sync_update_missing_doc2.js +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Initial sync runs in several phases - the first 3 are as follows: - * 1) fetches the last oplog entry (op_start1) on the source; - * 2) copies all non-local databases from the source; and - * 3) fetches and applies operations from the source after op_start1. - * - * This test updates and deletes a document on the source between phases 1 and 2. The - * secondary will initially fail to apply the update operation in phase 3 and subsequently have - * to attempt to check the source for a new copy of the document. Before the secondary checks - * the source, we insert a new copy of the document on the source so that the secondary can fetch - * it. - * - */ - -(function() { -load("jstests/replsets/libs/initial_sync_update_missing_doc.js"); -load("jstests/libs/check_log.js"); - -var name = 'initial_sync_update_missing_doc2'; -var replSet = new ReplSetTest({ - name: name, - nodes: 1, -}); - -replSet.startSet(); -replSet.initiate(); - -const primary = replSet.getPrimary(); -const dbName = 'test'; - -var coll = primary.getDB(dbName).getCollection(name); -assert.commandWorked(coll.insert({_id: 0, x: 1})); - -// Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while -// it is syncing from the primary. -const secondaryConfig = { - rsConfig: {votes: 0, priority: 0} -}; -const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig); - -// Update and remove document on primary. -updateRemove(coll, {_id: 0}); - -turnOffHangBeforeCopyingDatabasesFailPoint(secondary); - -// Re-insert deleted document on the sync source. The secondary should be able to fetch and -// insert this document after failing to apply the udpate. -assert.commandWorked(coll.insert({_id: 0, x: 3})); - -var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); -assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0); -var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd; - -// Temporarily increase log levels so that we can see the 'Inserted missing document' log line. 
-secondary.getDB('test').setLogLevel(1, 'replication'); -turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numFetched */); -secondary.getDB('test').setLogLevel(0, 'replication'); - -finishAndValidate(replSet, name, firstOplogEnd, 1 /* numFetched */, 1 /* numDocuments */); - -replSet.stopSet(); -})(); diff --git a/jstests/replsets/initial_sync_update_missing_doc3.js b/jstests/replsets/initial_sync_update_missing_doc3.js deleted file mode 100644 index d03e672ebec..00000000000 --- a/jstests/replsets/initial_sync_update_missing_doc3.js +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Initial sync runs in several phases - the first 3 are as follows: - * 1) fetches the last oplog entry (op_start1) on the source; - * 2) copies all non-local databases from the source; and - * 3) fetches and applies operations from the source after op_start1. - * - * This test updates and deletes a document on the source between phases 1 and 2. The - * secondary will initially fail to apply the update operation in phase 3 and subsequently have - * to attempt to check the source for a new copy of the document. Before the secondary checks the - * source, we insert a new copy of the document into this collection on the source and mark the - * collection on which the document to be fetched resides as drop pending, thus effectively - * renaming the collection but preserving the UUID. This ensures the secondary fetches the - * document by UUID rather than namespace. - */ - -(function() { -load("jstests/libs/check_log.js"); -load("jstests/replsets/libs/initial_sync_update_missing_doc.js"); -load("jstests/replsets/libs/two_phase_drops.js"); // For TwoPhaseDropCollectionTest. - -var name = 'initial_sync_update_missing_doc3'; -var replSet = new ReplSetTest({ - name: name, - nodes: 1, -}); - -replSet.startSet(); -replSet.initiate(); -const primary = replSet.getPrimary(); -const dbName = 'test'; - -// Check for 'system.drop' two phase drop support. -if (!TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replSet)) { - jsTestLog('Drop pending namespaces not supported by storage engine. Skipping test.'); - replSet.stopSet(); - return; -} - -var coll = primary.getDB(dbName).getCollection(name); -assert.commandWorked(coll.insert({_id: 0, x: 1})); - -// Add a secondary node with priority: 0 so that we prevent elections while it is syncing -// from the primary. -// We cannot give the secondary votes: 0 because then it will not be able to acknowledge -// majority writes. That means the sync source can immediately drop it's collection -// because it alone determines the majority commit point. -const secondaryConfig = { - rsConfig: {priority: 0} -}; -const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig); - -// Update and remove document on primary. -updateRemove(coll, {_id: 0}); - -turnOffHangBeforeCopyingDatabasesFailPoint(secondary); - -// Re-insert deleted document. -assert.commandWorked(coll.insert({_id: 0, x: 3})); -// Mark the collection as drop pending so it gets renamed, but retains the UUID. 
-assert.commandWorked(primary.getDB('test').runCommand({"drop": name})); - -var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); -assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0); - -secondary.getDB('test').setLogLevel(1, 'replication'); -turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numFetched */); -secondary.getDB('test').setLogLevel(0, 'replication'); - -replSet.awaitReplication(); -replSet.awaitSecondaryNodes(); - -replSet.stopSet(); -})(); diff --git a/jstests/replsets/initial_sync_update_missing_doc_upsert.js b/jstests/replsets/initial_sync_update_missing_doc_upsert.js index 636244965dd..9fe203fef67 100644 --- a/jstests/replsets/initial_sync_update_missing_doc_upsert.js +++ b/jstests/replsets/initial_sync_update_missing_doc_upsert.js @@ -4,29 +4,26 @@ * 2) copies all non-local databases from the source; and * 3) fetches and applies operations from the source after op_start1. * - * This test upserts documents with both the "update" and "applyOps" - * commands on the source between phases 2 and 3; these should be treated - * as inserts on the syncing node. + * This test upserts documents with both the "update" and "applyOps" commands on the source between + * phases 2 and 3; these operations should be treated as inserts on the syncing node and applied + * successfully. */ (function() { load("jstests/replsets/libs/initial_sync_update_missing_doc.js"); load("jstests/libs/check_log.js"); -var name = 'initial_sync_update_missing_doc_upsert'; -var replSet = new ReplSetTest({ - name: name, - nodes: 1, -}); +const replSet = new ReplSetTest({nodes: 1}); replSet.startSet(); replSet.initiate(); const primary = replSet.getPrimary(); const dbName = 'test'; +const collectionName = jsTestName(); -assert.commandWorked(primary.getDB(dbName).createCollection(name)); -const coll = primary.getDB(dbName).getCollection(name); +assert.commandWorked(primary.getDB(dbName).createCollection(collectionName)); +const coll = primary.getDB(dbName).getCollection(collectionName); // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while // it is syncing from the primary. @@ -85,8 +82,7 @@ for (let alwaysUpsert of [null, true]) { /* The interesting scenario for alwaysUpsert: false is if the document is deleted on the primary * after updating. When the secondary attempts to apply the oplog entry during initial sync, - * it will fail to update, and fail to fetch the missing document. Ensure that initial sync - * proceeds anyway. + * it will fail to update. Ensure that initial sync proceeds anyway. */ for (let allowAtomic of [null, true, false]) { coll.insertOne({_id: documentIdCounter}); @@ -99,19 +95,9 @@ for (let allowAtomic of [null, true, false]) { jsTestLog("Allow initial sync to finish fetching and replaying oplog"); assert.commandWorked(secondary.getDB('admin').runCommand( - {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'})); -assert.commandWorked(secondary.getDB('admin').runCommand( {configureFailPoint: 'initialSyncHangAfterDataCloning', mode: 'off'})); -assert.commandWorked(secondary.getDB('admin').runCommand( - {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'})); - -checkLog.contains(secondary, 'initial sync done'); - -const replStatus = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); -const firstOplogEnd = replStatus.initialSyncStatus.initialSyncOplogEnd; -// *No* missing documents were fetched. 
-finishAndValidate(replSet, name, firstOplogEnd, 0 /* numFetched */, numDocuments); +finishAndValidate(replSet, collectionName, numDocuments); replSet.stopSet(); })(); diff --git a/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js b/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js index d4143b50148..2f86c8d5454 100644 --- a/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js +++ b/jstests/replsets/initial_sync_update_missing_doc_with_prepare.js @@ -5,10 +5,8 @@ * 3) fetches and applies operations from the source after op_start1. * * This test updates and deletes a document on the source between phases 1 and 2 in a prepared - * transaction. The secondary will initially fail to apply the update operation in phase 3 and - * subsequently have to attempt to check the source for a new copy of the document. The absence of - * the document on the source indicates that the source is free to ignore the failed update - * operation. + * transaction. The secondary will fail to apply the update operation in phase 3 but initial sync + * completes nevertheless. * * @tags: [uses_transactions, uses_prepare_transaction] */ @@ -19,18 +17,14 @@ load("jstests/replsets/libs/initial_sync_update_missing_doc.js"); load("jstests/libs/check_log.js"); function doTest(doTransactionWork, numDocuments) { - const name = 'initial_sync_update_missing_doc_with_prepare'; - const replSet = new ReplSetTest({ - name: name, - nodes: 1, - }); + const replSet = new ReplSetTest({nodes: 1}); replSet.startSet(); replSet.initiate(); const primary = replSet.getPrimary(); const dbName = 'test'; - - var coll = primary.getDB(dbName).getCollection(name); + const collectionName = jsTestName(); + let coll = primary.getDB(dbName).getCollection(collectionName); assert.commandWorked(coll.insert({_id: 0, x: 1})); assert.commandWorked(coll.insert({_id: 1, x: 1})); @@ -41,7 +35,7 @@ function doTest(doTransactionWork, numDocuments) { const session = primary.startSession(); const sessionDB = session.getDatabase(dbName); - const sessionColl = sessionDB.getCollection(name); + const sessionColl = sessionDB.getCollection(collectionName); session.startTransaction(); doTransactionWork(sessionColl, {_id: 0}); @@ -57,20 +51,14 @@ function doTest(doTransactionWork, numDocuments) { turnOffHangBeforeCopyingDatabasesFailPoint(secondary); - var res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); - assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0); - var firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd; - - turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 0 /* numInserted */); - // Since we aborted the second transaction, we expect this collection to still exist after // initial sync. - finishAndValidate(replSet, name, firstOplogEnd, 0 /* numInserted */, numDocuments); + finishAndValidate(replSet, collectionName, numDocuments); // Make sure the secondary has the correct documents after syncing from the primary. The // second document was deleted in the prepared transaction that was aborted. Therefore, it // should have been properly replication. 
- coll = secondary.getDB(dbName).getCollection(name); + coll = secondary.getDB(dbName).getCollection(collectionName); assert.docEq(null, coll.findOne({_id: 0}), 'document on secondary matches primary'); assert.docEq({_id: 1, x: 1}, coll.findOne({_id: 1}), 'document on secondary matches primary'); diff --git a/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js b/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js deleted file mode 100644 index 4bb87729af4..00000000000 --- a/jstests/replsets/initial_sync_update_reinsert_missing_doc_with_prepare.js +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Initial sync runs in several phases - the first 3 are as follows: - * 1) fetches the last oplog entry (op_start1) on the source; - * 2) copies all non-local databases from the source; and - * 3) fetches and applies operations from the source after op_start1. - * - * This test updates and deletes a document on the source between phases 1 and 2 in a prepared - * transaction. The secondary will initially fail to apply the update operation in phase 3 and - * subsequently have to attempt to check the source for a new copy of the document. Before the - * secondary checks the source, we insert a new copy of the document on the source so that the - * secondary can fetch it. - * - * @tags: [uses_transactions, uses_prepare_transaction] - */ - -(function() { -load("jstests/core/txns/libs/prepare_helpers.js"); -load("jstests/replsets/libs/initial_sync_update_missing_doc.js"); -load("jstests/libs/check_log.js"); - -function doTest(doTransactionWork, numDocuments) { - const name = 'initial_sync_update_missing_doc_with_prepare'; - const replSet = new ReplSetTest({ - name: name, - nodes: 1, - }); - - replSet.startSet(); - replSet.initiate(); - const primary = replSet.getPrimary(); - const dbName = 'test'; - - const coll = primary.getDB(dbName).getCollection(name); - assert.commandWorked(coll.insert({_id: 0, x: 1})); - - // Add a secondary node with priority: 0 and votes: 0 so that we prevent elections while - // it is syncing from the primary. - const secondaryConfig = {rsConfig: {votes: 0, priority: 0}}; - const secondary = reInitiateSetWithSecondary(replSet, secondaryConfig); - - const session = primary.startSession(); - const sessionDB = session.getDatabase(dbName); - const sessionColl = sessionDB.getCollection(name); - - session.startTransaction(); - doTransactionWork(sessionColl, {_id: 0}); - const prepareTimestamp = PrepareHelpers.prepareTransaction(session); - assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp)); - - turnOffHangBeforeCopyingDatabasesFailPoint(secondary); - - // Re-insert deleted document on the sync source. The secondary should be able to fetch and - // insert this document after failing to apply the udpate. - assert.commandWorked(coll.insert({_id: 0, x: 3})); - - const res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); - assert.eq(res.initialSyncStatus.fetchedMissingDocs, 0); - const firstOplogEnd = res.initialSyncStatus.initialSyncOplogEnd; - - // Temporarily increase log levels so that we can see the 'Inserted missing document' log - // line. 
- secondary.getDB('test').setLogLevel(1, 'replication'); - turnOffHangBeforeGettingMissingDocFailPoint(primary, secondary, name, 1 /* numFetched */); - secondary.getDB('test').setLogLevel(0, 'replication'); - - finishAndValidate(replSet, name, firstOplogEnd, 1 /* numFetched */, numDocuments); - assert.docEq({_id: 0, x: 3}, coll.findOne({_id: 0}), 'document on secondary matches primary'); - - replSet.stopSet(); -} - -jsTestLog("Testing with prepared transaction"); -// Passing in a function to update and remove document on primary in a prepared transaction -// between phrase 1 and 2. Once the secondary receives the commit for the transaction, the -// secondary should apply each operation separately (one update, and one delete) during initial -// sync. -doTest(updateRemove, 1 /* numDocuments after initial sync */); - -jsTestLog("Testing with large prepared transaction"); -// Passing in a function to insert, update and remove large documents on primary in a large -// prepared transaction. Once the secondary receives the commit for the transaction, the -// secondary should apply each operation separately (one insert, one update, and one delete) -// during initial sync. -doTest(insertUpdateRemoveLarge, 2 /* numDocuments after initial sync */); -})(); diff --git a/jstests/replsets/libs/initial_sync_test.js b/jstests/replsets/libs/initial_sync_test.js index 7ec45729173..5a5b6a7bc56 100644 --- a/jstests/replsets/libs/initial_sync_test.js +++ b/jstests/replsets/libs/initial_sync_test.js @@ -280,4 +280,4 @@ function InitialSyncTest(name = "InitialSyncTest", replSet, timeout) { checkDataConsistency(); return replSet.stopSet(); }; -}
\ No newline at end of file +} diff --git a/jstests/replsets/libs/initial_sync_update_missing_doc.js b/jstests/replsets/libs/initial_sync_update_missing_doc.js index 91556719bad..666d463ad23 100644 --- a/jstests/replsets/libs/initial_sync_update_missing_doc.js +++ b/jstests/replsets/libs/initial_sync_update_missing_doc.js @@ -10,9 +10,8 @@ */ // reInitiate the replica set with a secondary node, which will go through initial sync. This -// function will hand the secondary in initial sync. turnOffHangBeforeCopyingDatabasesFailPoint -// must be called after reInitiateSetWithSecondary, followed by -// turnOffHangBeforeGettingMissingDocFailPoint. +// function will hang the secondary in initial sync. turnOffHangBeforeCopyingDatabasesFailPoint +// must be called after reInitiateSetWithSecondary. var reInitiateSetWithSecondary = function(replSet, secondaryConfig) { const secondary = replSet.add(secondaryConfig); secondary.setSlaveOk(); @@ -21,8 +20,6 @@ var reInitiateSetWithSecondary = function(replSet, secondaryConfig) { // copying databases. assert.commandWorked(secondary.getDB('admin').runCommand( {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'alwaysOn'})); - assert.commandWorked(secondary.getDB('admin').runCommand( - {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'alwaysOn'})); // Skip clearing initial sync progress after a successful initial sync attempt so that we // can check initialSyncStatus fields after initial sync is complete. @@ -44,43 +41,9 @@ var reInitiateSetWithSecondary = function(replSet, secondaryConfig) { var turnOffHangBeforeCopyingDatabasesFailPoint = function(secondary) { assert.commandWorked(secondary.getDB('admin').runCommand( {configureFailPoint: 'initialSyncHangBeforeCopyingDatabases', mode: 'off'})); - - // The following checks assume that we have updated and deleted a document on the sync source - // that the secondary will try to update in phase 3. - checkLog.contains(secondary, 'update of non-mod failed'); - checkLog.contains(secondary, 'Fetching missing document'); - checkLog.contains( - secondary, 'initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled'); -}; - -// Must be called after turnOffHangBeforeCopyingDatabasesFailPoint. Turns off the -// initialSyncHangBeforeGettingMissingDocument fail point so that the secondary can check if the -// sync source has the missing document. -var turnOffHangBeforeGettingMissingDocFailPoint = function(primary, secondary, name, numFetched) { - if (numFetched === 0) { - // If we did not re-insert the missing document, insert an arbitrary document to move - // forward minValid even though the document was not found. - assert.commandWorked( - primary.getDB('test').getCollection(name + 'b').insert({_id: 1, y: 1})); - } - - assert.commandWorked(secondary.getDB('admin').runCommand( - {configureFailPoint: 'initialSyncHangBeforeGettingMissingDocument', mode: 'off'})); - - // If we've re-inserted the missing document between secondaryHangsBeforeGettingMissingDoc and - // this function, the secondary will insert the missing document after it fails the update. - // Otherwise, it will fail to fetch anything from the sync source because the document was - // deleted. 
- if (numFetched > 0) { - checkLog.contains(secondary, 'Inserted missing document'); - } else { - checkLog.contains( - secondary, 'Missing document not found on source; presumably deleted later in oplog.'); - } - checkLog.contains(secondary, 'initial sync done'); }; -var finishAndValidate = function(replSet, name, firstOplogEnd, numFetched, numDocuments) { +var finishAndValidate = function(replSet, name, numDocuments) { replSet.awaitReplication(); replSet.awaitSecondaryNodes(); const dbName = 'test'; @@ -95,24 +58,6 @@ secondary collection: ${tojson(secondaryCollection.find().toArray())}`); throw new Error('Did not sync collection'); } - const res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); - - // If we haven't re-inserted any documents after deleting them, the fetch count is 0 because we - // are unable to get the document from the sync source. - assert.eq(res.initialSyncStatus.fetchedMissingDocs, numFetched); - - const finalOplogEnd = res.initialSyncStatus.initialSyncOplogEnd; - - if (numFetched > 0) { - assert.neq(firstOplogEnd, - finalOplogEnd, - "minValid was not moved forward when missing document was fetched"); - } else { - assert.eq(firstOplogEnd, - finalOplogEnd, - "minValid was moved forward when missing document was not fetched"); - } - assert.eq(0, secondary.getDB('local')['temp_oplog_buffer'].find().itcount(), "Oplog buffer was not dropped after initial sync"); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 5074ce190ec..ebd878cc05c 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -61,15 +61,13 @@ env.Library( ) env.Library( - target='oplogreader', + target='replication_auth', source=[ - 'oplogreader.cpp', + 'replication_auth.cpp', ], LIBDEPS_PRIVATE=[ - '$BUILD_DIR/mongo/base', - '$BUILD_DIR/mongo/client/clientdriver_network', '$BUILD_DIR/mongo/db/auth/authorization_manager_global', - '$BUILD_DIR/mongo/util/net/network', + '$BUILD_DIR/mongo/client/authentication', ], ) @@ -92,7 +90,6 @@ env.Library( LIBDEPS=[ 'oplog', 'oplog_interface_remote', - 'oplogreader', 'repl_coordinator_interface', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/db/background', @@ -104,6 +101,9 @@ env.Library( '$BUILD_DIR/mongo/db/query_exec', '$BUILD_DIR/mongo/util/fail_point', ], + LIBDEPS_PRIVATE=[ + 'replication_auth', + ] ) env.Library( @@ -528,13 +528,13 @@ env.Library( 'oplog', 'oplog_application_interface', 'oplog_entry', - 'oplogreader', 'repl_coordinator_interface', 'repl_settings', 'storage_interface', ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/commands/mongod_fsync', + 'replication_auth', ], ) @@ -927,7 +927,6 @@ env.Library( ], LIBDEPS=[ 'task_runner', - 'oplogreader', '$BUILD_DIR/mongo/base', '$BUILD_DIR/mongo/client/fetcher', '$BUILD_DIR/mongo/client/remote_command_retry_scheduler', @@ -940,6 +939,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ 'repl_server_parameters', + 'replication_auth', ], ) @@ -1118,7 +1118,6 @@ env.Library( '$BUILD_DIR/mongo/db/query_exec', "$BUILD_DIR/mongo/util/fail_point", 'oplog', - 'oplogreader', 'repl_coordinator_interface', 'repl_settings', 'replica_set_messages', @@ -1127,6 +1126,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/stats/counters', '$BUILD_DIR/mongo/transport/message_compressor', + 'replication_auth', 'split_horizon', ], ) diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index a163be9a084..9bdf4acb8ff 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ 
b/src/mongo/db/repl/collection_cloner.cpp @@ -42,8 +42,8 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/cursor_response.h" -#include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/replication_auth.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/storage_interface_mock.h" #include "mongo/rpc/get_status_from_command_result.h" diff --git a/src/mongo/db/repl/initial_sync_state.h b/src/mongo/db/repl/initial_sync_state.h index 09673458a67..d5957bf2c42 100644 --- a/src/mongo/db/repl/initial_sync_state.h +++ b/src/mongo/db/repl/initial_sync_state.h @@ -59,7 +59,6 @@ struct InitialSyncState { // oplog entry. Timestamp stopTimestamp; // Referred to as minvalid, or the place we can transition states. Timer timer; // Timer for timing how long each initial sync attempt takes. - size_t fetchedMissingDocs = 0; size_t appliedOps = 0; }; diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index d4696f7f92a..a0c250ff58c 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -88,10 +88,6 @@ MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCopyingDatabases); // Failpoint which causes the initial sync function to hang before finishing. MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeFinish); -// Failpoint which causes the initial sync function to hang before calling shouldRetry on a failed -// operation. -MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeGettingMissingDocument); - // Failpoint which causes the initial sync function to hang before creating the oplog. MONGO_FAIL_POINT_DEFINE(initialSyncHangBeforeCreatingOplog); @@ -175,24 +171,6 @@ StatusWith<OpTimeAndWallTime> parseOpTimeAndWallTime(const QueryResponseStatus& return result; } -/** - * OplogApplier observer that updates 'fetchCount' when applying operations for each writer thread. 
- */ -class InitialSyncApplyObserver : public OplogApplier::Observer { -public: - explicit InitialSyncApplyObserver(AtomicWord<unsigned>* fetchCount) : _fetchCount(fetchCount) {} - - // OplogApplier::Observer functions - void onBatchBegin(const OplogApplier::Operations&) final {} - void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {} - void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>& docs) final { - _fetchCount->fetchAndAdd(docs.size()); - } - -private: - AtomicWord<unsigned>* const _fetchCount; -}; - } // namespace InitialSyncer::InitialSyncer( @@ -209,8 +187,7 @@ InitialSyncer::InitialSyncer( _writerPool(writerPool), _storage(storage), _replicationProcess(replicationProcess), - _onCompletion(onCompletion), - _observer(std::make_unique<InitialSyncApplyObserver>(&_fetchCount)) { + _onCompletion(onCompletion) { uassert(ErrorCodes::BadValue, "task executor cannot be null", _exec); uassert(ErrorCodes::BadValue, "invalid storage interface", _storage); uassert(ErrorCodes::BadValue, "invalid replication process", _replicationProcess); @@ -370,7 +347,6 @@ void InitialSyncer::_appendInitialSyncProgressMinimal_inlock(BSONObjBuilder* bob if (!_initialSyncState) { return; } - bob->appendNumber("fetchedMissingDocs", _initialSyncState->fetchedMissingDocs); bob->appendNumber("appliedOps", _initialSyncState->appliedOps); if (!_initialSyncState->beginApplyingTimestamp.isNull()) { bob->append("initialSyncOplogStart", _initialSyncState->beginApplyingTimestamp); @@ -908,10 +884,13 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> auto consistencyMarkers = _replicationProcess->getConsistencyMarkers(); OplogApplier::Options options(OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; - options.missingDocumentSourceForInitialSync = _syncSource; options.beginApplyingOpTime = lastOpTime; - _oplogApplier = _dataReplicatorExternalState->makeOplogApplier( - _oplogBuffer.get(), _observer.get(), consistencyMarkers, _storage, options, _writerPool); + _oplogApplier = _dataReplicatorExternalState->makeOplogApplier(_oplogBuffer.get(), + &noopOplogApplierObserver, + consistencyMarkers, + _storage, + options, + _writerPool); const auto beginApplyingTimestamp = lastOpTime.getTimestamp(); _initialSyncState->beginApplyingTimestamp = beginApplyingTimestamp; @@ -1289,56 +1268,6 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, auto opCtx = makeOpCtx(); const bool orderedCommit = true; _storage->oplogDiskLocRegister(opCtx.get(), lastAppliedOpTime.getTimestamp(), orderedCommit); - - auto fetchCount = _fetchCount.load(); - if (fetchCount > 0) { - _initialSyncState->fetchedMissingDocs += fetchCount; - _fetchCount.store(0); - status = _scheduleLastOplogEntryFetcher_inlock( - [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) { - return _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( - response, onCompletionGuard); - }); - if (!status.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); - return; - } - return; - } - - _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard); -} - -void InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( - const StatusWith<Fetcher::QueryResponse>& result, - std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - auto status = 
_checkForShutdownAndConvertStatus_inlock( - result.getStatus(), "error getting last oplog entry after fetching missing documents"); - if (!status.isOK()) { - error() << "Failed to get new minValid from source " << _syncSource << " due to '" - << redact(status) << "'"; - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); - return; - } - - auto&& optimeStatus = parseOpTimeAndWallTime(result); - if (!optimeStatus.isOK()) { - error() << "Failed to parse new minValid from source " << _syncSource << " due to '" - << redact(optimeStatus.getStatus()) << "'"; - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, optimeStatus.getStatus()); - return; - } - auto&& optime = optimeStatus.getValue().opTime; - - const auto newOplogEnd = optime.getTimestamp(); - LOG(2) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to " - << newOplogEnd; - _initialSyncState->stopTimestamp = newOplogEnd; - - // Get another batch to apply. _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard); } diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 9021970cfcb..414de99a576 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -68,10 +68,6 @@ MONGO_FAIL_POINT_DECLARE(failInitSyncWithBufferedEntriesLeft); // Failpoint which causes the initial sync function to hang before copying databases. MONGO_FAIL_POINT_DECLARE(initialSyncHangBeforeCopyingDatabases); -// Failpoint which causes the initial sync function to hang before calling shouldRetry on a failed -// operation. -MONGO_FAIL_POINT_DECLARE(initialSyncHangBeforeGettingMissingDocument); - // Failpoint which stops the applier. MONGO_FAIL_POINT_DECLARE(rsSyncApplyStop); @@ -94,11 +90,9 @@ struct InitialSyncerOptions { /** Function to sets this node into a specific follower mode. */ using SetFollowerModeFn = std::function<bool(const MemberState&)>; - // Error and retry values + // Retry values Milliseconds syncSourceRetryWait{1000}; Milliseconds initialSyncRetryWait{1000}; - Seconds blacklistSyncSourcePenaltyForNetworkConnectionError{10}; - Minutes blacklistSyncSourcePenaltyForOplogStartMissing{10}; // InitialSyncer waits this long before retrying getApplierBatchCallback() if there are // currently no operations available to apply or if the 'rsSyncApplyStop' failpoint is active. @@ -326,28 +320,19 @@ private: * | (no ops to apply) | | (have ops to apply) * | | | * | | V - * | | _getNextApplierBatchCallback()<-----+ - * | | | ^ | - * | | | | | - * | | | (no docs fetched | | - * | | | and end ts not | | - * | | | reached) | | - * | | | | | - * | | V | | - * | | _multiApplierCallback()-----+ | - * | | | | | - * | | | | | - * | | | | (docs fetched) | (end ts not - * | | | | | reached) - * | | | V | - * | | | _lastOplogEntryFetcherCallbackAfter- - * | | | FetchingMissingDocuments() - * | | | | - * | | | | - * | (reached end timestamp) - * | | | | - * | V V V - * | _rollbackCheckerCheckForRollbackCallback() + * | | _getNextApplierBatchCallback() + * | | | ^ + * | | | | + * | | | (end ts not reached) + * | | | | + * | | V | + * | | _multiApplierCallback()-----+ + * | | | + * | | | + * | (reached end timestamp) + * | | | + * | V V + * | _rollbackCheckerCheckForRollbackCallback() * | | * | | * +------------------------------+ @@ -474,16 +459,6 @@ private: std::shared_ptr<OnCompletionGuard> onCompletionGuard); /** - * Callback for third '_lastOplogEntryFetcher' callback. 
This is scheduled after MultiApplier - * completed successfully and missing documents were fetched from the sync source while - * DataReplicatorExternalState::_multiApply() was processing operations. - * This callback will update InitialSyncState::stopTimestamp on success. - */ - void _lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( - const StatusWith<Fetcher::QueryResponse>& result, - std::shared_ptr<OnCompletionGuard> onCompletionGuard); - - /** * Callback for rollback checker's last replSetGetRBID command after cloning data and applying * operations. */ @@ -650,7 +625,6 @@ private: OpTime _lastFetched; // (MX) OpTimeAndWallTime _lastApplied; // (MX) std::unique_ptr<OplogBuffer> _oplogBuffer; // (M) - std::unique_ptr<OplogApplier::Observer> _observer; // (S) std::unique_ptr<OplogApplier> _oplogApplier; // (M) // Used to signal changes in _state. diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 820c4147fa4..47d4283b03b 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -3948,118 +3948,6 @@ TEST_F(InitialSyncerTest, ASSERT_EQUALS(lastOp.getWallClockTime().get(), _lastApplied.getValue().wallTime); } -TEST_F( - InitialSyncerTest, - InitialSyncerSchedulesLastOplogEntryFetcherToGetNewStopTimestampIfMissingDocumentsHaveBeenFetchedDuringMultiInitialSyncApply) { - // Skip reconstructing prepared transactions at the end of initial sync because - // InitialSyncerTest does not construct ServiceEntryPoint and this causes a segmentation fault - // when reconstructPreparedTransactions uses DBDirectClient to call into ServiceEntryPoint. - FailPointEnableBlock skipReconstructPreparedTransactions("skipReconstructPreparedTransactions"); - - // Skip clearing initial sync progress so that we can check if missing documents have been - // fetched after the initial sync attempt. - FailPointEnableBlock skipClearInitialSyncState("skipClearInitialSyncState"); - - auto initialSyncer = &getInitialSyncer(); - auto opCtx = makeOpCtx(); - - // Override DataReplicatorExternalState::_multiApply() so that it will also fetch a missing - // document. - // This forces InitialSyncer to evaluate its end timestamp for applying operations after each - // batch. - bool fetchCountIncremented = false; - getExternalState()->multiApplyFn = [&fetchCountIncremented](OperationContext* opCtx, - const MultiApplier::Operations& ops, - OplogApplier::Observer* observer) { - if (!fetchCountIncremented) { - auto entry = makeOplogEntry(1); - observer->onMissingDocumentsFetchedAndInserted({std::make_pair(entry, BSONObj())}); - fetchCountIncremented = true; - } - return ops.back().getOpTime(); - }; - - _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); - ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); - - // Use command for third and last operation to ensure we have two batches to apply. - auto lastOp = makeOplogEntry(3, OpTypeEnum::kCommand); - - auto net = getNet(); - int baseRollbackId = 1; - { - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - - // Base rollback ID. - net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); - - // Send an empty optime as the response to the beginFetchingOptime find request, which will - // cause the beginFetchingTimestamp to be the same as the beginApplyingTimestamp. 
- auto request = net->scheduleSuccessfulResponse( - makeCursorResponse(0LL, NamespaceString::kSessionTransactionsTableNamespace, {}, true)); - assertRemoteCommandNameEquals("find", request); - net->runReadyNetworkOperations(); - - // Last oplog entry. - processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(1)}); - - // Feature Compatibility Version. - processSuccessfulFCVFetcherResponse40(); - - // Quickest path to a successful DatabasesCloner completion is to respond to the - // listDatabases with an empty list of database names. - assertRemoteCommandNameEquals( - "listDatabases", net->scheduleSuccessfulResponse(makeListDatabasesResponse({}))); - net->runReadyNetworkOperations(); - - // OplogFetcher's oplog tailing query. Response has enough operations to reach - // end timestamp. - request = net->scheduleSuccessfulResponse( - makeCursorResponse(1LL, - _options.localOplogNS, - {makeOplogEntryObj(1), makeOplogEntryObj(2), lastOp.toBSON()})); - assertRemoteCommandNameEquals("find", request); - ASSERT_TRUE(request.cmdObj.getBoolField("oplogReplay")); - net->runReadyNetworkOperations(); - - // Second last oplog entry fetcher. - // Send oplog entry with timestamp 2. InitialSyncer will update this end timestamp after - // applying the first batch. - processSuccessfulLastOplogEntryFetcherResponse({makeOplogEntryObj(2)}); - - // Black hole OplogFetcher's getMore request. - auto noi = net->getNextReadyRequest(); - request = noi->getRequest(); - assertRemoteCommandNameEquals("getMore", request); - net->blackHole(noi); - - // Third last oplog entry fetcher. - processSuccessfulLastOplogEntryFetcherResponse({lastOp.toBSON()}); - - // Last rollback ID. - request = net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(baseRollbackId)); - assertRemoteCommandNameEquals("replSetGetRBID", request); - net->runReadyNetworkOperations(); - - // _multiApplierCallback() will cancel the _getNextApplierBatchCallback() task after setting - // the completion status. - // We call runReadyNetworkOperations() again to deliver the cancellation status to - // _oplogFetcherCallback(). 
- net->runReadyNetworkOperations(); - } - - initialSyncer->join(); - ASSERT_OK(_lastApplied.getStatus()); - ASSERT_EQUALS(lastOp.getOpTime(), _lastApplied.getValue().opTime); - ASSERT_EQUALS(lastOp.getWallClockTime().get(), _lastApplied.getValue().wallTime); - - ASSERT_TRUE(fetchCountIncremented); - - auto progress = initialSyncer->getInitialSyncProgress(); - log() << "Progress after initial sync attempt: " << progress; - ASSERT_EQUALS(1, progress.getIntField("fetchedMissingDocs")) << progress; -} - TEST_F(InitialSyncerTest, InitialSyncerReturnsInvalidSyncSourceWhenFailInitialSyncWithBadHostFailpointIsEnabled) { auto initialSyncer = &getInitialSyncer(); @@ -4183,13 +4071,12 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { auto progress = initialSyncer->getInitialSyncProgress(); log() << "Progress after first failed response: " << progress; - ASSERT_EQUALS(progress.nFields(), 8) << progress; + ASSERT_EQUALS(progress.nFields(), 7) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 0) << progress; ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress; ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress; ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress; ASSERT_BSONOBJ_EQ(progress.getObjectField("initialSyncAttempts"), BSONObj()); - ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress; ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress; ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0)); @@ -4239,12 +4126,11 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { progress = initialSyncer->getInitialSyncProgress(); log() << "Progress after failure: " << progress; - ASSERT_EQUALS(progress.nFields(), 8) << progress; + ASSERT_EQUALS(progress.nFields(), 7) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress; ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress; ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress; ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress; - ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress; ASSERT_EQUALS(progress.getIntField("appliedOps"), 0) << progress; ASSERT_BSONOBJ_EQ(progress.getObjectField("databases"), BSON("databasesCloned" << 0)); @@ -4327,13 +4213,12 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { progress = initialSyncer->getInitialSyncProgress(); log() << "Progress after all but last successful response: " << progress; - ASSERT_EQUALS(progress.nFields(), 9) << progress; + ASSERT_EQUALS(progress.nFields(), 8) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress; ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress; ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress; ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress; ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress; - ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress; // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1). 
ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress; auto databasesProgress = progress.getObjectField("databases"); @@ -4389,7 +4274,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { progress = initialSyncer->getInitialSyncProgress(); log() << "Progress at end: " << progress; - ASSERT_EQUALS(progress.nFields(), 11) << progress; + ASSERT_EQUALS(progress.nFields(), 10) << progress; ASSERT_EQUALS(progress.getIntField("failedInitialSyncAttempts"), 1) << progress; ASSERT_EQUALS(progress.getIntField("maxFailedInitialSyncAttempts"), 2) << progress; ASSERT_EQUALS(progress["initialSyncStart"].type(), Date) << progress; @@ -4397,7 +4282,6 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { ASSERT_EQUALS(progress["initialSyncOplogStart"].timestamp(), Timestamp(1, 1)) << progress; ASSERT_EQUALS(progress["initialSyncOplogEnd"].timestamp(), Timestamp(7, 1)) << progress; ASSERT_EQUALS(progress["initialSyncElapsedMillis"].type(), NumberInt) << progress; - ASSERT_EQUALS(progress.getIntField("fetchedMissingDocs"), 0) << progress; // Expected applied ops to be a superset of this range: Timestamp(2,1) ... Timestamp(7,1). ASSERT_GREATER_THAN_OR_EQUALS(progress.getIntField("appliedOps"), 6) << progress; diff --git a/src/mongo/db/repl/oplog_applier.cpp b/src/mongo/db/repl/oplog_applier.cpp index 63f883e8e31..3a2a3f0fe97 100644 --- a/src/mongo/db/repl/oplog_applier.cpp +++ b/src/mongo/db/repl/oplog_applier.cpp @@ -44,6 +44,8 @@ namespace mongo { namespace repl { +NoopOplogApplierObserver noopOplogApplierObserver; + using CallbackArgs = executor::TaskExecutor::CallbackArgs; // static diff --git a/src/mongo/db/repl/oplog_applier.h b/src/mongo/db/repl/oplog_applier.h index e9a7daf26fc..7f5c492803f 100644 --- a/src/mongo/db/repl/oplog_applier.h +++ b/src/mongo/db/repl/oplog_applier.h @@ -71,10 +71,6 @@ public: bool relaxUniqueIndexConstraints = false; bool skipWritesToOplog = false; - // For initial sync only. If an update fails, the missing document is fetched from - // this sync source to insert into the local collection. - boost::optional<HostAndPort> missingDocumentSourceForInitialSync; - // Used to determine which operations should be applied. Only initial sync will set this to // be something other than the null optime. OpTime beginApplyingOpTime = OpTime(); @@ -273,18 +269,15 @@ public: */ virtual void onBatchEnd(const StatusWith<OpTime>& lastOpTimeApplied, const OplogApplier::Operations& operations) = 0; +}; - /** - * Called when documents are fetched and inserted into the collection in order to - * apply an update operation. - * Applies to initial sync only. - * - * TODO: Delegate fetching behavior to OplogApplier owner. - */ - using FetchInfo = std::pair<OplogEntry, BSONObj>; - virtual void onMissingDocumentsFetchedAndInserted( - const std::vector<FetchInfo>& documentsFetchedAndInserted) = 0; +class NoopOplogApplierObserver : public repl::OplogApplier::Observer { +public: + void onBatchBegin(const repl::OplogApplier::Operations&) final {} + void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {} }; +extern NoopOplogApplierObserver noopOplogApplierObserver; + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.h b/src/mongo/db/repl/oplogreader.h deleted file mode 100644 index dca256bf871..00000000000 --- a/src/mongo/db/repl/oplogreader.h +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. 
- * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - - -#pragma once - -#include "mongo/client/dbclient_connection.h" -#include "mongo/client/dbclient_cursor.h" -#include "mongo/util/net/hostandport.h" - -namespace mongo { -namespace repl { - -/** - * Authenticates conn using the server's cluster-membership credentials. - * - * Returns true on successful authentication. - */ -bool replAuthenticate(DBClientBase* conn); - -/* started abstracting out the querying of the primary/master's oplog - still fairly awkward but a start. -*/ - -class OplogReader { -private: - std::shared_ptr<DBClientConnection> _conn; - std::shared_ptr<DBClientCursor> cursor; - int _tailingQueryOptions; - - // If _conn was actively connected, _host represents the current HostAndPort of the - // connection. - HostAndPort _host; - -public: - OplogReader(); - ~OplogReader() {} - void resetCursor() { - cursor.reset(); - } - void resetConnection() { - cursor.reset(); - _conn.reset(); - _host = HostAndPort(); - } - DBClientConnection* conn() { - return _conn.get(); - } - BSONObj findOne(const char* ns, const Query& q) { - return conn()->findOne(ns, q, nullptr, QueryOption_SlaveOk); - } - BSONObj findOneByUUID(const std::string& db, UUID uuid, const BSONObj& filter) { - // Note that the findOneByUUID() function of DBClient passes SlaveOK to the client. 
- BSONObj foundDoc; - std::tie(foundDoc, std::ignore) = conn()->findOneByUUID(db, uuid, filter); - return foundDoc; - } - - /* SO_TIMEOUT (send/recv time out) for our DBClientConnections */ - static const Seconds kSocketTimeout; - - /* ok to call if already connected */ - bool connect(const HostAndPort& host); - - void tailCheck(); - - bool haveCursor() { - return cursor.get() != nullptr; - } - - void tailingQuery(const char* ns, const BSONObj& query); - - bool more() { - uassert(15910, "Doesn't have cursor for reading oplog", cursor.get()); - return cursor->more(); - } - - bool moreInCurrentBatch() { - uassert(15911, "Doesn't have cursor for reading oplog", cursor.get()); - return cursor->moreInCurrentBatch(); - } - - BSONObj nextSafe() { - return cursor->nextSafe(); - } -}; - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/replication_auth.cpp index 17811adcf9a..515dbf665f8 100644 --- a/src/mongo/db/repl/oplogreader.cpp +++ b/src/mongo/db/repl/replication_auth.cpp @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -27,27 +27,20 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - #include "mongo/platform/basic.h" -#include "mongo/db/repl/oplogreader.h" +#include "mongo/db/repl/replication_auth.h" #include <string> #include "mongo/client/authenticate.h" #include "mongo/db/auth/authorization_manager.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/executor/network_interface.h" -#include "mongo/util/log.h" namespace mongo { namespace repl { namespace { // Gets the singleton AuthorizationManager object for this server process -// -// TODO (SERVER-37563): Pass the service context instead of calling getGlobalServiceContext. AuthorizationManager* getGlobalAuthorizationManager() { AuthorizationManager* globalAuthManager = AuthorizationManager::get(getGlobalServiceContext()); fassert(16842, globalAuthManager != nullptr); @@ -64,47 +57,5 @@ bool replAuthenticate(DBClientBase* conn) { return true; } -const Seconds OplogReader::kSocketTimeout(30); - -OplogReader::OplogReader() { - _tailingQueryOptions = QueryOption_SlaveOk; - _tailingQueryOptions |= QueryOption_CursorTailable | QueryOption_OplogReplay; - - /* TODO: slaveOk maybe shouldn't use? 
*/ - _tailingQueryOptions |= QueryOption_AwaitData; -} - -bool OplogReader::connect(const HostAndPort& host) { - if (conn() == nullptr || _host != host) { - resetConnection(); - _conn = std::shared_ptr<DBClientConnection>( - new DBClientConnection(false, durationCount<Seconds>(kSocketTimeout))); - - std::string errmsg; - if (!_conn->connect(host, StringData(), errmsg) || !replAuthenticate(_conn.get())) { - resetConnection(); - error() << errmsg; - return false; - } - _conn->setTags(transport::Session::kKeepOpen); - _host = host; - } - return true; -} - -void OplogReader::tailCheck() { - if (cursor.get() && cursor->isDead()) { - log() << "old cursor isDead, will initiate a new one" << std::endl; - resetCursor(); - } -} - -void OplogReader::tailingQuery(const char* ns, const BSONObj& query) { - verify(!haveCursor()); - LOG(2) << ns << ".find(" << redact(query) << ')'; - cursor.reset( - _conn->query(NamespaceString(ns), query, 0, 0, nullptr, _tailingQueryOptions).release()); -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/replication_auth.h b/src/mongo/db/repl/replication_auth.h new file mode 100644 index 00000000000..6866f6f51a1 --- /dev/null +++ b/src/mongo/db/repl/replication_auth.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + +#include "mongo/client/dbclient_connection.h" +#include "mongo/client/dbclient_cursor.h" +#include "mongo/util/net/hostandport.h" + +namespace mongo { +namespace repl { + +/** + * Authenticates conn using the server's cluster-membership credentials. + * + * Returns true on successful authentication. 
+ */ +bool replAuthenticate(DBClientBase* conn); + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index a6106bd0370..69fd30eac81 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -133,13 +133,6 @@ ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes", &b ServerStatusMetricField<Counter64> displayBufferMaxSize("repl.buffer.maxSizeBytes", &bufferGauge.maxSize); -class NoopOplogApplierObserver : public repl::OplogApplier::Observer { -public: - void onBatchBegin(const repl::OplogApplier::Operations&) final {} - void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {} - void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {} -} noopOplogApplierObserver; - /** * Returns new thread pool for thread pool task executor. */ @@ -222,8 +215,7 @@ void ReplicationCoordinatorExternalStateImpl::startSteadyStateReplication( // Using noop observer now that BackgroundSync no longer implements the OplogApplier::Observer // interface. During steady state replication, there is no need to log details on every batch - // we apply (recovery); or track missing documents that are fetched from the sync source - // (initial sync). + // we apply. _oplogApplier = std::make_unique<OplogApplierImpl>( _oplogApplierTaskExecutor.get(), _oplogBuffer.get(), diff --git a/src/mongo/db/repl/replication_info.cpp b/src/mongo/db/repl/replication_info.cpp index c20aa0c87d4..6e93691e17b 100644 --- a/src/mongo/db/repl/replication_info.cpp +++ b/src/mongo/db/repl/replication_info.cpp @@ -49,7 +49,7 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/repl/is_master_response.h" #include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplogreader.h" +#include "mongo/db/repl/replication_auth.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_process.h" #include "mongo/db/repl/storage_interface.h" diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 81aede4f6b0..999c79fb9af 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -87,7 +87,6 @@ public: } void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {} - void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {} void complete(const OpTime& applyThroughOpTime) const { log() << "Applied " << _numOpsApplied << " operations in " << _numBatches diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp index 7d893f0c4fb..329a75d8064 100644 --- a/src/mongo/db/repl/rollback_source_impl.cpp +++ b/src/mongo/db/repl/rollback_source_impl.cpp @@ -31,10 +31,11 @@ #include "mongo/db/repl/rollback_source_impl.h" +#include "mongo/client/dbclient_connection.h" #include "mongo/db/cloner.h" #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/oplogreader.h" +#include "mongo/db/repl/replication_auth.h" #include "mongo/util/assert_util.h" #include "mongo/util/str.h" diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 17614f16d25..6d3fb15810b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -65,9 +65,9 @@ #include 
"mongo/db/repl/bgsync.h" #include "mongo/db/repl/initial_syncer.h" #include "mongo/db/repl/multiapplier.h" -#include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_set_config.h" +#include "mongo/db/repl/replication_auth.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/transaction_oplog_application.h" #include "mongo/db/session.h" @@ -837,154 +837,6 @@ bool SyncTail::inShutdown() const { return _inShutdown; } -BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) { - OplogReader missingObjReader; // why are we using OplogReader to run a non-oplog query? - - if (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) { - log() << "initial sync - initialSyncHangBeforeGettingMissingDocument fail point enabled. " - "Blocking until fail point is disabled."; - while (MONGO_FAIL_POINT(initialSyncHangBeforeGettingMissingDocument)) { - mongo::sleepsecs(1); - } - } - - auto source = _options.missingDocumentSourceForInitialSync; - invariant(source); - - const int retryMax = 3; - for (int retryCount = 1; retryCount <= retryMax; ++retryCount) { - if (retryCount != 1) { - // if we are retrying, sleep a bit to let the network possibly recover - sleepsecs(retryCount * retryCount); - } - try { - bool ok = missingObjReader.connect(*source); - if (!ok) { - warning() << "network problem detected while connecting to the " - << "sync source, attempt " << retryCount << " of " << retryMax; - continue; // try again - } - } catch (const NetworkException&) { - warning() << "network problem detected while connecting to the " - << "sync source, attempt " << retryCount << " of " << retryMax; - continue; // try again - } - - // get _id from oplog entry to create query to fetch document. - const auto idElem = oplogEntry.getIdElement(); - - if (idElem.eoo()) { - severe() << "cannot fetch missing document without _id field: " - << redact(oplogEntry.toBSON()); - fassertFailedNoTrace(28742); - } - - BSONObj query = BSONObjBuilder().append(idElem).obj(); - BSONObj missingObj; - auto nss = oplogEntry.getNss(); - try { - auto uuid = oplogEntry.getUuid(); - if (!uuid) { - missingObj = missingObjReader.findOne(nss.ns().c_str(), query); - } else { - auto dbname = nss.db(); - // If a UUID exists for the command object, find the document by UUID. - missingObj = missingObjReader.findOneByUUID(dbname.toString(), *uuid, query); - } - } catch (const NetworkException&) { - warning() << "network problem detected while fetching a missing document from the " - << "sync source, attempt " << retryCount << " of " << retryMax; - continue; // try again - } catch (DBException& e) { - error() << "assertion fetching missing object: " << redact(e); - throw; - } - - // success! - return missingObj; - } - // retry count exceeded - msgasserted(15916, - str::stream() << "Can no longer connect to initial sync source: " - << source->toString()); -} - -void SyncTail::fetchAndInsertMissingDocument(OperationContext* opCtx, - const OplogEntry& oplogEntry) { - // Note that using the local UUID/NamespaceString mapping is sufficient for checking - // whether the collection is capped on the remote because convertToCapped creates a - // new collection with a different UUID. - const NamespaceString nss(parseUUIDOrNs(opCtx, oplogEntry)); - - { - // If the document is in a capped collection then it's okay for it to be missing. 
- AutoGetCollectionForRead autoColl(opCtx, nss); - Collection* const collection = autoColl.getCollection(); - if (collection && collection->isCapped()) { - log() << "Not fetching missing document in capped collection (" << nss << ")"; - return; - } - } - - log() << "Fetching missing document: " << redact(oplogEntry.toBSON()); - BSONObj missingObj = getMissingDoc(opCtx, oplogEntry); - - if (missingObj.isEmpty()) { - BSONObj object2; - if (auto optionalObject2 = oplogEntry.getObject2()) { - object2 = *optionalObject2; - } - log() << "Missing document not found on source; presumably deleted later in oplog. o first " - "field: " - << redact(oplogEntry.getObject()) << ", o2: " << redact(object2); - - return; - } - - return writeConflictRetry(opCtx, "fetchAndInsertMissingDocument", nss.ns(), [&] { - // Take an X lock on the database in order to preclude other modifications. - AutoGetDb autoDb(opCtx, nss.db(), MODE_X); - Database* const db = autoDb.getDb(); - - WriteUnitOfWork wunit(opCtx); - - Collection* coll = nullptr; - auto uuid = oplogEntry.getUuid(); - if (!uuid) { - if (!db) { - return; - } - coll = db->getOrCreateCollection(opCtx, nss); - } else { - // If the oplog entry has a UUID, use it to find the collection in which to insert the - // missing document. - auto& catalog = CollectionCatalog::get(opCtx); - coll = catalog.lookupCollectionByUUID(*uuid); - if (!coll) { - // TODO(SERVER-30819) insert this UUID into the missing UUIDs set. - return; - } - } - - invariant(coll); - - OpDebug* const nullOpDebug = nullptr; - Status status = coll->insertDocument(opCtx, InsertStatement(missingObj), nullOpDebug, true); - uassert(15917, - str::stream() << "Failed to insert missing document: " << status.toString(), - status.isOK()); - - LOG(1) << "Inserted missing document: " << redact(missingObj); - - wunit.commit(); - - if (_observer) { - const OplogApplier::Observer::FetchInfo fetchInfo(oplogEntry, missingObj); - _observer->onMissingDocumentsFetchedAndInserted({fetchInfo}); - } - }); -} - // This free function is used by the writer threads to apply each op Status multiSyncApply(OperationContext* opCtx, MultiApplier::OperationPtrs* ops, @@ -1038,14 +890,10 @@ Status multiSyncApply(OperationContext* opCtx, opCtx, &entry, oplogApplicationMode, stableTimestampForRecovery); if (!status.isOK()) { - // In initial sync, update operations can cause documents to be missed during - // collection cloning. As a result, it is possible that a document that we - // need to update is not present locally. In that case we fetch the document - // from the sync source. + // Tried to apply an update operation but the document is missing, there must be + // a delete operation for the document later in the oplog. if (status == ErrorCodes::UpdateOperationFailed && - st->getOptions().missingDocumentSourceForInitialSync) { - // We might need to fetch the missing docs from the sync source. - st->fetchAndInsertMissingDocument(opCtx, entry); + oplogApplicationMode == OplogApplication::Mode::kInitialSync) { continue; } diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index a2ab581d75c..65d7639e5cb 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -192,23 +192,6 @@ public: using BatchLimits = OplogApplier::BatchLimits; /** - * Fetch a single document referenced in the operation from the sync source. - * - * The sync source is specified at construction in - * OplogApplier::Options::missingDocumentSourceForInitialSync. 
- */ - virtual BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry); - - /** - * If an update fails, fetches the missing document and inserts it into the local collection. - * - * Calls OplogApplier::Observer::onMissingDocumentsFetchedAndInserted() if the document was - * fetched and inserted successfully. - */ - virtual void fetchAndInsertMissingDocument(OperationContext* opCtx, - const OplogEntry& oplogEntry); - - /** * Applies a batch of oplog entries by writing the oplog entries to the local oplog and then * using a set of threads to apply the operations. It will only apply (but will * still write to the oplog) oplog entries with a timestamp greater than or equal to the diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index d6a194ba29d..1e544e731fb 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -101,51 +101,14 @@ OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollec } /** - * Testing-only SyncTail that returns user-provided "document" for getMissingDoc(). + * Testing-only SyncTail. */ -class SyncTailWithLocalDocumentFetcher : public SyncTail, OplogApplier::Observer { +class SyncTailForTest : public SyncTail { public: - SyncTailWithLocalDocumentFetcher(const BSONObj& document); - BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override; - - // OplogApplier::Observer functions - void onBatchBegin(const OplogApplier::Operations&) final {} - void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {} - void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>& docs) final { - numFetched += docs.size(); - } - - std::size_t numFetched = 0U; - -private: - BSONObj _document; -}; - -/** - * Testing-only SyncTail that checks the operation context in fetchAndInsertMissingDocument(). - */ -class SyncTailWithOperationContextChecker : public SyncTail { -public: - SyncTailWithOperationContextChecker(); - void fetchAndInsertMissingDocument(OperationContext* opCtx, - const OplogEntry& oplogEntry) override; - bool called = false; + SyncTailForTest(); }; -SyncTailWithLocalDocumentFetcher::SyncTailWithLocalDocumentFetcher(const BSONObj& document) - : SyncTail(this, // observer - nullptr, // consistency markers - nullptr, // storage interface - SyncTail::MultiSyncApplyFunc(), - nullptr, // writer pool - SyncTailTest::makeInitialSyncOptions()), - _document(document) {} - -BSONObj SyncTailWithLocalDocumentFetcher::getMissingDoc(OperationContext*, const OplogEntry&) { - return _document; -} - -SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker() +SyncTailForTest::SyncTailForTest() : SyncTail(nullptr, // observer nullptr, // consistency markers nullptr, // storage interface @@ -153,14 +116,6 @@ SyncTailWithOperationContextChecker::SyncTailWithOperationContextChecker() nullptr, // writer pool SyncTailTest::makeInitialSyncOptions()) {} -void SyncTailWithOperationContextChecker::fetchAndInsertMissingDocument(OperationContext* opCtx, - const OplogEntry&) { - ASSERT_FALSE(opCtx->writesAreReplicated()); - ASSERT_FALSE(opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); - ASSERT_TRUE(documentValidationDisabled(opCtx)); - called = true; -} - /** * Creates collection options suitable for oplog. 
*/ @@ -1769,8 +1724,7 @@ TEST_F(SyncTailTest, MultiSyncApplyFallsBackOnApplyingInsertsIndividuallyWhenGro } TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFromSyncSource) { - BSONObj emptyDoc; - SyncTailWithLocalDocumentFetcher syncTail(emptyDoc); + SyncTailForTest syncTail; NamespaceString nss("test.t"); { Lock::GlobalWrite globalLock(_opCtx.get()); @@ -1786,17 +1740,14 @@ TEST_F(SyncTailTest, MultiSyncApplyIgnoresUpdateOperationIfDocumentIsMissingFrom WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - // Since the missing document is not found on the sync source, the collection referenced by - // the failed operation should not be automatically created. + // Since the document was missing when we cloned data from the sync source, the collection + // referenced by the failed operation should not be automatically created. ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), nss).getCollection()); - - // Fetch count should remain zero if we failed to copy the missing document. - ASSERT_EQUALS(syncTail.numFetched, 0U); } TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitialSync) { BSONObj emptyDoc; - SyncTailWithLocalDocumentFetcher syncTail(emptyDoc); + SyncTailForTest syncTail; NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); NamespaceString badNss("local." + _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); auto doc1 = BSON("_id" << 1); @@ -1809,7 +1760,6 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitial MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - ASSERT_EQUALS(syncTail.numFetched, 0U); CollectionReader collectionReader(_opCtx.get(), nss); ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); @@ -1819,7 +1769,7 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsDocumentOnNamespaceNotFoundDuringInitial TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringInitialSync) { BSONObj emptyDoc; - SyncTailWithLocalDocumentFetcher syncTail(emptyDoc); + SyncTailForTest syncTail; NamespaceString nss("local." + _agent.getSuiteName() + "_" + _agent.getTestName()); NamespaceString badNss("local." 
+ _agent.getSuiteName() + "_" + _agent.getTestName() + "bad"); auto doc1 = BSON("_id" << 1); @@ -1834,7 +1784,6 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringIn MultiApplier::OperationPtrs ops = {&op0, &op1, &op2, &op3}; WorkerMultikeyPathInfo pathInfo; ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - ASSERT_EQUALS(syncTail.numFetched, 0U); CollectionReader collectionReader(_opCtx.get(), nss); ASSERT_BSONOBJ_EQ(doc1, unittest::assertGet(collectionReader.next())); @@ -1845,26 +1794,6 @@ TEST_F(SyncTailTest, MultiSyncApplySkipsIndexCreationOnNamespaceNotFoundDuringIn ASSERT_FALSE(AutoGetCollectionForReadCommand(_opCtx.get(), badNss).getCollection()); } -TEST_F(SyncTailTest, MultiSyncApplyFetchesMissingDocumentIfDocumentIsAvailableFromSyncSource) { - SyncTailWithLocalDocumentFetcher syncTail(BSON("_id" << 0 << "x" << 1)); - NamespaceString nss("test.t"); - createCollection(_opCtx.get(), nss, {}); - auto updatedDocument = BSON("_id" << 0 << "x" << 1); - auto op = makeUpdateDocumentOplogEntry( - {Timestamp(Seconds(1), 0), 1LL}, nss, BSON("_id" << 0), updatedDocument); - MultiApplier::OperationPtrs ops = {&op}; - WorkerMultikeyPathInfo pathInfo; - ASSERT_OK(multiSyncApply(_opCtx.get(), &ops, &syncTail, &pathInfo)); - ASSERT_EQUALS(syncTail.numFetched, 1U); - - // The collection referenced by "ns" in the failed operation is automatically created to hold - // the missing document fetched from the sync source. We verify the contents of the collection - // with the CollectionReader class. - CollectionReader collectionReader(_opCtx.get(), nss); - ASSERT_BSONOBJ_EQ(updatedDocument, unittest::assertGet(collectionReader.next())); - ASSERT_EQUALS(ErrorCodes::CollectionIsEmpty, collectionReader.next().getStatus()); -} - namespace { class ReplicationCoordinatorSignalDrainCompleteThrows : public ReplicationCoordinatorMock { diff --git a/src/mongo/db/repl/sync_tail_test_fixture.cpp b/src/mongo/db/repl/sync_tail_test_fixture.cpp index 59860e000c2..7c85abeabf3 100644 --- a/src/mongo/db/repl/sync_tail_test_fixture.cpp +++ b/src/mongo/db/repl/sync_tail_test_fixture.cpp @@ -94,7 +94,6 @@ void SyncTailOpObserver::onCreateCollection(OperationContext* opCtx, OplogApplier::Options SyncTailTest::makeInitialSyncOptions() { OplogApplier::Options options(OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; - options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); return options; } diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index 95d016a523d..0712c4e7b88 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -1356,96 +1356,6 @@ public: } }; -class SyncTest : public SyncTail { -public: - bool returnEmpty; - explicit SyncTest(OplogApplier::Observer* observer) - : SyncTail(observer, - nullptr, - nullptr, - SyncTail::MultiSyncApplyFunc(), - nullptr, - OplogApplier::Options(OplogApplication::Mode::kInitialSync)), - returnEmpty(false) {} - virtual ~SyncTest() {} - BSONObj getMissingDoc(OperationContext* opCtx, const OplogEntry& oplogEntry) override { - if (returnEmpty) { - BSONObj o; - return o; - } - return BSON("_id" - << "on remote" - << "foo" - << "baz"); - } -}; - -class FetchAndInsertMissingDocumentObserver : public OplogApplier::Observer { -public: - void onBatchBegin(const OplogApplier::Operations&) final {} - void onBatchEnd(const StatusWith<OpTime>&, const OplogApplier::Operations&) final {} - void 
onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final { - fetched = true; - } - bool fetched = false; -}; - -class FetchAndInsertMissingDocument : public Base { -public: - void run() { - // Replication is not supported by mobile SE. - if (mongo::storageGlobalParams.engine == "mobile") { - return; - } - bool threw = false; - auto oplogEntry = makeOplogEntry(OpTime(Timestamp(100, 1), 1LL), // optime - OpTypeEnum::kUpdate, // op type - NamespaceString(ns()), // namespace - BSON("foo" - << "bar"), // o - BSON("_id" - << "in oplog" - << "foo" - << "bar")); // o2 - - Lock::GlobalWrite lk(&_opCtx); - - // this should fail because we can't connect - try { - OplogApplier::Options options(OplogApplication::Mode::kInitialSync); - options.allowNamespaceNotFoundErrorsOnCrudOps = true; - options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); - SyncTail badSource( - nullptr, nullptr, nullptr, SyncTail::MultiSyncApplyFunc(), nullptr, options); - - OldClientContext ctx(&_opCtx, ns()); - badSource.getMissingDoc(&_opCtx, oplogEntry); - } catch (DBException&) { - threw = true; - } - verify(threw); - - // now this should succeed - FetchAndInsertMissingDocumentObserver observer; - SyncTest t(&observer); - t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry); - ASSERT(observer.fetched); - verify(!_client - .findOne(ns(), - BSON("_id" - << "on remote")) - .isEmpty()); - - // Reset flag in observer before next test case. - observer.fetched = false; - - // force it not to find an obj - t.returnEmpty = true; - t.fetchAndInsertMissingDocument(&_opCtx, oplogEntry); - ASSERT_FALSE(observer.fetched); - } -}; - class All : public Suite { public: All() : Suite("repl") {} @@ -1503,7 +1413,6 @@ public: add<Idempotence::ReplaySetPreexistingNoOpPull>(); add<Idempotence::ReplayArrayFieldNotAppended>(); add<DeleteOpIsIdBased>(); - add<FetchAndInsertMissingDocument>(); } }; diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index ea4dc1e148b..5d2ac4a9e25 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -145,7 +145,6 @@ class DoNothingOplogApplierObserver : public repl::OplogApplier::Observer { public: void onBatchBegin(const repl::OplogApplier::Operations&) final {} void onBatchEnd(const StatusWith<repl::OpTime>&, const repl::OplogApplier::Operations&) final {} - void onMissingDocumentsFetchedAndInserted(const std::vector<FetchInfo>&) final {} }; class StorageTimestampTest { @@ -1374,7 +1373,6 @@ public: auto writerPool = repl::OplogApplier::makeWriterPool(); repl::OplogApplier::Options options(repl::OplogApplication::Mode::kInitialSync); options.allowNamespaceNotFoundErrorsOnCrudOps = true; - options.missingDocumentSourceForInitialSync = HostAndPort("localhost", 123); repl::OplogApplierImpl oplogApplier( nullptr, // task executor. not required for multiApply(). |
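
For readers tracing the behavioral change in the sync_tail.cpp hunk: with the missing-document fetcher removed, an update that fails during initial sync because its target document is absent is now skipped outright, on the reasoning that the document must be deleted by a later oplog entry which initial sync will apply anyway. The sketch below condenses that decision into a standalone helper; the helper name and signature are illustrative assumptions, while the error-code and oplog-application-mode checks are taken directly from the new branch in multiSyncApply().

    // Hypothetical helper, not part of the commit: given the Status returned by
    // applying one oplog entry (syncApply in the real code), decide whether the
    // initial-sync applier may ignore the failure.
    bool shouldIgnoreFailedUpdate(const Status& status, OplogApplication::Mode mode) {
        // Mirrors the post-commit branch in multiSyncApply(): a missing-document
        // update during initial sync implies a later delete in the oplog, so the
        // failed update is skipped instead of fetched from the sync source.
        return status == ErrorCodes::UpdateOperationFailed &&
               mode == OplogApplication::Mode::kInitialSync;
    }

In multiSyncApply() this corresponds to the bare `continue;` that replaces the old call to fetchAndInsertMissingDocument().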
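
The commit also shrinks the OplogApplier::Observer interface: onMissingDocumentsFetchedAndInserted() is deleted from every implementation touched above (replication_recovery.cpp, storage_timestamp_tests.cpp, and the removed NoopOplogApplierObserver in replication_coordinator_external_state_impl.cpp). After this change a no-op observer needs only the two batch callbacks; a hedged sketch, with an illustrative class name that is not part of the commit:

    // Minimal observer against the post-commit interface (sketch only).
    class NoopObserverSketch : public repl::OplogApplier::Observer {
    public:
        void onBatchBegin(const repl::OplogApplier::Operations&) final {}
        void onBatchEnd(const StatusWith<repl::OpTime>&,
                        const repl::OplogApplier::Operations&) final {}
        // onMissingDocumentsFetchedAndInserted() is gone along with the fetcher.
    };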