author     Vesselina Ratcheva <vesselina.ratcheva@10gen.com>  2020-01-20 15:57:18 -0500
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2020-01-23 17:40:19 +0000
commit     50444adc39207f2f4b3bd261e6fe399fb3cb3a6d (patch)
tree       da90f8df470d71345a85dd6af6fedc007a53cbbc
parent     dbdef20f64f4a9e6cee83967cbb4019af58d92cb (diff)
download   mongo-50444adc39207f2f4b3bd261e6fe399fb3cb3a6d.tar.gz
SERVER-43277 Implement resume after collection drop and rename in CollectionCloner query
create mode 100644 jstests/multiVersion/initial_sync_drop_against_last_stable.js
create mode 100644 jstests/replsets/initial_sync_rename_collection.js
-rw-r--r--  jstests/multiVersion/initial_sync_drop_against_last_stable.js  162
-rw-r--r--  jstests/replsets/initial_sync_drop_collection.js                105
-rw-r--r--  jstests/replsets/initial_sync_rename_collection.js              207
-rw-r--r--  src/mongo/db/repl/collection_cloner.cpp                          55
-rw-r--r--  src/mongo/db/repl/collection_cloner.h                            31
-rw-r--r--  src/mongo/db/repl/collection_cloner_test.cpp                    154
6 files changed, 661 insertions, 53 deletions
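For orientation (not part of the patch itself): the resume behavior added here relies on the 4.4 find options $_requestResumeToken and $_resumeAfter that the CollectionCloner query below passes. A minimal shell-syntax sketch of that round trip, assuming a 4.4 sync source, an existing test.testcoll collection, and that the server reports the token as cursor.postBatchResumeToken:

// Example only -- not part of this patch. Request a resume token with each batch.
let res = db.getSiblingDB("test").runCommand({
    find: "testcoll",
    batchSize: 2,
    hint: {$natural: 1},  // natural-order scan, matching how the cloner reads the collection
    $_requestResumeToken: true
});
let resumeToken = res.cursor.postBatchResumeToken;  // saved after every successful batch

// After a transient failure, restart the query from where the last batch left off.
res = db.getSiblingDB("test").runCommand({
    find: "testcoll",
    batchSize: 2,
    hint: {$natural: 1},
    $_requestResumeToken: true,
    $_resumeAfter: resumeToken
});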
diff --git a/jstests/multiVersion/initial_sync_drop_against_last_stable.js b/jstests/multiVersion/initial_sync_drop_against_last_stable.js
new file mode 100644
index 00000000000..a7969a7f0d6
--- /dev/null
+++ b/jstests/multiVersion/initial_sync_drop_against_last_stable.js
@@ -0,0 +1,162 @@
+/**
+ * Test that CollectionCloner completes without error when a collection is dropped during cloning,
+ * specifically when the sync source is running 4.2.
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/replsets/libs/two_phase_drops.js');
+load("jstests/libs/uuid_util.js");
+
+// Set up replica set with two nodes. We will add a third and force it to sync from the secondary.
+const testName = "initial_sync_drop_against_last_stable";
+const dbName = testName;
+const replTest = new ReplSetTest({
+ name: testName,
+ nodes: [
+ {}, /* primary */
+ {rsConfig: {priority: 0, votes: 0}, binVersion: "last-stable"}, /* sync source */
+ {rsConfig: {priority: 0, votes: 0}} /* initial syncing node */
+ ]
+});
+replTest.startSet();
+replTest.initiate();
+
+const collName = "testcoll";
+const primary = replTest.getPrimary();
+const primaryDB = primary.getDB(dbName);
+const primaryColl = primaryDB[collName];
+const nss = primaryColl.getFullName();
+
+// The sync source.
+const syncSource = replTest.getSecondary();
+
+// The initial syncing node. Places in the test that refer to 'the secondary' refer to this node.
+let secondary = replTest.getSecondaries()[1];
+assert.neq(syncSource, secondary, "initial syncing node should be the third in the set");
+let secondaryDB = secondary.getDB(dbName);
+let secondaryColl = secondaryDB[collName];
+
+// This function adds data to the collection, restarts the secondary node with the given
+// parameters and setting the given failpoint, waits for the failpoint to be hit,
+// drops the collection, then disables the failpoint. It then optionally waits for the
+// expectedLog message and waits for the secondary to complete initial sync, then ensures
+// the collection on the secondary is empty.
+function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) {
+ jsTestLog("Writing data to collection.");
+ assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}]));
+ const data = Object.merge(extraFailPointData || {}, {nss: nss});
+
+ jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss);
+ secondaryStartupParams = secondaryStartupParams || {};
+ secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: data});
+ // Force the initial syncing node to sync against the 4.2 secondary.
+ secondaryStartupParams['failpoint.forceSyncSourceCandidate'] =
+ tojson({mode: 'alwaysOn', data: {hostAndPort: syncSource.host}});
+ // Skip clearing initial sync progress after a successful initial sync attempt so that we
+ // can check initialSyncStatus fields after initial sync is complete.
+ secondaryStartupParams['failpoint.skipClearInitialSyncState'] = tojson({mode: 'alwaysOn'});
+ secondaryStartupParams['numInitialSyncAttempts'] = 1;
+ secondary =
+ replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
+ secondaryDB = secondary.getDB(dbName);
+ secondaryColl = secondaryDB[collName];
+
+ jsTestLog("Waiting for secondary to reach failPoint " + failPoint);
+ assert.commandWorked(secondary.adminCommand({
+ waitForFailPoint: failPoint,
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+ }));
+
+ // Restarting the secondary may have resulted in an election. Wait until the system
+ // stabilizes and reaches RS_STARTUP2 state.
+ replTest.getPrimary();
+ replTest.waitForState(secondary, ReplSetTest.State.STARTUP_2);
+}
+
+function finishTest({failPoint, expectedLog, waitForDrop, createNew}) {
+ // Get the uuid for use in checking the log line.
+ let uuid = getUUIDFromListCollections(primaryDB, collName);
+
+ jsTestLog("Dropping collection on primary: " + primaryColl.getFullName());
+ assert(primaryColl.drop());
+ replTest.awaitReplication(null, null, [syncSource]);
+
+ if (waitForDrop) {
+ jsTestLog("Waiting for drop to commit on primary");
+ TwoPhaseDropCollectionTest.waitForDropToComplete(primaryDB, collName);
+ }
+
+ if (createNew) {
+ jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName());
+ assert.commandWorked(primaryColl.insert({_id: "not the same collection"}));
+ }
+
+ jsTestLog("Allowing secondary to continue.");
+ assert.commandWorked(secondary.adminCommand({configureFailPoint: failPoint, mode: 'off'}));
+
+ if (expectedLog) {
+ jsTestLog(eval(expectedLog));
+ checkLog.contains(secondary, eval(expectedLog));
+ }
+
+ jsTestLog("Waiting for initial sync to complete.");
+ replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+ let res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+ assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts);
+
+ if (createNew) {
+ assert.eq([{_id: "not the same collection"}], secondaryColl.find().toArray());
+ assert(primaryColl.drop());
+ } else {
+ assert.eq(0, secondaryColl.find().itcount());
+ }
+
+ replTest.checkReplicatedDataHashes();
+}
+
+function runDropTest(params) {
+ setupTest(params);
+ finishTest(params);
+}
+
+jsTestLog("[1] Testing dropping between listIndexes and find.");
+runDropTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"}
+});
+
+jsTestLog(
+ "[2] Testing dropping between listIndexes and find, with new same-name collection created.");
+runDropTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"},
+ createNew: true
+});
+
+jsTestLog("[3] Testing committed drop between getMore calls.");
+runDropTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ waitForDrop: true,
+ expectedLog:
+ "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped on source.`"
+});
+
+jsTestLog(
+ "[4] Testing committed drop with new same-name collection created, between getMore calls.");
+runDropTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ waitForDrop: true,
+ expectedLog:
+ "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped on source.`",
+ createNew: true
+});
+
+replTest.stopSet();
+})();
\ No newline at end of file
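As an aside (not part of the patch): the hangBeforeClonerStage failpoint the new test relies on takes the cloner name, stage name, and namespace as data, and it can also be toggled at runtime rather than through startup parameters. A minimal sketch, assuming an initial-syncing node already held in 'secondary':

// Example only -- hypothetical interactive use of the same failpoint.
assert.commandWorked(secondary.adminCommand({
    configureFailPoint: "hangBeforeClonerStage",
    mode: "alwaysOn",
    data: {cloner: "CollectionCloner", stage: "query", nss: "initial_sync_drop_against_last_stable.testcoll"}
}));
// ... drop or rename the collection on the sync source here ...
assert.commandWorked(
    secondary.adminCommand({configureFailPoint: "hangBeforeClonerStage", mode: "off"}));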
diff --git a/jstests/replsets/initial_sync_drop_collection.js b/jstests/replsets/initial_sync_drop_collection.js
index cd6b6aaee08..de375ffc73c 100644
--- a/jstests/replsets/initial_sync_drop_collection.js
+++ b/jstests/replsets/initial_sync_drop_collection.js
@@ -1,16 +1,11 @@
-// Test that CollectionCloner completes without error when a collection is dropped during cloning.
+/**
+ * Test that CollectionCloner completes without error when a collection is dropped during cloning.
+ * @tags: [requires_fcv_44]
+ */
(function() {
"use strict";
-// TODO(SERVER-43277): This test is disabled while we work on the Resumable Initial Sync
-// project
-/* eslint-disable no-unreachable */
-return;
-
-// Skip db hash check because secondary cannot complete initial sync.
-TestData.skipCheckDBHashes = true;
-
load("jstests/libs/fail_point_util.js");
load('jstests/replsets/libs/two_phase_drops.js');
load("jstests/libs/uuid_util.js");
@@ -30,7 +25,6 @@ var secondaryDB = secondary.getDB(dbName);
const collName = "testcoll";
var primaryColl = primaryDB[collName];
var secondaryColl = secondaryDB[collName];
-var pRenameColl = primaryDB["r_" + collName];
var nss = primaryColl.getFullName();
// This function adds data to the collection, restarts the secondary node with the given
@@ -38,13 +32,14 @@ var nss = primaryColl.getFullName();
// drops the collection, then disables the failpoint. It then optionally waits for the
// expectedLog message and waits for the secondary to complete initial sync, then ensures
// the collection on the secondary is empty.
-function setupTest({failPoint, secondaryStartupParams}) {
+function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) {
jsTestLog("Writing data to collection.");
assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}]));
+ const data = Object.merge(extraFailPointData || {}, {nss: nss});
jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss);
secondaryStartupParams = secondaryStartupParams || {};
- secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: {nss: nss}});
+ secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: data});
// Skip clearing initial sync progress after a successful initial sync attempt so that we
// can check initialSyncStatus fields after initial sync is complete.
secondaryStartupParams['failpoint.skipClearInitialSyncState'] = tojson({mode: 'alwaysOn'});
@@ -67,9 +62,9 @@ function setupTest({failPoint, secondaryStartupParams}) {
replTest.waitForState(secondary, ReplSetTest.State.STARTUP_2);
}
-function finishTest({failPoint, secondaryStartupParams, expectedLog, waitForDrop, createNew}) {
+function finishTest({failPoint, expectedLog, waitForDrop, createNew}) {
// Get the uuid for use in checking the log line.
- let uuid = getUUIDFromListCollections(primaryDB, collName);
+ const uuid = extractUUIDFromObject(getUUIDFromListCollections(primaryDB, collName));
jsTestLog("Dropping collection on primary: " + primaryColl.getFullName());
assert(primaryColl.drop());
@@ -79,6 +74,16 @@ function finishTest({failPoint, secondaryStartupParams, expectedLog, waitForDrop
TwoPhaseDropCollectionTest.waitForDropToComplete(primaryDB, collName);
}
+ // Only set for test cases that use 'system.drop' namespaces when dropping collections.
+ // In those tests the variable 'rnss' represents such a namespace. Used for expectedLog.
+ // See test cases 3 and 4 below.
+ let rnss;
+ const dropPendingColl =
+ TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(primaryDB, collName);
+ if (dropPendingColl) {
+ rnss = dbName + "." + dropPendingColl["name"];
+ }
+
if (createNew) {
jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName());
assert.commandWorked(primaryColl.insert({_id: "not the same collection"}));
@@ -112,68 +117,72 @@ function runDropTest(params) {
finishTest(params);
}
-jsTestLog("Testing dropping between listIndexes and find.");
-runDropTest({failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor"});
+jsTestLog("[1] Testing dropping between listIndexes and find.");
+runDropTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"}
+});
+
+jsTestLog(
+ "[2] Testing dropping between listIndexes and find, with new same-name collection created.");
+runDropTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"},
+ createNew: true
+});
+
+let expectedLogFor3and4 =
+ "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`";
-jsTestLog("Testing dropping between listIndexes and find, with new same-name collection created.");
-runDropTest(
- {failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor", createNew: true});
+// We don't support 4.2-style two-phase drops with EMRC=false; in that configuration, the
+// collection will instead be renamed to a <db>.system.drop.* namespace before being dropped. Since
+// the cloner queries the collection by UUID, it will observe the first drop phase as a rename.
+// We still want to check that initial sync succeeds in such a case.
+if (TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replTest)) {
+ expectedLogFor3and4 =
+ "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`";
+}
-jsTestLog("Testing drop-pending between getMore calls.");
+jsTestLog("[3] Testing drop-pending between getMore calls.");
runDropTest({
failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
secondaryStartupParams: {collectionClonerBatchSize: 1},
- expectedLog:
- "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`"
+ expectedLog: expectedLogFor3and4
});
-jsTestLog("Testing drop-pending with new same-name collection created, between getMore calls.");
+jsTestLog("[4] Testing drop-pending with new same-name collection created, between getMore calls.");
runDropTest({
failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
secondaryStartupParams: {collectionClonerBatchSize: 1},
- expectedLog:
- "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`",
+ expectedLog: expectedLogFor3and4,
createNew: true
});
-jsTestLog("Testing committed drop between getMore calls.");
-
// Add another node to the set, so when we drop the collection it can commit. This other
// secondary will be finished with initial sync when the drop happens.
var secondary2 = replTest.add({rsConfig: {priority: 0}});
replTest.reInitiate();
replTest.waitForState(secondary2, ReplSetTest.State.SECONDARY);
+jsTestLog("[5] Testing committed drop between getMore calls.");
runDropTest({
failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
secondaryStartupParams: {collectionClonerBatchSize: 1},
waitForDrop: true,
expectedLog:
- "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`"
+ "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`"
});
-jsTestLog("Testing rename between getMores.");
-setupTest({
+jsTestLog(
+ "[6] Testing committed drop with new same-name collection created, between getMore calls.");
+runDropTest({
failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
secondaryStartupParams: {collectionClonerBatchSize: 1},
+ waitForDrop: true,
+ expectedLog:
+ "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`",
+ createNew: true
});
-jsTestLog("Renaming collection on primary");
-assert.commandWorked(primary.adminCommand({
- renameCollection: primaryColl.getFullName(),
- to: pRenameColl.getFullName(),
- dropTarget: false
-}));
-
-jsTestLog("Allowing secondary to continue.");
-// Make sure we don't reach the fassert() indicating initial sync failure.
-assert.commandWorked(
- secondary.adminCommand({configureFailPoint: "initialSyncHangBeforeFinish", mode: 'alwaysOn'}));
-
-assert.commandWorked(secondary.adminCommand({
- configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
- mode: 'off'
-}));
-jsTestLog("Waiting for initial sync to complete.");
-checkLog.contains(secondary, "The maximum number of retries have been exhausted for initial sync.");
+
replTest.stopSet();
})();
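One detail worth calling out (not part of the patch): the expectedLog values above are double-quoted strings containing backquoted template literals, so interpolation is deferred until finishTest() calls eval() with nss, rnss, and uuid in scope. A small self-contained sketch of the same pattern, using hypothetical values:

// Example only -- deferred interpolation via eval(), as used by the tests above.
const nss = "test.testcoll";                           // hypothetical namespace
const uuid = "01234567-89ab-cdef-0123-456789abcdef";   // hypothetical UUID string
const expectedLog =
    "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`";
print(eval(expectedLog));  // prints the fully interpolated line that checkLog will look for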
diff --git a/jstests/replsets/initial_sync_rename_collection.js b/jstests/replsets/initial_sync_rename_collection.js
new file mode 100644
index 00000000000..cc07397cd90
--- /dev/null
+++ b/jstests/replsets/initial_sync_rename_collection.js
@@ -0,0 +1,207 @@
+/**
+ * Test that CollectionCloner completes without error when a collection is renamed during cloning.
+ * @tags: [requires_fcv_44]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load("jstests/libs/uuid_util.js");
+load('jstests/replsets/libs/two_phase_drops.js');
+
+// Set up replica set. Disallow chaining so nodes always sync from primary.
+const testName = "initial_sync_rename_collection";
+const dbName = testName;
+const replTest = new ReplSetTest(
+ {name: testName, nodes: [{}, {rsConfig: {priority: 0}}], settings: {chainingAllowed: false}});
+replTest.startSet();
+replTest.initiate();
+
+const collName = "testcoll";
+const primary = replTest.getPrimary();
+const primaryDB = primary.getDB(dbName);
+const primaryColl = primaryDB[collName];
+const pRenameColl = primaryDB["r_" + collName];
+
+// Used for cross-DB renames.
+const secondDbName = testName + "_cross";
+const primarySecondDB = primary.getDB(secondDbName);
+const pCrossDBRenameColl = primarySecondDB[collName + "_cross"];
+
+const nss = primaryColl.getFullName();
+const rnss = pRenameColl.getFullName();
+let secondary = replTest.getSecondary();
+let secondaryDB = secondary.getDB(dbName);
+let secondaryColl = secondaryDB[collName];
+
+// This function adds data to the collection, restarts the secondary node with the given
+// parameters and setting the given failpoint, waits for the failpoint to be hit,
+// renames the collection, then disables the failpoint. It then optionally waits for the
+// expectedLog message and waits for the secondary to complete initial sync, then ensures
+// the collection on the secondary has been properly cloned.
+function setupTest({failPoint, extraFailPointData, secondaryStartupParams}) {
+ jsTestLog("Writing data to collection.");
+ assert.commandWorked(primaryColl.insert([{_id: 1}, {_id: 2}]));
+ const data = Object.merge(extraFailPointData || {}, {nss: nss});
+
+ jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss);
+ secondaryStartupParams = secondaryStartupParams || {};
+ secondaryStartupParams['failpoint.' + failPoint] = tojson({mode: 'alwaysOn', data: data});
+ // Skip clearing initial sync progress after a successful initial sync attempt so that we
+ // can check initialSyncStatus fields after initial sync is complete.
+ secondaryStartupParams['failpoint.skipClearInitialSyncState'] = tojson({mode: 'alwaysOn'});
+ secondaryStartupParams['numInitialSyncAttempts'] = 1;
+ secondary =
+ replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
+ secondaryDB = secondary.getDB(dbName);
+ secondaryColl = secondaryDB[collName];
+
+ jsTestLog("Waiting for secondary to reach failPoint " + failPoint);
+ assert.commandWorked(secondary.adminCommand({
+ waitForFailPoint: failPoint,
+ timesEntered: 1,
+ maxTimeMS: kDefaultWaitForFailPointTimeout
+ }));
+
+ // Restarting the secondary may have resulted in an election. Wait until the system
+ // stabilizes and reaches RS_STARTUP2 state.
+ replTest.getPrimary();
+ replTest.waitForState(secondary, ReplSetTest.State.STARTUP_2);
+}
+
+function finishTest({failPoint, expectedLog, createNew, renameAcrossDBs}) {
+ // Get the uuid for use in checking the log line.
+ const uuid = extractUUIDFromObject(getUUIDFromListCollections(primaryDB, collName));
+ const target = (renameAcrossDBs ? pCrossDBRenameColl : pRenameColl);
+
+ jsTestLog("Renaming collection on primary: " + target.getFullName());
+ assert.commandWorked(primary.adminCommand({
+ renameCollection: primaryColl.getFullName(),
+ to: target.getFullName(),
+ dropTarget: false
+ }));
+
+ // Only set for test cases that use 'system.drop' namespaces when dropping collections.
+ // In those tests the variable 'dropPendingNss' represents such a namespace. Used for
+ // expectedLog. See test cases 6 and 8 below.
+ let dropPendingNss;
+ const dropPendingColl =
+ TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(primaryDB, collName);
+ if (dropPendingColl) {
+ dropPendingNss = dbName + "." + dropPendingColl["name"];
+ }
+
+ if (createNew) {
+ jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName());
+ assert.commandWorked(primaryColl.insert({_id: "not the same collection"}));
+ }
+
+ jsTestLog("Allowing secondary to continue.");
+ assert.commandWorked(secondary.adminCommand({configureFailPoint: failPoint, mode: 'off'}));
+
+ if (expectedLog) {
+ jsTestLog(eval(expectedLog));
+ checkLog.contains(secondary, eval(expectedLog));
+ }
+
+ jsTestLog("Waiting for initial sync to complete.");
+ replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+ let res = assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1}));
+ assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts);
+
+ if (createNew) {
+ assert.eq([{_id: "not the same collection"}], secondaryColl.find().toArray());
+ assert(primaryColl.drop());
+ } else {
+ assert.eq(0, secondaryColl.find().itcount());
+ }
+ replTest.checkReplicatedDataHashes();
+
+ // Drop the renamed collection so we can start fresh the next time around.
+ assert(target.drop());
+}
+
+function runRenameTest(params) {
+ setupTest(params);
+ finishTest(params);
+}
+
+jsTestLog("[1] Testing rename between listIndexes and find.");
+runRenameTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"}
+});
+
+jsTestLog("[2] Testing cross-DB rename between listIndexes and find.");
+runRenameTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"},
+ renameAcrossDBs: true
+});
+
+jsTestLog(
+ "[3] Testing rename between listIndexes and find, with new same-name collection created.");
+runRenameTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"},
+ createNew: true
+});
+
+jsTestLog(
+ "[4] Testing cross-DB rename between listIndexes and find, with new same-name collection created.");
+runRenameTest({
+ failPoint: "hangBeforeClonerStage",
+ extraFailPointData: {cloner: "CollectionCloner", stage: "query"},
+ createNew: true,
+ renameAcrossDBs: true
+});
+
+jsTestLog("[5] Testing rename between getMores.");
+runRenameTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ expectedLog:
+ "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`"
+});
+
+// A cross-DB rename will appear as a drop in the context of the source DB.
+let expectedLogFor6and8 =
+ "`CollectionCloner ns: '${nss}' uuid: UUID(\"${uuid}\") stopped because collection was dropped on source.`";
+
+// We don't support 4.2-style two-phase drops with EMRC=false; in that configuration, the
+// collection will instead be renamed to a <db>.system.drop.* namespace before being dropped. Since
+// the cloner queries the collection by UUID, it will observe the first drop phase as a rename.
+// We still want to check that initial sync succeeds in such a case.
+if (TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replTest)) {
+ expectedLogFor6and8 =
+ "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${dropPendingNss}'. UUID ${uuid}`";
+}
+
+jsTestLog("[6] Testing cross-DB rename between getMores.");
+runRenameTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ renameAcrossDBs: true,
+ expectedLog: expectedLogFor6and8
+});
+
+jsTestLog("[7] Testing rename with new same-name collection created, between getMores.");
+runRenameTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ expectedLog:
+ "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`"
+});
+
+jsTestLog("[8] Testing cross-DB rename with new same-name collection created, between getMores.");
+runRenameTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ renameAcrossDBs: true,
+ expectedLog: expectedLogFor6and8
+});
+
+replTest.stopSet();
+})();
\ No newline at end of file
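For context (not part of the patch): a cross-database rename goes through the admin command and copies the data into a new collection with a new UUID, so the cloner, which follows the source collection by UUID, observes it as a drop. That is why cases 6 and 8 reuse the "dropped on source" log line. A minimal sketch using the names from the test above:

// Example only -- cross-database rename, as issued by runRenameTest({renameAcrossDBs: true}).
assert.commandWorked(primary.adminCommand({
    renameCollection: "initial_sync_rename_collection.testcoll",
    to: "initial_sync_rename_collection_cross.testcoll_cross",
    dropTarget: false
}));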
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 41968073f89..4d73098c787 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -200,10 +200,12 @@ void CollectionCloner::runQuery() {
if (_resumeSupported) {
if (_resumeToken) {
// Resume the query from where we left off.
+ LOG(1) << "Collection cloner will resume the last successful query";
query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken"
<< true << "$_resumeAfter" << _resumeToken.get());
} else {
// New attempt at a resumable query.
+ LOG(1) << "Collection cloner will run a new query";
query = QUERY("query" << BSONObj() << "$readOnce" << true << "$_requestResumeToken"
<< true);
}
@@ -226,13 +228,35 @@ void CollectionCloner::runQuery() {
} catch (...) {
auto status = exceptionToStatus();
+ // If the collection was dropped at any point, we can just move on to the next cloner.
+ // This applies to both resumable (4.4) and non-resumable (4.2) queries.
+ if (status == ErrorCodes::NamespaceNotFound) {
+ throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit.
+ }
+
+ // Wire version 4.2 only.
if (!_resumeSupported) {
- std::string message = str::stream()
- << "Collection clone failed and is not resumable. nss: " << _sourceNss;
- log() << message;
- uasserted(ErrorCodes::InitialSyncFailure, message);
+            // If we lost our cursor last round, the only time we can continue is if we find out
+ // this round that the collection was dropped on the source (that scenario is covered
+ // right above). If that is not the case, then the cloner would have more work to do,
+ // but since we cannot resume the query, we must abort initial sync.
+ if (_lostNonResumableCursor) {
+ abortNonResumableClone(status);
+ }
+
+            // Collection has changed upstream. This will trigger the code block above next round
+            // (unless we get a NamespaceNotFound, indicating the collection was dropped).
+ if (_queryStage.isCursorError(status)) {
+ log() << "Lost cursor during non-resumable query: " << status;
+ _lostNonResumableCursor = true;
+ throw;
+ }
+ // Any other errors (including network errors, but excluding NamespaceNotFound) result
+ // in immediate failure.
+ abortNonResumableClone(status);
}
+ // Re-throw all query errors for resumable (4.4) queries.
throw;
}
}
@@ -249,6 +273,14 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
}
}
+ // If this is 'true', it means that something happened to our remote cursor for a reason other
+ // than the collection being dropped, all while we were running a non-resumable (4.2) clone.
+ // We must abort initial sync in that case.
+ if (_lostNonResumableCursor) {
+ // This will be caught in runQuery().
+ uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor");
+ }
+
if (_firstBatchOfQueryRound && _resumeSupported) {
// Store the cursorId of the remote cursor.
_remoteCursorId = iter.getCursorId();
@@ -354,7 +386,7 @@ void CollectionCloner::killOldQueryCursor() {
auto id = _remoteCursorId;
auto cmdObj = BSON("killCursors" << nss.coll() << "cursors" << BSON_ARRAY(id));
-
+ LOG(1) << "Attempting to kill old remote cursor with id: " << id;
try {
getClient()->runCommand(nss.db().toString(), cmdObj, infoObj);
} catch (...) {
@@ -365,6 +397,19 @@ void CollectionCloner::killOldQueryCursor() {
_remoteCursorId = -1;
}
+void CollectionCloner::forgetOldQueryCursor() {
+ _remoteCursorId = -1;
+}
+
+// Throws.
+void CollectionCloner::abortNonResumableClone(const Status& status) {
+ invariant(!_resumeSupported);
+ log() << "Error during non-resumable clone: " << status;
+ std::string message = str::stream()
+ << "Collection clone failed and is not resumable. nss: " << _sourceNss;
+ uasserted(ErrorCodes::InitialSyncFailure, message);
+}
+
CollectionCloner::Stats CollectionCloner::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
return _stats;
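A small usage note (not part of the patch): the new LOG(1) lines added above only appear at debug verbosity, so when reproducing these scenarios it helps to raise the replication log component on the syncing node. A minimal sketch, assuming a shell connection to that node:

// Example only -- surface the new LOG(1) cloner messages on the initial-syncing node.
db.setLogLevel(1, "replication");
// ... run the initial sync scenario ...
db.setLogLevel(0, "replication");  // restore the default verbosity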
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 764fcefb6dd..479015bef45 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -140,8 +140,22 @@ private:
: CollectionClonerStage(name, cloner, stageFunc) {}
bool isTransientError(const Status& status) override {
+ if (isCursorError(status)) {
+ // We have already lost this cursor so do not try to kill it.
+ getCloner()->forgetOldQueryCursor();
+ return true;
+ }
return ErrorCodes::isRetriableError(status);
}
+
+ static bool isCursorError(const Status& status) {
+ // Our cursor was killed due to changes on the remote collection.
+ if ((status == ErrorCodes::CursorNotFound) || (status == ErrorCodes::OperationFailed) ||
+ (status == ErrorCodes::QueryPlanKilled)) {
+ return true;
+ }
+ return false;
+ }
};
std::string describeForFuzzer(BaseClonerStage* stage) const final {
@@ -210,6 +224,18 @@ private:
*/
void killOldQueryCursor();
+ /**
+ * Clears the stored id of the remote cursor so that we do not attempt to kill it.
+ * We call this when we know it has already been killed by the sync source itself.
+ */
+ void forgetOldQueryCursor();
+
+ /**
+ * Used to terminate the clone when we encounter a fatal error during a non-resumable query.
+ * Throws.
+ */
+ void abortNonResumableClone(const Status& status);
+
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
//
@@ -253,6 +279,11 @@ private:
// If true, it means we are starting a new query or resuming an interrupted one.
bool _firstBatchOfQueryRound = true; // (X)
+
+ // Only set during non-resumable (4.2) queries.
+ // Signifies that there were changes to the collection on the sync source that resulted in
+ // our remote cursor getting killed.
+ bool _lostNonResumableCursor = false; // (X)
};
} // namespace repl
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index b6c4ce672d8..5f00b44bc30 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -1105,5 +1105,159 @@ TEST_F(CollectionClonerTest, ResumableQueryTwoResumes) {
ASSERT_EQUALS(7u, stats.documentsCopied);
}
+// We receive a QueryPlanKilled error, then a NamespaceNotFound error, indicating that the
+// collection no longer exists in the database.
+TEST_F(CollectionClonerTest, NonResumableCursorErrorDropOK) {
+ // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // This will cause the next batch to fail once (transiently), but we do not support resume.
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'QueryPlanKilled'}"));
+
+ // Let us begin with the query stage.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+    // Follow up the QueryPlanKilled error with a NamespaceNotFound.
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'NamespaceNotFound'}"));
+
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+// We receive an OperationFailed error, but the next error we receive is _not_ NamespaceNotFound,
+// which means the collection still exists in the database, but we cannot resume the query.
+TEST_F(CollectionClonerTest, NonResumableCursorErrorThenOtherError) {
+ // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ auto beforeRetryFailPoint = globalFailPointRegistry().find("hangBeforeRetryingClonerStage");
+ auto timesEnteredBeforeRetry = beforeRetryFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable");
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // This will cause the next batch to fail once (transiently), but we do not support resume.
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'OperationFailed'}"));
+
+ // Let us begin with the query stage.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ beforeRetryFailPoint->waitForTimesEntered(timesEnteredBeforeRetry + 1);
+
+    // Follow up the OperationFailed error with an UnknownError (not NamespaceNotFound).
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'UnknownError'}"));
+
+ beforeRetryFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+// We receive a CursorNotFound error and the retried query succeeds, indicating that the
+// collection still exists in the database; since we cannot resume the query, the clone must fail.
+TEST_F(CollectionClonerTest, NonResumableCursorErrorThenSuccessEqualsFailure) {
+ // Set client wireVersion to 4.2, where we do not yet support resumable cloning.
+ _mockClient->setWireVersions(WireVersion::SHARDED_TRANSACTIONS,
+ WireVersion::SHARDED_TRANSACTIONS);
+
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'CollectionCloner', stage: 'query'}"));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::InitialSyncFailure, status);
+ ASSERT_STRING_CONTAINS(status.reason(), "Collection clone failed and is not resumable");
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // This will cause the next batch to fail once (transiently), but we do not support resume.
+ auto failNextBatch = globalFailPointRegistry().find("mockCursorThrowErrorOnGetMore");
+ failNextBatch->setMode(FailPoint::nTimes, 1, fromjson("{errorType: 'CursorNotFound'}"));
+
+ // Let us begin with the query stage. We let the next retry succeed this time.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
} // namespace repl
} // namespace mongo