author | Matthew Russotto <matthew.russotto@10gen.com> | 2017-11-28 11:26:14 -0500
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2018-01-03 12:03:10 -0500
commit | be7e525b14dddf56fbec90190da60dc020abb0f9 (patch)
tree | a4265016dd54a18217d8acc1c2435e23562a8edf
parent | d6b8d447e900164341611cd34ac2fb69a1e87634 (diff)
download | mongo-be7e525b14dddf56fbec90190da60dc020abb0f9.tar.gz
SERVER-31267 CollectionCloner fails if collection is dropped between getMore calls
(cherry picked from commit 43ab81ebc79de03844e55ff92224bdfb69e050f1)
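The race this commit closes: during initial sync, the CollectionCloner streams collection data with find/getMore through the AsyncResultsMerger (ARM). If the collection is dropped on the sync source between batches, the getMore fails with OperationFailed or CursorNotFound and, before this patch, the whole initial sync attempt failed. The fix re-checks the collection by UUID when one of those errors arrives: if the collection is gone or drop-pending, cloning stops with no error, since the drop will be re-applied during oplog application. A minimal shell sketch of that check, assuming a 'test.testcoll' collection and a server version that accepts find-by-UUID the way the patch's C++ verification command does (the names and the shell invocation are illustrative, not part of the patch):

    // Record the collection's UUID up front, as the cloner does when cloning starts.
    const testDB = db.getSiblingDB("test");
    const uuid = testDB.getCollectionInfos({name: "testcoll"})[0].info.uuid;

    // ... suppose a getMore now fails with CursorNotFound; check whether a drop caused it ...
    const res = testDB.runCommand({find: uuid, batchSize: 0});

    // NamespaceNotFound (code 26) means the drop already committed; a cursor whose ns is a
    // drop-pending 'system.drop.*' name means the drop is still pending. Either way the
    // cloner can stop cloning cleanly instead of failing the initial sync attempt.
    printjson(res);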
-rw-r--r-- | jstests/replsets/initial_sync_drop_collection.js | 158
-rw-r--r-- | jstests/replsets/libs/two_phase_drops.js | 8
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 112
-rw-r--r-- | src/mongo/db/repl/collection_cloner.h | 12
-rw-r--r-- | src/mongo/db/repl/database_cloner.cpp | 5
5 files changed, 287 insertions, 8 deletions
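The new test leans on two-phase drops: on a replica-set primary a dropped collection is first renamed into the 'system.drop.' namespace (drop-pending) and only reaped once the drop commits on a majority, which is why the committed-drop case below adds a third node first. The waitForDropToComplete() helper added to two_phase_drops.js waits for that commit; an equivalent standalone sketch of the same idea, with the database and collection names hypothetical:

    // A collection is still "drop pending" while a matching
    // 'system.drop.<optime>.<collName>' entry shows up in listCollections.
    function isPendingDrop(db, collName) {
        const re = new RegExp("^system\\.drop\\..*\\." + collName + "$");
        return db.getCollectionInfos().some(info => re.test(info.name));
    }

    // Wait until the drop of 'testcoll' has committed and the rename is reaped.
    assert.soon(() => !isPendingDrop(db.getSiblingDB("test"), "testcoll"));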
diff --git a/jstests/replsets/initial_sync_drop_collection.js b/jstests/replsets/initial_sync_drop_collection.js
new file mode 100644
index 00000000000..b4954817afe
--- /dev/null
+++ b/jstests/replsets/initial_sync_drop_collection.js
@@ -0,0 +1,158 @@
+// Test that CollectionCloner completes without error when a collection is dropped during cloning.
+
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+    load('jstests/replsets/libs/two_phase_drops.js');
+    load("jstests/libs/uuid_util.js");
+
+    // Set up replica set.
+    const testName = "initial_sync_drop_collection";
+    const dbName = testName;
+    var replTest = new ReplSetTest({name: testName, nodes: 2});
+    replTest.startSet();
+    replTest.initiate();
+
+    var primary = replTest.getPrimary();
+    var primaryDB = primary.getDB(dbName);
+    var secondary = replTest.getSecondary();
+    var secondaryDB = secondary.getDB(dbName);
+    const collName = "testcoll";
+    var primaryColl = primaryDB[collName];
+    var secondaryColl = secondaryDB[collName];
+    var pRenameColl = primaryDB["r_" + collName];
+    var nss = primaryColl.getFullName();
+
+    // This function adds data to the collection, restarts the secondary node with the given
+    // parameters and setting the given failpoint, waits for the failpoint to be hit,
+    // drops the collection, then disables the failpoint. It then optionally waits for the
+    // expectedLog message and waits for the secondary to complete initial sync, then ensures
+    // the collection on the secondary is empty.
+    function setupTest({failPoint, secondaryStartupParams}) {
+        jsTestLog("Writing data to collection.");
+        assert.writeOK(primaryColl.insert([{_id: 1}, {_id: 2}]));
+
+        jsTestLog("Restarting secondary with failPoint " + failPoint + " set for " + nss);
+        secondaryStartupParams = secondaryStartupParams || {};
+        secondaryStartupParams['failpoint.' + failPoint] =
+            tojson({mode: 'alwaysOn', data: {nss: nss}});
+        replTest.restart(secondary, {startClean: true, setParameter: secondaryStartupParams});
+
+        jsTestLog("Waiting for secondary to reach failPoint " + failPoint);
+        checkLog.contains(secondary, failPoint + " fail point enabled for " + nss);
+    }
+
+    function finishTest({failPoint, secondaryStartupParams, expectedLog, waitForDrop, createNew}) {
+        // Get the uuid for use in checking the log line.
+        let uuid = getUUIDFromListCollections(primaryDB, collName);
+
+        jsTestLog("Dropping collection on primary: " + primaryColl.getFullName());
+        assert(primaryColl.drop());
+
+        if (waitForDrop) {
+            jsTestLog("Waiting for drop to commit on primary");
+            TwoPhaseDropCollectionTest.waitForDropToComplete(primaryDB, collName);
+        }
+
+        if (createNew) {
+            jsTestLog("Creating a new collection with the same name: " + primaryColl.getFullName());
+            assert.writeOK(primaryColl.insert({_id: "not the same collection"}));
+        }
+
+        jsTestLog("Allowing secondary to continue.");
+        assert.commandWorked(secondary.adminCommand({configureFailPoint: failPoint, mode: 'off'}));
+
+        if (expectedLog) {
+            jsTestLog(eval(expectedLog));
+            checkLog.contains(secondary, eval(expectedLog));
+        }
+
+        jsTestLog("Waiting for initial sync to complete.");
+        replTest.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+        let res =
+            assert.commandWorked(secondary.adminCommand({replSetGetStatus: 1, initialSync: 1}));
+        assert.eq(0, res.initialSyncStatus.failedInitialSyncAttempts);
+
+        if (createNew) {
+            assert.eq([{_id: "not the same collection"}], secondaryColl.find().toArray());
+            assert(primaryColl.drop());
+        } else {
+            assert.eq(0, secondaryColl.find().itcount());
+        }
+        replTest.checkReplicatedDataHashes();
+    }
+
+    function runDropTest(params) {
+        setupTest(params);
+        finishTest(params);
+    }
+
+    jsTestLog("Testing dropping between listIndexes and find.");
+    runDropTest({failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor"});
+
+    jsTestLog(
+        "Testing dropping between listIndexes and find, with new same-name collection created.");
+    runDropTest(
+        {failPoint: "initialSyncHangCollectionClonerBeforeEstablishingCursor", createNew: true});
+
+    jsTestLog("Testing drop-pending between getMore calls.");
+    runDropTest({
+        failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+        secondaryStartupParams: {collectionClonerBatchSize: 1},
+        expectedLog:
+            "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`"
+    });
+
+    jsTestLog("Testing drop-pending with new same-name collection created, between getMore calls.");
+    runDropTest({
+        failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+        secondaryStartupParams: {collectionClonerBatchSize: 1},
+        expectedLog:
+            "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`",
+        createNew: true
+    });
+
+    jsTestLog("Testing committed drop between getMore calls.");
+
+    // Add another node to the set, so when we drop the collection it can commit. This other
+    // secondary will be finished with initial sync when the drop happens.
+    var secondary2 = replTest.add({});
+    replTest.reInitiate();
+    replTest.waitForState(secondary2, ReplSetTest.State.SECONDARY);
+
+    runDropTest({
+        failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+        secondaryStartupParams: {collectionClonerBatchSize: 1},
+        waitForDrop: true,
+        expectedLog:
+            "`CollectionCloner ns: '${nss}' uuid: ${uuid} stopped because collection was dropped.`"
+    });
+
+    jsTestLog("Testing rename between getMores.");
+    setupTest({
+        failPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+        secondaryStartupParams: {collectionClonerBatchSize: 1},
+    });
+    jsTestLog("Renaming collection on primary");
+    assert.commandWorked(primary.adminCommand({
+        renameCollection: primaryColl.getFullName(),
+        to: pRenameColl.getFullName(),
+        dropTarget: false
+    }));
+
+    jsTestLog("Allowing secondary to continue.");
+    // Make sure we don't reach the fassert() indicating initial sync failure.
+    assert.commandWorked(secondary.adminCommand(
+        {configureFailPoint: "initialSyncHangBeforeFinish", mode: 'alwaysOn'}));
+
+    assert.commandWorked(secondary.adminCommand({
+        configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
+        mode: 'off'
+    }));
+    jsTestLog("Waiting for initial sync to complete.");
+    checkLog.contains(secondary,
+                      "The maximum number of retries have been exhausted for initial sync.");
+    replTest.stopSet();
+})();
diff --git a/jstests/replsets/libs/two_phase_drops.js b/jstests/replsets/libs/two_phase_drops.js
index 4ee460cc626..e9ceec14564 100644
--- a/jstests/replsets/libs/two_phase_drops.js
+++ b/jstests/replsets/libs/two_phase_drops.js
@@ -142,6 +142,14 @@ class TwoPhaseDropCollectionTest {
     }
 
     /**
+     * Waits until 'collName' in database 'db' is not in drop pending state.
+     */
+    static waitForDropToComplete(db, collName) {
+        assert.soon(
+            () => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collName));
+    }
+
+    /**
      * Puts a collection with name 'collName' into the drop pending state. Returns the name of the
      * collection after it has been renamed to the 'system.drop' namespace.
      */
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 289941e94e8..f3458607a02 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -79,9 +79,13 @@ MONGO_FP_DECLARE(initialSyncHangBeforeCollectionClone);
 MONGO_FP_DECLARE(initialSyncHangDuringCollectionClone);
 
 // Failpoint which causes initial sync to hang after handling the next batch of results from the
-// 'AsyncResultsMerger' for a specific collection.
+// 'AsyncResultsMerger', optionally limited to a specific collection.
 MONGO_FP_DECLARE(initialSyncHangCollectionClonerAfterHandlingBatchResponse);
 
+// Failpoint which causes initial sync to hang before establishing the cursors (but after
+// listIndexes), optionally limited to a specific collection.
+MONGO_FP_DECLARE(initialSyncHangCollectionClonerBeforeEstablishingCursor);
+
 BSONObj makeCommandWithUUIDorCollectionName(StringData command,
                                             OptionalCollectionUUID uuid,
                                             const NamespaceString& nss) {
@@ -250,6 +254,10 @@ void CollectionCloner::_cancelRemainingWork_inlock() {
     if (_establishCollectionCursorsScheduler) {
         _establishCollectionCursorsScheduler->shutdown();
     }
+    if (_verifyCollectionDroppedScheduler) {
+        _verifyCollectionDroppedScheduler->shutdown();
+        _verifyCollectionDroppedScheduler.reset();
+    }
 
     _dbWorkTaskRunner.cancel();
 }
@@ -441,6 +449,20 @@ void CollectionCloner::_beginCollectionCallback(const executor::TaskExecutor::Ca
         _finishCallback(cbd.status);
         return;
     }
+    MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerBeforeEstablishingCursor, nssData) {
+        const BSONObj& data = nssData.getData();
+        auto nss = data["nss"].str();
+        // Only hang when cloning the specified collection, or if no collection was specified.
+        if (nss.empty() || _destNss.toString() == nss) {
+            while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerBeforeEstablishingCursor) &&
+                   !_isShuttingDown()) {
+                log() << "initialSyncHangCollectionClonerBeforeEstablishingCursor fail point "
+                         "enabled for "
+                      << _destNss.toString() << ". Blocking until fail point is disabled.";
+                mongo::sleepsecs(1);
+            }
+        }
+    }
     if (!_idIndexSpec.isEmpty() && _options.autoIndexId == CollectionOptions::NO) {
         warning()
             << "Found the _id_ index spec but the collection specified autoIndexId of false on ns:"
@@ -627,6 +649,8 @@ void CollectionCloner::_establishCollectionCursorsCallback(const RemoteCommandCa
     _clusterClientCursorParams =
         stdx::make_unique<ClusterClientCursorParams>(_sourceNss, UserNameIterator());
     _clusterClientCursorParams->remotes = std::move(remoteCursors);
+    if (_collectionCloningBatchSize > 0)
+        _clusterClientCursorParams->batchSize = _collectionCloningBatchSize;
     Client::initThreadIfNotAlready();
     _arm = stdx::make_unique<AsyncResultsMerger>(
         cc().getOperationContext(), _executor, _clusterClientCursorParams.get());
@@ -734,7 +758,17 @@ void CollectionCloner::_handleARMResultsCallback(
     UniqueLock lk(_mutex);
     auto nextBatchStatus = _bufferNextBatchFromArm(lk);
     if (!nextBatchStatus.isOK()) {
-        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus);
+        if (_options.uuid && (nextBatchStatus.code() == ErrorCodes::OperationFailed ||
+                              nextBatchStatus.code() == ErrorCodes::CursorNotFound)) {
+            // With these errors, it's possible the collection was dropped while we were
+            // cloning. If so, we'll execute the drop during oplog application, so it's OK to
+            // just stop cloning. This is only safe if cloning by UUID; if we are cloning by
+            // name, we have no way to detect if the collection was dropped and another
+            // collection with the same name created in the interim.
+            _verifyCollectionWasDropped(lk, nextBatchStatus, onCompletionGuard, cbd.opCtx);
+        } else {
+            onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, nextBatchStatus);
+        }
         return;
     }
 
@@ -762,13 +796,13 @@ void CollectionCloner::_handleARMResultsCallback(
     MONGO_FAIL_POINT_BLOCK(initialSyncHangCollectionClonerAfterHandlingBatchResponse, nssData) {
         const BSONObj& data = nssData.getData();
         auto nss = data["nss"].str();
-        // Only hang when cloning the specified collection.
-        if (_destNss.toString() == nss) {
+        // Only hang when cloning the specified collection, or if no collection was specified.
+        if (nss.empty() || _destNss.toString() == nss) {
             while (MONGO_FAIL_POINT(initialSyncHangCollectionClonerAfterHandlingBatchResponse) &&
                    !_isShuttingDown()) {
                 log() << "initialSyncHangCollectionClonerAfterHandlingBatchResponse fail point "
                          "enabled for "
-                      << nss << ". Blocking until fail point is disabled.";
+                      << _destNss.toString() << ". Blocking until fail point is disabled.";
                 mongo::sleepsecs(1);
             }
         }
@@ -785,6 +819,74 @@ void CollectionCloner::_handleARMResultsCallback(
     }
 }
 
+void CollectionCloner::_verifyCollectionWasDropped(
+    const stdx::unique_lock<stdx::mutex>& lk,
+    Status batchStatus,
+    std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+    OperationContext* opCtx) {
+    // If we already have a _verifyCollectionDroppedScheduler, just return; the existing
+    // scheduler will take care of cleaning up.
+    if (_verifyCollectionDroppedScheduler) {
+        return;
+    }
+    BSONObjBuilder cmdObj;
+    _options.uuid->appendToBuilder(&cmdObj, "find");
+    cmdObj.append("batchSize", 0);
+    _verifyCollectionDroppedScheduler = stdx::make_unique<RemoteCommandRetryScheduler>(
+        _executor,
+        RemoteCommandRequest(_source,
+                             _sourceNss.db().toString(),
+                             cmdObj.obj(),
+                             ReadPreferenceSetting::secondaryPreferredMetadata(),
+                             opCtx,
+                             RemoteCommandRequest::kNoTimeout),
+        [this, opCtx, batchStatus, onCompletionGuard](const RemoteCommandCallbackArgs& args) {
+            // If the attempt to determine if the collection was dropped fails for any reason other
+            // than NamespaceNotFound, return the original error code.
+            //
+            // Otherwise, if the collection was dropped, either the error will be NamespaceNotFound,
+            // or it will be a drop-pending collection and the find will succeed and give us a
+            // collection with a drop-pending name.
+            UniqueLock lk(_mutex);
+            Status finalStatus(batchStatus);
+            if (args.response.isOK()) {
+                auto response = CursorResponse::parseFromBSON(args.response.data);
+                if (response.getStatus().code() == ErrorCodes::NamespaceNotFound ||
+                    (response.isOK() && response.getValue().getNSS().isDropPendingNamespace())) {
+                    log() << "CollectionCloner ns: '" << _sourceNss.ns() << "' uuid: UUID(\""
+                          << *_options.uuid << "\") stopped because collection was dropped.";
+                    finalStatus = Status::OK();
+                } else if (!response.isOK()) {
+                    log() << "CollectionCloner received an unexpected error when verifying drop of "
+                             "ns: '"
+                          << _sourceNss.ns() << "' uuid: UUID(\"" << *_options.uuid
+                          << "\"), status " << response.getStatus();
+                }
+            } else {
+                log() << "CollectionCloner is unable to verify drop of ns: '" << _sourceNss.ns()
+                      << "' uuid: UUID(\"" << *_options.uuid << "\"), status "
+                      << args.response.status;
+            }
+            // Because setResultAndCancelRemainingWork destroys the RemoteCommandRetryScheduler, it
+            // must be done outside this callback.
+            auto result = _executor->scheduleWork([this, finalStatus, onCompletionGuard](
+                const executor::TaskExecutor::CallbackArgs& args) {
+                UniqueLock lk(_mutex);
+                onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, finalStatus);
+            });
+            invariant(result.isOK() || result.getStatus().code() == ErrorCodes::ShutdownInProgress);
+        },
+        RemoteCommandRetryScheduler::makeNoRetryPolicy());
+
+    auto status = _verifyCollectionDroppedScheduler->startup();
+    if (!status.isOK()) {
+        log() << "CollectionCloner is unable to start verification of ns: '" << _sourceNss.ns()
+              << "' uuid: UUID(\"" << *_options.uuid << "\"), status " << status;
+        // If we can't run the command, assume this wasn't a drop and just use the original error.
+        onCompletionGuard->setResultAndCancelRemainingWork_inlock(lk, batchStatus);
+    }
+}
+
 void CollectionCloner::_insertDocumentsCallback(
     const executor::TaskExecutor::CallbackArgs& cbd,
     bool lastBatch,
diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h
index 1f8701c3ca5..46dd6d7903f 100644
--- a/src/mongo/db/repl/collection_cloner.h
+++ b/src/mongo/db/repl/collection_cloner.h
@@ -251,6 +251,15 @@ private:
                                     std::shared_ptr<OnCompletionGuard> onCompletionGuard);
 
     /**
+     * Verifies that an error from the ARM was the result of a collection drop. If
+     * so, cloning is stopped with no error. Otherwise it is stopped with the given error.
+     */
+    void _verifyCollectionWasDropped(const stdx::unique_lock<stdx::mutex>& lk,
+                                     Status batchStatus,
+                                     std::shared_ptr<OnCompletionGuard> onCompletionGuard,
+                                     OperationContext* opCtx);
+
+    /**
      * Reports completion status.
      * Commits/aborts collection building.
     * Sets cloner to inactive.
@@ -304,6 +313,9 @@ private:
     // (M) Scheduler used to establish the initial cursor or set of cursors.
     std::unique_ptr<RemoteCommandRetryScheduler> _establishCollectionCursorsScheduler;
 
+    // (M) Scheduler used to determine if a cursor was closed because the collection was dropped.
+    std::unique_ptr<RemoteCommandRetryScheduler> _verifyCollectionDroppedScheduler;
+
     // State transitions:
     // PreStart --> Running --> ShuttingDown --> Complete
     // It is possible to skip intermediate states. For example,
diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp
index 2873a205f21..c8a7465008f 100644
--- a/src/mongo/db/repl/database_cloner.cpp
+++ b/src/mongo/db/repl/database_cloner.cpp
@@ -63,11 +63,10 @@ const char* kNameFieldName = "name";
 const char* kOptionsFieldName = "options";
 const char* kInfoFieldName = "info";
 const char* kUUIDFieldName = "uuid";
-// 16MB max batch size / 12 byte min doc size * 10 (for good measure) = defaultBatchSize to use.
-const auto defaultBatchSize = (16 * 1024 * 1024) / 12 * 10;
 
 // The batchSize to use for the find/getMore queries called by the CollectionCloner
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, defaultBatchSize);
+constexpr int kUseARMDefaultBatchSize = -1;
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(collectionClonerBatchSize, int, kUseARMDefaultBatchSize);
 
 // The number of attempts for the listCollections commands.
 MONGO_EXPORT_SERVER_PARAMETER(numInitialSyncListCollectionsAttempts, int, 3);
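For reference, the test drives the namespace-scoped failpoints above through 'failpoint.<name>' startup parameters so the hang is armed before cloning begins; once a node is up, the same failpoints can also be toggled with the configureFailPoint command (test commands must be enabled). A short sketch against a hypothetical 'secondary' connection, reusing the namespace from the new test:

    // Hang the cloner after the next batch, but only for this one namespace.
    assert.commandWorked(secondary.adminCommand({
        configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
        mode: "alwaysOn",
        data: {nss: "initial_sync_drop_collection.testcoll"}
    }));

    // ... drop or rename the collection on the primary while the cloner is paused ...

    // Release the cloner so initial sync can proceed.
    assert.commandWorked(secondary.adminCommand({
        configureFailPoint: "initialSyncHangCollectionClonerAfterHandlingBatchResponse",
        mode: "off"
    }));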