author     Matthew Russotto <matthew.russotto@10gen.com>  2017-11-28 11:26:14 -0500
committer  Matthew Russotto <matthew.russotto@10gen.com>  2018-01-03 12:03:10 -0500
commit     be7e525b14dddf56fbec90190da60dc020abb0f9
tree       a4265016dd54a18217d8acc1c2435e23562a8edf
parent     d6b8d447e900164341611cd34ac2fb69a1e87634
SERVER-31267 CollectionCloner fails if collection is dropped between getMore calls
(cherry picked from commit 43ab81ebc79de03844e55ff92224bdfb69e050f1)
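For orientation, here is a minimal shell-style sketch of the failure scenario this patch addresses (illustrative only, not part of the patch; 'replTest', 'secondary', and 'primaryColl' mirror the fixtures set up in the new test below):

    // Pause the syncing node's CollectionCloner between getMore batches.
    replTest.restart(secondary, {
        startClean: true,
        setParameter: {
            collectionClonerBatchSize: 1,  // force multiple getMore round trips
            "failpoint.initialSyncHangCollectionClonerAfterHandlingBatchResponse":
                tojson({mode: "alwaysOn", data: {nss: primaryColl.getFullName()}})
        }
    });
    // Drop the collection on the primary while the cloner is paused.
    assert(primaryColl.drop());
    // Resume cloning; with this patch the cloner detects the drop (by UUID) and finishes
    // cleanly instead of failing the initial sync attempt.
    assert.commandWorked(secondary.adminCommand({
        configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
        mode: "off"
    }));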
 jstests/replsets/initial_sync_drop_collection.js | 158
 jstests/replsets/libs/two_phase_drops.js         |   8
 src/mongo/db/repl/collection_cloner.cpp          | 112
 src/mongo/db/repl/collection_cloner.h            |  12
 src/mongo/db/repl/database_cloner.cpp            |   5
 5 files changed, 287 insertions(+), 8 deletions(-)
diff --git a/jstests/replsets/initial_sync_drop_collection.js b/jstests/replsets/initial_sync_drop_collection.js
new file mode 100644
index 00000000000..b4954817afe
--- /dev/null
+++ b/jstests/replsets/initial_sync_drop_collection.js
@@ -0,0 +1,158 @@
+// Test that CollectionCloner completes without error when a collection is dropped during cloning.
+
+(function() {
+ "use strict";
+
+ load("jstests/libs/check_log.js");
+ load('jstests/replsets/libs/two_phase_drops.js');
+ load("jstests/libs/uuid_util.js");
+
+ // Set up replica set.
+ const testName = "initial_sync_drop_collection";
+ const dbName = testName;
+ var replTest = new ReplSetTest({name: testName, nodes: 2});
+ replTest.startSet();
+ replTest.initiate();
+
+ var primary = replTest.getPrimary();
+ var primaryDB = primary.getDB(dbName);
+ var secondary = replTest.getSecondary();
+ var secondaryDB = secondary.getDB(dbName);
+ const collName = "testcoll";
+ var primaryColl = primaryDB[collName];
+ var secondaryColl = secondaryDB[collName];
+ var pRenameColl = primaryDB["r_" + collName];
+ var nss = primaryColl.getFullName();
+
+ // setupTest() writes data to the collection, restarts the secondary node with the given
+ // startup parameters and the given failpoint enabled for the collection, and waits for the
+ // failpoint to be hit. finishTest() then drops the collection, disables the failpoint,
+ // optionally waits for the expectedLog message, waits for the secondary to complete
+ // initial sync, and checks the contents of the collection on the secondary.
+ function setupTest({failPoint, secondaryStartupParams}) {
+ jsTestLog("Writing data to collection.");
+ assert.writeOK(primaryColl.insert([{_id: 1}, {_id: 2}]));
+
+ jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss);
+ secondaryStartupParams = secondaryStartupParams || {};
+ secondaryStartupParams['failpoint.' + failPoint] =
+ tojson({mode: 'alwaysOn', data: {nss: nss}});
+ replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
+
+ jsTestLog("Waiting for secondary to reach failPoint " + failPoint);
+ checkLog.contains(secondary, failPoint + " fail point enabled for " + nss);
+ }
+
+ function finishTest({failPoint, secondaryStartupParams, expectedLog, waitForDrop, createNew}) {
+ // Get the uuid for use in checking the log line.
+ let uuid = getUUIDFromListCollections(primaryDB, collName);
+
+ jsTestLog("Dropping collection on primary: " + primaryColl.getFullName());
+ assert(primaryColl.drop());
+
+ if (waitForDrop) {
+ jsTestLog("Waiting for drop to commit on primary");
+ TwoPhaseDropCollectionTest.waitForDropToComplete(primaryDB, collName);
+ }
+
+ if (createNew) {
+ jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName());
+ assert.writeOK(primaryColl.insert({_id: "not the same collection"}));
+ }
+
+ jsTestLog("Allowing secondary to continue.");
+ assert.commandWorked(secondary.adminCommand({configureFailPoint: failPoint, mode: 'off'}));
+
+ if (expectedLog) {
+ jsTestLog(eval(expectedLog));
+ checkLog.contains(secondary, eval(expectedLog));
+ }
+
+ jsTestLog("Waiting for initial sync to complete.");
+ replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+ let res =
+ assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1, initialSync: 1}));
+ assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts);
+
+ if (createNew) {
+ assert.eq([{_id: "not the same collection"}], secondaryColl.find().toArray());
+ assert(primaryColl.drop());
+ } else {
+ assert.eq(0, secondaryColl.find().itcount());
+ }
+ replTest.checkReplicatedDataHashes();
+ }
+
+ function runDropTest(params) {
+ setupTest(params);
+ finishTest(params);
+ }
+
+ jsTestLog("Testing dropping between listIndexes and find.");
+ runDropTest({failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor"});
+
+ jsTestLog(
+ "Testing dropping between listIndexes and find, with new same-name collection created.");
+ runDropTest(
+ {failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor", createNew: true});
+
+ jsTestLog("Testing drop-pending between getMore calls.");
+ runDropTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ expectedLog:
+ "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`"
+ });
+
+ jsTestLog("Testing drop-pending with new same-name collection created, between getMore calls.");
+ runDropTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ expectedLog:
+ "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`",
+ createNew: true
+ });
+
+ jsTestLog("Testing committed drop between getMore calls.");
+
+ // Add another node to the set so that the two-phase drop can commit once it has been
+ // replicated. This additional secondary will already have finished initial sync when the
+ // drop happens.
+ var secondary2 = replTest.add({});
+ replTest.reInitiate();
+ replTest.waitForState(secondary2, ReplSetTest.State.SECONDARY);
+
+ runDropTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ waitForDrop: true,
+ expectedLog:
+ "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`"
+ });
+
+ jsTestLog("Testing rename between getMores.");
+ setupTest({
+ failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ secondaryStartupParams: {collectionClonerBatchSize: 1},
+ });
+ jsTestLog("Renaming collection on primary");
+ assert.commandWorked(primary.adminCommand({
+ renameCollection: primaryColl.getFullName(),
+ to: pRenameColl.getFullName(),
+ dropTarget: false
+ }));
+
+ jsTestLog("Allowing secondary to continue.");
+ // Make sure we don't reach the fassert() indicating initial sync failure.
+ assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: "initialSyncHangBeforeFinish", mode: 'alwaysOn'}));
+
+ assert.commandWorked(secondary.adminCommand({
+ configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+ mode: 'off'
+ }));
+ jsTestLog("Waiting for initial sync to complete.");
+ checkLog.contains(secondary,
+ "The maximum number of retries have been exhausted for initial sync.");
+ replTest.stopSet();
+})();
diff --git a/jstests/replsets/libs/two_phase_drops.js b/jstests/replsets/libs/two_phase_drops.js
index 4ee460cc626..e9ceec14564 100644
--- a/jstests/replsets/libs/two_phase_drops.js
+++ b/jstests/replsets/libs/two_phase_drops.js
@@ -142,6 +142,14 @@ class TwoPhaseDropCollectionTest {
}
/**
+ * Waits until 'collName' in database 'db' is no longer in the drop-pending state.
+ */
+ static waitForDropToComplete(db, collName) {
+ assert.soon(
+ () => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collName));
+ }
+
+ /**
* Puts a collection with name 'collName' into the drop pending state. Returns the name of the
* collection after it has been renamed to the 'system.drop' namespace.
*/
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 289941e94e8..f3458607a02 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -79,9 +79,13 @@ MONGO_FP_DECLARE(initialSyncHangBeforeCollectionClone);
MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
// Failpoint which causes initial sync to hang after handling the next batch of results from the
-// 'AsyncResultsMerger' for a specific collection.
+// 'AsyncResultsMerger', optionally limited to a specific collection.
MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
+// Failpoint which causes initial sync to hang before establishing the cursors (but after
+// listIndexes), optionally limited to a specific collection.
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerBeforeEstablishingCursor);
+
BSONObj makeCommandWithUUIDorCollectionName(StringData command,
OptionalCollectionUUID uuid,
const NamespaceString& nss) {
@@ -250,6 +254,10 @@ void CollectionCloner::_cancelRemainingWork_inlock() {
if (_establishCollectionCursorsScheduler) {
_establishCollectionCursorsScheduler->shutdown();
}
+ if (_verifyCollectionDroppedScheduler) {
+ _verifyCollectionDroppedScheduler->shutdown();
+ _verifyCollectionDroppedScheduler.reset();
+ }
_dbWorkTaskRunner.cancel();
}
@@ -441,6 +449,20 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca
_finishCallback(cbd.status);
return;
}
+ MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerBeforeEstablishingCursor, nssData) {
+ const BSONObj& data = nssData.getData();
+ auto nss = data["nss"].str();
+ // Only hang when cloning the specified collection, or if no collection was specified.
+ if (nss.empty() || _destNss.toString() == nss) {
+ while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerBeforeEstablishingCursor) &&
+ !_isShuttingDown()) {
+ log() << "initialSyncHangCollectionClonerBeforeEstablishingCursor fail point "
+ "enabled for "
+ << _destNss.toString() << ". Blocking until fail point is disabled.";
+ mongo::sleepsecs(1);
+ }
+ }
+ }
if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
warning()
<< "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
@@ -627,6 +649,8 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
_clusterClientCursorParams =
stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
_clusterClientCursorParams->remotes = std::move(remoteCursors);
+ if (_collectionCloningBatchSize > 0)
+ _clusterClientCursorParams->batchSize = _collectionCloningBatchSize;
Client::initThreadIfNotAlready();
_arm = stdx::make_unique<AsyncResultsMerger>(
cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
@@ -734,7 +758,17 @@ void CollectionCloner::_handleARMResultsCallback(
UniqueLock lk(_mutex);
auto nextBatchStatus = _bufferNextBatchFromArm(lk);
if (!nextBatchStatus.isOK()) {
- onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus);
+ if (_options.uuid && (nextBatchStatus.code() == ErrorCodes::OperationFailed ||
+ nextBatchStatus.code() == ErrorCodes::CursorNotFound)) {
+ // With these errors, it's possible the collection was dropped while we were
+ // cloning. If so, we'll execute the drop during oplog application, so it's OK to
+ // just stop cloning. This is only safe if cloning by UUID; if we are cloning by
+ // name, we have no way to detect if the collection was dropped and another
+ // collection with the same name created in the interim.
+ _verifyCollectionWasDropped(lk, nextBatchStatus, onCompletionGuard, cbd.opCtx);
+ } else {
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus);
+ }
return;
}
@@ -762,13 +796,13 @@ void CollectionCloner::_handleARMResultsCallback(
MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
const BSONObj& data = nssData.getData();
auto nss = data["nss"].str();
- // Only hang when cloning the specified collection.
- if (_destNss.toString() == nss) {
+ // Only hang when cloning the specified collection, or if no collection was specified.
+ if (nss.empty() || _destNss.toString() == nss) {
while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
!_isShuttingDown()) {
log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
"enabled for "
- << nss << ". Blocking until fail point is disabled.";
+ << _destNss.toString() << ". Blocking until fail point is disabled.";
mongo::sleepsecs(1);
}
}
@@ -785,6 +819,74 @@ void CollectionCloner::_handleARMResultsCallback(
}
}
+void CollectionCloner::_verifyCollectionWasDropped(
+ const stdx::unique_lock<stdx::mutex>& lk,
+ Status batchStatus,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+ OperationContext* opCtx) {
+ // If we already have a _verifyCollectionDroppedScheduler, just return; the existing
+ // scheduler will take care of cleaning up.
+ if (_verifyCollectionDroppedScheduler) {
+ return;
+ }
+ BSONObjBuilder cmdObj;
+ _options.uuid->appendToBuilder(&cmdObj, "find");
+ cmdObj.append("batchSize", 0);
+ _verifyCollectionDroppedScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
+ _executor,
+ RemoteCommandRequest(_source,
+ _sourceNss.db().toString(),
+ cmdObj.obj(),
+ ReadPreferenceSetting::secondaryPreferredMetadata(),
+ opCtx,
+ RemoteCommandRequest::kNoTimeout),
+ [this, opCtx, batchStatus, onCompletionGuard](const RemoteCommandCallbackArgs& args) {
+ // If the attempt to determine if the collection was dropped fails for any reason other
+ // than NamespaceNotFound, return the original error code.
+ //
+ // Otherwise, if the collection was dropped, either the error will be NamespaceNotFound,
+ // or it will be a drop-pending collection and the find will succeed and give us a
+ // collection with a drop-pending name.
+ UniqueLock lk(_mutex);
+ Status finalStatus(batchStatus);
+ if (args.response.isOK()) {
+ auto response = CursorResponse::parseFromBSON(args.response.data);
+ if (response.getStatus().code() == ErrorCodes::NamespaceNotFound ||
+ (response.isOK() && response.getValue().getNSS().isDropPendingNamespace())) {
+ log() << "CollectionCloner ns: '" << _sourceNss.ns() << "' uuid: UUID(\""
+ << *_options.uuid << "\") stopped because collection was dropped.";
+ finalStatus = Status::OK();
+ } else if (!response.isOK()) {
+ log() << "CollectionCloner received an unexpected error when verifying drop of "
+ "ns: '"
+ << _sourceNss.ns() << "' uuid: UUID(\"" << *_options.uuid
+ << "\"), status " << response.getStatus();
+ }
+ } else {
+ log() << "CollectionCloner is unable to verify drop of ns: '" << _sourceNss.ns()
+ << "' uuid: UUID(\"" << *_options.uuid << "\"), status "
+ << args.response.status;
+ }
+ // Because setResultAndCancelRemainingWork destroys the RemoteCommandRetryScheduler, it
+ // must be done outside this callback.
+ auto result = _executor->scheduleWork([this, finalStatus, onCompletionGuard](
+ const executor::TaskExecutor::CallbackArgs& args) {
+ UniqueLock lk(_mutex);
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, finalStatus);
+ });
+ invariant(result.isOK() || result.getStatus().code() == ErrorCodes::ShutdownInProgress);
+ },
+ RemoteCommandRetryScheduler::makeNoRetryPolicy());
+
+ auto status = _verifyCollectionDroppedScheduler->startup();
+ if (!status.isOK()) {
+ log() << "CollectionCloner is unable to start verification of ns: '" << _sourceNss.ns()
+ << "' uuid: UUID(\"" << *_options.uuid << "\"), status " << status;
+ // If we can't run the command, assume this wasn't a drop and just use the original error.
+ onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, batchStatus);
+ }
+}
+
void CollectionCloner::_insertDocumentsCallback(
const executor::TaskExecutor::CallbackArgs& cbd,
bool lastBatch,
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 1f8701c3ca5..46dd6d7903f 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -251,6 +251,15 @@ private:
std::shared_ptr<OnCompletionGuard> onCompletionGuard);
/**
+ * Verifies that an error from the ARM was the result of a collection drop. If
+ * so, cloning is stopped with no error. Otherwise it is stopped with the given error.
+ */
+ void _verifyCollectionWasDropped(const stdx::unique_lock<stdx::mutex>& lk,
+ Status batchStatus,
+ std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+ OperationContext* opCtx);
+
+ /**
* Reports completion status.
* Commits/aborts collection building.
* Sets cloner to inactive.
@@ -304,6 +313,9 @@ private:
// (M) Scheduler used to establish the initial cursor or set of cursors.
std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;
+ // (M) Scheduler used to determine if a cursor was closed because the collection was dropped.
+ std::unique_ptr<RemoteCommandRetryScheduler> _verifyCollectionDroppedScheduler;
+
// State transitions:
// PreStart --> Running --> ShuttingDown --> Complete
// It is possible to skip intermediate states. For example,
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 2873a205f21..c8a7465008f 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -63,11 +63,10 @@ const char* kNameFieldName = "name";
const char* kOptionsFieldName = "options";
const char* kInfoFieldName = "info";
const char* kUUIDFieldName = "uuid";
-// 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
-const auto defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
// The batchSize to use for the find/getMore queries called by the CollectionCloner
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, defaultBatchSize);
+constexpr int kUseARMDefaultBatchSize = -1;
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, kUseARMDefaultBatchSize);
// The number of attempts for the listCollections commands.
MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3);
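With this change the CollectionCloner defers batch sizing to the AsyncResultsMerger default unless collectionClonerBatchSize is set explicitly at startup (kUseARMDefaultBatchSize = -1). A minimal sketch of forcing tiny batches from a jstest, as initial_sync_drop_collection.js does through secondaryStartupParams (the MongoRunner usage here is illustrative, not part of this patch):

    // Start a node whose cloner fetches one document per batch, so initial sync issues
    // several getMore calls per collection.
    const conn = MongoRunner.runMongod({setParameter: {collectionClonerBatchSize: 1}});
    // ... exercise initial sync against this node ...
    MongoRunner.stopMongod(conn);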