diff options
-rw-r--r-- | jstests/multiVersion/initial_sync_drop_against_last_stable.js | 162 | ||||
-rw-r--r-- | jstests/replsets/initial_sync_drop_collection.js | 105 | ||||
-rw-r--r-- | jstests/replsets/initial_sync_rename_collection.js | 207 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 31 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 154 |
6 files changed, 661 insertions, 53 deletions
diff --git a/jstests/multiVersion/initial_sync_drop_against_last_stable.js b/jstests/multiVersion/initial_sync_drop_against_last_stable.js new file mode 100644 index 00000000000..a7969a7f0d6 --- /dev/null +++ b/jstests/multiVersion/initial_sync_drop_against_last_stable.js @@ -0,0 +1,162 @@ +/** + * Test that CollectionCloner completes without error when a collection is dropped during cloning, + * specifically when that sync source is in 4.2. + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load('jstests/replsets/libs/two_phase_drops.js'); +load("jstests/libs/uuid_util.js"); + +// Set up replica set with two nodes. We will add a third and force it to sync from the secondary. +const testName = "initial_sync_drop_against_last_stable"; +const dbName = testName; +const replTest = new ReplSetTest({ + name: testName, + nodes: [ + {}, /* primary */ + {rsConfig: {priority: 0, votes: 0}, binVersion: "last-stable"}, /* sync source */ + {rsConfig: {priority: 0, votes: 0}} /* initial syncing node */ + ] +}); +replTest.startSet(); +replTest.initiate(); + +const collName = "testcoll"; +const primary = replTest.getPrimary(); +const primaryDB = primary.getDB(dbName); +const primaryColl = primaryDB[collName]; +const nss = primaryColl.getFullName(); + +// The sync source. +const syncSource = replTest.getSecondary(); + +// The initial syncing node. Places in the test that refer to 'the secondary' refer to this node. +let secondary = replTest.getSecondaries()[1]; +assert.neq(syncSource, secondary, "initial syncing node should be the third in the set"); +let secondaryDB = secondary.getDB(dbName); +let secondaryColl = secondaryDB[collName]; + +// This function adds data to the collection, restarts the secondary node with the given +// parameters and setting the given failpoint, waits for the failpoint to be hit, +// drops the collection, then disables the failpoint. 
It then optionally waits for the +// expectedLog message and waits for the secondary to complete initial sync, then ensures +// the collection on the secondary is empty. +function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) { + jsTestLog("Writing data to collection."); + assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}])); + const data = Object.merge(extraFailPointData || {}, {nss: nss}); + + jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss); + secondaryStartupParams = secondaryStartupParams || {}; + secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: data}); + // Force the initial syncing node to sync against the 4.2 secondary. + secondaryStartupParams['failpoint.forceSyncSourceCandidate'] = + tojson({mode: 'alwaysOn', data: {hostAndPort: syncSource.host}}); + // Skip clearing initial sync progress after a successful initial sync attempt so that we + // can check initialSyncStatus fields after initial sync is complete. + secondaryStartupParams['failpoint.skipClearInitialSyncState'] = tojson({mode: 'alwaysOn'}); + secondaryStartupParams['numInitialSyncAttempts'] = 1; + secondary = + replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams}); + secondaryDB = secondary.getDB(dbName); + secondaryColl = secondaryDB[collName]; + + jsTestLog("Waiting for secondary to reach failPoint " + failPoint); + assert.commandWorked(secondary.adminCommand({ + waitForFailPoint: failPoint, + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout + })); + + // Restarting the secondary may have resulted in an election. Wait until the system + // stabilizes and reaches RS_STARTUP2 state. + replTest.getPrimary(); + replTest.waitForState(secondary, ReplSetTest.State.STARTUP_2); +} + +function finishTest({failPoint, expectedLog, waitForDrop, createNew}) { + // Get the uuid for use in checking the log line. 
+ let uuid = getUUIDFromListCollections(primaryDB, collName); + + jsTestLog("Dropping collection on primary: " + primaryColl.getFullName()); + assert(primaryColl.drop()); + replTest.awaitReplication(null, null, [syncSource]); + + if (waitForDrop) { + jsTestLog("Waiting for drop to commit on primary"); + TwoPhaseDropCollectionTest.waitForDropToComplete(primaryDB, collName); + } + + if (createNew) { + jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName()); + assert.commandWorked(primaryColl.insert({_id: "not the same collection"})); + } + + jsTestLog("Allowing secondary to continue."); + assert.commandWorked(secondary.adminCommand({configureFailPoint: failPoint, mode: 'off'})); + + if (expectedLog) { + jsTestLog(eval(expectedLog)); + checkLog.contains(secondary, eval(expectedLog)); + } + + jsTestLog("Waiting for initial sync to complete."); + replTest.waitForState(secondary, ReplSetTest.State.SECONDARY); + + let res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); + assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts); + + if (createNew) { + assert.eq([{_id: "not the same collection"}], secondaryColl.find().toArray()); + assert(primaryColl.drop()); + } else { + assert.eq(0, secondaryColl.find().itcount()); + } + + replTest.checkReplicatedDataHashes(); +} + +function runDropTest(params) { + setupTest(params); + finishTest(params); +} + +jsTestLog("[1] Testing dropping between listIndexes and find."); +runDropTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"} +}); + +jsTestLog( + "[2] Testing dropping between listIndexes and find, with new same-name collection created."); +runDropTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"}, + createNew: true +}); + +jsTestLog("[3] Testing committed drop between getMore calls."); +runDropTest({ + failPoint: 
"initialSyncHangCollectionClonerAfterHandlingBatchResponse", + secondaryStartupParams: {collectionClonerBatchSize: 1}, + waitForDrop: true, + expectedLog: + "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped on source.`" +}); + +jsTestLog( + "[4] Testing committed drop with new same-name collection created, between getMore calls."); +runDropTest({ + failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", + secondaryStartupParams: {collectionClonerBatchSize: 1}, + waitForDrop: true, + expectedLog: + "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped on source.`", + createNew: true +}); + +replTest.stopSet(); +})();
\ No newline at end of file diff --git a/jstests/replsets/initial_sync_drop_collection.js b/jstests/replsets/initial_sync_drop_collection.js index cd6b6aaee08..de375ffc73c 100644 --- a/jstests/replsets/initial_sync_drop_collection.js +++ b/jstests/replsets/initial_sync_drop_collection.js @@ -1,16 +1,11 @@ -// Test that CollectionCloner completes without error when a collection is dropped during cloning. +/** + * Test that CollectionCloner completes without error when a collection is dropped during cloning. + * @tags: [requires_fcv_44] + */ (function() { "use strict"; -// TODO(SERVER-43277): This test is disabled while we work on the Resumable Initial Sync -// project -/* eslint-disable no-unreachable */ -return; - -// Skip db hash check because secondary cannot complete initial sync. -TestData.skipCheckDBHashes = true; - load("jstests/libs/fail_point_util.js"); load('jstests/replsets/libs/two_phase_drops.js'); load("jstests/libs/uuid_util.js"); @@ -30,7 +25,6 @@ var secondaryDB = secondary.getDB(dbName); const collName = "testcoll"; var primaryColl = primaryDB[collName]; var secondaryColl = secondaryDB[collName]; -var pRenameColl = primaryDB["r_" + collName]; var nss = primaryColl.getFullName(); // This function adds data to the collection, restarts the secondary node with the given @@ -38,13 +32,14 @@ var nss = primaryColl.getFullName(); // drops the collection, then disables the failpoint. It then optionally waits for the // expectedLog message and waits for the secondary to complete initial sync, then ensures // the collection on the secondary is empty. 
-function setupTest({failPoint, secondaryStartupParams}) { +function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) { jsTestLog("Writing data to collection."); assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}])); + const data = Object.merge(extraFailPointData || {}, {nss: nss}); jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss); secondaryStartupParams = secondaryStartupParams || {}; - secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: {nss: nss}}); + secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: data}); // Skip clearing initial sync progress after a successful initial sync attempt so that we // can check initialSyncStatus fields after initial sync is complete. secondaryStartupParams['failpoint.skipClearInitialSyncState'] = tojson({mode: 'alwaysOn'}); @@ -67,9 +62,9 @@ function setupTest({failPoint, secondaryStartupParams}) { replTest.waitForState(secondary, ReplSetTest.State.STARTUP_2); } -function finishTest({failPoint, secondaryStartupParams, expectedLog, waitForDrop, createNew}) { +function finishTest({failPoint, expectedLog, waitForDrop, createNew}) { // Get the uuid for use in checking the log line. - let uuid = getUUIDFromListCollections(primaryDB, collName); + const uuid = extractUUIDFromObject(getUUIDFromListCollections(primaryDB, collName)); jsTestLog("Dropping collection on primary: " + primaryColl.getFullName()); assert(primaryColl.drop()); @@ -79,6 +74,16 @@ function finishTest({failPoint, secondaryStartupParams, expectedLog, waitForDrop TwoPhaseDropCollectionTest.waitForDropToComplete(primaryDB, collName); } + // Only set for test cases that use 'system.drop' namespaces when dropping collections. + // In those tests the variable 'rnss' represents such a namespace. Used for expectedLog. + // See test cases 3 and 4 below. 
+ let rnss; + const dropPendingColl = + TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(primaryDB, collName); + if (dropPendingColl) { + rnss = dbName + "." + dropPendingColl["name"]; + } + if (createNew) { jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName()); assert.commandWorked(primaryColl.insert({_id: "not the same collection"})); @@ -112,68 +117,72 @@ function runDropTest(params) { finishTest(params); } -jsTestLog("Testing dropping between listIndexes and find."); -runDropTest({failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor"}); +jsTestLog("[1] Testing dropping between listIndexes and find."); +runDropTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"} +}); + +jsTestLog( + "[2] Testing dropping between listIndexes and find, with new same-name collection created."); +runDropTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"}, + createNew: true +}); + +let expectedLogFor3and4 = + "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`"; -jsTestLog("Testing dropping between listIndexes and find, with new same-name collection created."); -runDropTest( - {failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor", createNew: true}); +// We don't support 4.2 style two-phase drops with EMRC=false - in that configuration, the +// collection will instead be renamed to a <db>.system.drop.* namespace before being dropped. Since +// the cloner queries collection by UUID, it will observe the first drop phase as a rename. +// We still want to check that initial sync succeeds in such a case. +if (TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replTest)) { + expectedLogFor3and4 = + "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. 
UUID ${uuid}`"; +} -jsTestLog("Testing drop-pending between getMore calls."); +jsTestLog("[3] Testing drop-pending between getMore calls."); runDropTest({ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", secondaryStartupParams: {collectionClonerBatchSize: 1}, - expectedLog: - "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`" + expectedLog: expectedLogFor3and4 }); -jsTestLog("Testing drop-pending with new same-name collection created, between getMore calls."); +jsTestLog("[4] Testing drop-pending with new same-name collection created, between getMore calls."); runDropTest({ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", secondaryStartupParams: {collectionClonerBatchSize: 1}, - expectedLog: - "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`", + expectedLog: expectedLogFor3and4, createNew: true }); -jsTestLog("Testing committed drop between getMore calls."); - // Add another node to the set, so when we drop the collection it can commit. This other // secondary will be finished with initial sync when the drop happens. 
var secondary2 = replTest.add({rsConfig: {priority: 0}}); replTest.reInitiate(); replTest.waitForState(secondary2, ReplSetTest.State.SECONDARY); +jsTestLog("[5] Testing committed drop between getMore calls."); runDropTest({ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", secondaryStartupParams: {collectionClonerBatchSize: 1}, waitForDrop: true, expectedLog: - "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`" + "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`" }); -jsTestLog("Testing rename between getMores."); -setupTest({ +jsTestLog( + "[6] Testing committed drop with new same-name collection created, between getMore calls."); +runDropTest({ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", secondaryStartupParams: {collectionClonerBatchSize: 1}, + waitForDrop: true, + expectedLog: + "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`", + createNew: true }); -jsTestLog("Renaming collection on primary"); -assert.commandWorked(primary.adminCommand({ - renameCollection: primaryColl.getFullName(), - to: pRenameColl.getFullName(), - dropTarget: false -})); - -jsTestLog("Allowing secondary to continue."); -// Make sure we don't reach the fassert() indicating initial sync failure. 
-assert.commandWorked( - secondary.adminCommand({configureFailPoint: "initialSyncHangBeforeFinish", mode: 'alwaysOn'})); - -assert.commandWorked(secondary.adminCommand({ - configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", - mode: 'off' -})); -jsTestLog("Waiting for initial sync to complete."); -checkLog.contains(secondary, "The maximum number of retries have been exhausted for initial sync."); + replTest.stopSet(); })(); diff --git a/jstests/replsets/initial_sync_rename_collection.js b/jstests/replsets/initial_sync_rename_collection.js new file mode 100644 index 00000000000..cc07397cd90 --- /dev/null +++ b/jstests/replsets/initial_sync_rename_collection.js @@ -0,0 +1,207 @@ +/** + * Test that CollectionCloner completes without error when a collection is renamed during cloning. + * @tags: [requires_fcv_44] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load("jstests/libs/uuid_util.js"); +load('jstests/replsets/libs/two_phase_drops.js'); + +// Set up replica set. Disallow chaining so nodes always sync from primary. +const testName = "initial_sync_rename_collection"; +const dbName = testName; +const replTest = new ReplSetTest( + {name: testName, nodes: [{}, {rsConfig: {priority: 0}}], settings: {chainingAllowed: false}}); +replTest.startSet(); +replTest.initiate(); + +const collName = "testcoll"; +const primary = replTest.getPrimary(); +const primaryDB = primary.getDB(dbName); +const primaryColl = primaryDB[collName]; +const pRenameColl = primaryDB["r_" + collName]; + +// Used for cross-DB renames. 
+const secondDbName = testName + "_cross"; +const primarySecondDB = primary.getDB(secondDbName); +const pCrossDBRenameColl = primarySecondDB[collName + "_cross"]; + +const nss = primaryColl.getFullName(); +const rnss = pRenameColl.getFullName(); +let secondary = replTest.getSecondary(); +let secondaryDB = secondary.getDB(dbName); +let secondaryColl = secondaryDB[collName]; + +// This function adds data to the collection, restarts the secondary node with the given +// parameters and setting the given failpoint, waits for the failpoint to be hit, +// renames the collection, then disables the failpoint. It then optionally waits for the +// expectedLog message and waits for the secondary to complete initial sync, then ensures +// the collection on the secondary has been properly cloned. +function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) { + jsTestLog("Writing data to collection."); + assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}])); + const data = Object.merge(extraFailPointData || {}, {nss: nss}); + + jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss); + secondaryStartupParams = secondaryStartupParams || {}; + secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: data}); + // Skip clearing initial sync progress after a successful initial sync attempt so that we + // can check initialSyncStatus fields after initial sync is complete. 
+ secondaryStartupParams['failpoint.skipClearInitialSyncState'] = tojson({mode: 'alwaysOn'}); + secondaryStartupParams['numInitialSyncAttempts'] = 1; + secondary = + replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams}); + secondaryDB = secondary.getDB(dbName); + secondaryColl = secondaryDB[collName]; + + jsTestLog("Waiting for secondary to reach failPoint " + failPoint); + assert.commandWorked(secondary.adminCommand({ + waitForFailPoint: failPoint, + timesEntered: 1, + maxTimeMS: kDefaultWaitForFailPointTimeout + })); + + // Restarting the secondary may have resulted in an election. Wait until the system + // stabilizes and reaches RS_STARTUP2 state. + replTest.getPrimary(); + replTest.waitForState(secondary, ReplSetTest.State.STARTUP_2); +} + +function finishTest({failPoint, expectedLog, createNew, renameAcrossDBs}) { + // Get the uuid for use in checking the log line. + const uuid = extractUUIDFromObject(getUUIDFromListCollections(primaryDB, collName)); + const target = (renameAcrossDBs ? pCrossDBRenameColl : pRenameColl); + + jsTestLog("Renaming collection on primary: " + target.getFullName()); + assert.commandWorked(primary.adminCommand({ + renameCollection: primaryColl.getFullName(), + to: target.getFullName(), + dropTarget: false + })); + + // Only set for test cases that use 'system.drop' namespaces when dropping collections. + // In those tests the variable 'dropPendingNss' represents such a namespace. Used for + // expectedLog. See test cases 6 and 8 below. + let dropPendingNss; + const dropPendingColl = + TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(primaryDB, collName); + if (dropPendingColl) { + dropPendingNss = dbName + "." 
+ dropPendingColl["name"]; + } + + if (createNew) { + jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName()); + assert.commandWorked(primaryColl.insert({_id: "not the same collection"})); + } + + jsTestLog("Allowing secondary to continue."); + assert.commandWorked(secondary.adminCommand({configureFailPoint: failPoint, mode: 'off'})); + + if (expectedLog) { + jsTestLog(eval(expectedLog)); + checkLog.contains(secondary, eval(expectedLog)); + } + + jsTestLog("Waiting for initial sync to complete."); + replTest.waitForState(secondary, ReplSetTest.State.SECONDARY); + + let res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1})); + assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts); + + if (createNew) { + assert.eq([{_id: "not the same collection"}], secondaryColl.find().toArray()); + assert(primaryColl.drop()); + } else { + assert.eq(0, secondaryColl.find().itcount()); + } + replTest.checkReplicatedDataHashes(); + + // Drop the renamed collection so we can start fresh the next time around. 
+ assert(target.drop()); +} + +function runRenameTest(params) { + setupTest(params); + finishTest(params); +} + +jsTestLog("[1] Testing rename between listIndexes and find."); +runRenameTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"} +}); + +jsTestLog("[2] Testing cross-DB rename between listIndexes and find."); +runRenameTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"}, + renameAcrossDBs: true +}); + +jsTestLog( + "[3] Testing rename between listIndexes and find, with new same-name collection created."); +runRenameTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"}, + createNew: true +}); + +jsTestLog( + "[4] Testing cross-DB rename between listIndexes and find, with new same-name collection created."); +runRenameTest({ + failPoint: "hangBeforeClonerStage", + extraFailPointData: {cloner: "CollectionCloner", stage: "query"}, + createNew: true, + renameAcrossDBs: true +}); + +jsTestLog("[5] Testing rename between getMores."); +runRenameTest({ + failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", + secondaryStartupParams: {collectionClonerBatchSize: 1}, + expectedLog: + "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`" +}); + +// A cross-DB rename will appear as a drop in the context of the source DB. +let expectedLogFor6and8 = + "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`"; + +// We don't support 4.2 style two-phase drops with EMRC=false - in that configuration, the +// collection will instead be renamed to a <db>.system.drop.* namespace before being dropped. Since +// the cloner queries collection by UUID, it will observe the first drop phase as a rename. 
+// We still want to check that initial sync succeeds in such a case. +if (TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replTest)) { + expectedLogFor6and8 = + "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${dropPendingNss}'. UUID ${uuid}`"; +} + +jsTestLog("[6] Testing cross-DB rename between getMores."); +runRenameTest({ + failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", + secondaryStartupParams: {collectionClonerBatchSize: 1}, + renameAcrossDBs: true, + expectedLog: expectedLogFor6and8 +}); + +jsTestLog("[7] Testing rename with new same-name collection created, between getMores."); +runRenameTest({ + failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", + secondaryStartupParams: {collectionClonerBatchSize: 1}, + expectedLog: + "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`" +}); + +jsTestLog("[8] Testing cross-DB rename with new same-name collection created, between getMores."); +runRenameTest({ + failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse", + secondaryStartupParams: {collectionClonerBatchSize: 1}, + renameAcrossDBs: true, + expectedLog: expectedLogFor6and8 +}); + +replTest.stopSet(); +})();
\ No newline at end of file diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 41968073f89..4d73098c787 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -200,10 +200,12 @@ void CollectionCloner::runQuery() { if (_resumeSupported) { if (_resumeToken) { // Resume the query from where we left off. + LOG(1) << "Collection cloner will resume the last successful query"; query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken" << true << "$_resumeAfter" << _resumeToken.get()); } else { // New attempt at a resumable query. + LOG(1) << "Collection cloner will run a new query"; query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken" << true); } @@ -226,13 +228,35 @@ void CollectionCloner::runQuery() { } catch (...) { auto status = exceptionToStatus(); + // If the collection was dropped at any point, we can just move on to the next cloner. + // This applies to both resumable (4.4) and non-resumable (4.2) queries. + if (status == ErrorCodes::NamespaceNotFound) { + throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit. + } + + // Wire version 4.2 only. if (!_resumeSupported) { - std::string message = str::stream() - << "Collection clone failed and is not resumable. nss: " << _sourceNss; - log() << message; - uasserted(ErrorCodes::InitialSyncFailure, message); + // If we lost our cursor last round, the only time we can can continue is if we find out + // this round that the collection was dropped on the source (that scenario is covered + // right above). If that is not the case, then the cloner would have more work to do, + // but since we cannot resume the query, we must abort initial sync. + if (_lostNonResumableCursor) { + abortNonResumableClone(status); + } + + // Collection has changed upstream. 
This will trigger the code block above next round, + // (unless we find out the collection was dropped via getting a NamespaceNotFound). + if (_queryStage.isCursorError(status)) { + log() << "Lost cursor during non-resumable query: " << status; + _lostNonResumableCursor = true; + throw; + } + // Any other errors (including network errors, but excluding NamespaceNotFound) result + // in immediate failure. + abortNonResumableClone(status); } + // Re-throw all query errors for resumable (4.4) queries. throw; } } @@ -249,6 +273,14 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { } } + // If this is 'true', it means that something happened to our remote cursor for a reason other + // than the collection being dropped, all while we were running a non-resumable (4.2) clone. + // We must abort initial sync in that case. + if (_lostNonResumableCursor) { + // This will be caught in runQuery(). + uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor"); + } + if (_firstBatchOfQueryRound && _resumeSupported) { // Store the cursorId of the remote cursor. _remoteCursorId = iter.getCursorId(); @@ -354,7 +386,7 @@ void CollectionCloner::killOldQueryCursor() { auto id = _remoteCursorId; auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id)); - + LOG(1) << "Attempting to kill old remote cursor with id: " << id; try { getClient()->runCommand(nss.db().toString(), cmdObj, infoObj); } catch (...) { @@ -365,6 +397,19 @@ void CollectionCloner::killOldQueryCursor() { _remoteCursorId = -1; } +void CollectionCloner::forgetOldQueryCursor() { + _remoteCursorId = -1; +} + +// Throws. +void CollectionCloner::abortNonResumableClone(const Status& status) { + invariant(!_resumeSupported); + log() << "Error during non-resumable clone: " << status; + std::string message = str::stream() + << "Collection clone failed and is not resumable. 
nss: " << _sourceNss; + uasserted(ErrorCodes::InitialSyncFailure, message); +} + CollectionCloner::Stats CollectionCloner::getStats() const { stdx::lock_guard<Latch> lk(_mutex); return _stats; diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 764fcefb6dd..479015bef45 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -140,8 +140,22 @@ private: : CollectionClonerStage(name, cloner, stageFunc) {} bool isTransientError(const Status& status) override { + if (isCursorError(status)) { + // We have already lost this cursor so do not try to kill it. + getCloner()->forgetOldQueryCursor(); + return true; + } return ErrorCodes::isRetriableError(status); } + + static bool isCursorError(const Status& status) { + // Our cursor was killed due to changes on the remote collection. + if ((status == ErrorCodes::CursorNotFound) || (status == ErrorCodes::OperationFailed) || + (status == ErrorCodes::QueryPlanKilled)) { + return true; + } + return false; + } }; std::string describeForFuzzer(BaseClonerStage* stage) const final { @@ -210,6 +224,18 @@ private: */ void killOldQueryCursor(); + /** + * Clears the stored id of the remote cursor so that we do not attempt to kill it. + * We call this when we know it has already been killed by the sync source itself. + */ + void forgetOldQueryCursor(); + + /** + * Used to terminate the clone when we encounter a fatal error during a non-resumable query. + * Throws. + */ + void abortNonResumableClone(const Status& status); + // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // @@ -253,6 +279,11 @@ private: // If true, it means we are starting a new query or resuming an interrupted one. bool _firstBatchOfQueryRound = true; // (X) + + // Only set during non-resumable (4.2) queries. 
+ // Signifies that there were changes to the collection on the sync source that resulted in + // our remote cursor getting killed. + bool _lostNonResumableCursor = false; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index b6c4ce672d8..5f00b44bc30 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -1105,5 +1105,159 @@ TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) { ASSERT_EQUALS(7u, stats.documentsCopied); } +// We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the +// collection no longer exists in the database. +TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) { + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + // Set up documents to be returned from upstream node. 
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + + // Wait until we get to the query stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // This will cause the next batch to fail once (transiently), but we do not support resume. + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'QueryPlanKilled'}")); + + // Let us begin with the query stage. + beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); + + // Follow-up the QueryPlanKilled error with a NamespaceNotFound. + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'NamespaceNotFound'}")); + + beforeRetryFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +// We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound, +// which means the collection still exists in the database, but we cannot resume the query. +TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) { + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. 
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage"); + auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status); + ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable"); + }); + + // Wait until we get to the query stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // This will cause the next batch to fail once (transiently), but we do not support resume. + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'OperationFailed'}")); + + // Let us begin with the query stage. 
+ beforeStageFailPoint->setMode(FailPoint::off, 0); + beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1); + + // Follow-up the OperationFailed error with an UnknownError, which is not NamespaceNotFound. + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}")); + + beforeRetryFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +// We receive a CursorNotFound error, but the next query succeeds, indicating that the +// collection still exists in the database, but we cannot resume the query. +TEST_F(CollectionClonerTest, NonResumableCursorErrorThenSuccessEqualsFailure) { + // Set client wireVersion to 4.2, where we do not yet support resumable cloning. + _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS, + WireVersion::SHARDED_TRANSACTIONS); + + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}")); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status); + ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable"); + }); + + // Wait until we get to the query stage. 
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // This will cause the next batch to fail once (transiently), but we do not support resume. + auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore"); + failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'CursorNotFound'}")); + + // Let us begin with the query stage. We let the next retry succeed this time. + beforeStageFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); +} + } // namespace repl } // namespace mongo |