diff options
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_cloner.cpp | 101 |
1 files changed, 22 insertions, 79 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index 08701e697f6..f38cb4e64c6 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -103,9 +103,6 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss, invariant(collectionOptions.uuid); _sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid); _stats.ns = _sourceNss.ns(); - - // Find out whether the sync source supports resumable queries. - _resumeSupported = (getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); } BaseCloner::ClonerStages CollectionCloner::getStages() { @@ -320,72 +317,32 @@ void CollectionCloner::runQuery() { // Non-resumable query. Query query; - if (_resumeSupported) { - if (_resumeToken) { - // Resume the query from where we left off. - LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query"); - query.requestResumeToken(true).resumeAfter(_resumeToken.get()); - } else { - // New attempt at a resumable query. - LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query"); - query.requestResumeToken(true); - } - query.hint(BSON("$natural" << 1)); + if (_resumeToken) { + // Resume the query from where we left off. + LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query"); + query.requestResumeToken(true).resumeAfter(_resumeToken.get()); + } else { + // New attempt at a resumable query. + LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query"); + query.requestResumeToken(true); } + query.hint(BSON("$natural" << 1)); // We reset this every time we retry or resume a query. // We distinguish the first batch from the rest so that we only store the remote cursor id // the first time we get it. _firstBatchOfQueryRound = true; - try { - getClient()->query_DEPRECATED( - [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, - _sourceDbAndUuid, - BSONObj{}, - query, - nullptr /* fieldsToReturn */, - QueryOption_NoCursorTimeout | QueryOption_SecondaryOk | - (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), - _collectionClonerBatchSize, - ReadConcernArgs::kImplicitDefault); - } catch (...) { - auto status = exceptionToStatus(); - - // If the collection was dropped at any point, we can just move on to the next cloner. - // This applies to both resumable (4.4) and non-resumable (4.2) queries. - if (status == ErrorCodes::NamespaceNotFound) { - throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit. - } - - // Wire version 4.2 only. - if (!_resumeSupported) { - // If we lost our cursor last round, the only time we can can continue is if we find out - // this round that the collection was dropped on the source (that scenario is covered - // right above). If that is not the case, then the cloner would have more work to do, - // but since we cannot resume the query, we must abort initial sync. - if (_lostNonResumableCursor) { - abortNonResumableClone(status); - } - - // Collection has changed upstream. This will trigger the code block above next round, - // (unless we find out the collection was dropped via getting a NamespaceNotFound). - if (_queryStage.isCursorError(status)) { - LOGV2(21135, - "Lost cursor during non-resumable query: {error}", - "Lost cursor during non-resumable query", - "error"_attr = status); - _lostNonResumableCursor = true; - throw; - } - // Any other errors (including network errors, but excluding NamespaceNotFound) result - // in immediate failure. - abortNonResumableClone(status); - } - - // Re-throw all query errors for resumable (4.4) queries. - throw; - } + getClient()->query_DEPRECATED( + [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, + _sourceDbAndUuid, + BSONObj{}, + query, + nullptr /* fieldsToReturn */, + QueryOption_NoCursorTimeout | QueryOption_SecondaryOk | + (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), + _collectionClonerBatchSize, + ReadConcernArgs::kImplicitDefault); } void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { @@ -408,7 +365,7 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor"); } - if (_firstBatchOfQueryRound && _resumeSupported) { + if (_firstBatchOfQueryRound) { // Store the cursorId of the remote cursor. _remoteCursorId = iter.getCursorId(); } @@ -433,10 +390,8 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { uassertStatusOK(newStatus); } - if (_resumeSupported) { - // Store the resume token for this batch. - _resumeToken = iter.getPostBatchResumeToken(); - } + // Store the resume token for this batch. + _resumeToken = iter.getPostBatchResumeToken(); initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf( [&](const BSONObj&) { @@ -511,18 +466,6 @@ void CollectionCloner::waitForDatabaseWorkToComplete() { _dbWorkTaskRunner.join(); } -// Throws. -void CollectionCloner::abortNonResumableClone(const Status& status) { - invariant(!_resumeSupported); - LOGV2(21141, - "Error during non-resumable clone: {error}", - "Error during non-resumable clone", - "error"_attr = status); - std::string message = str::stream() - << "Collection clone failed and is not resumable. nss: " << _sourceNss; - uasserted(ErrorCodes::InitialSyncFailure, message); -} - CollectionCloner::Stats CollectionCloner::getStats() const { stdx::lock_guard<Latch> lk(_mutex); return _stats; |