summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/collection_cloner.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/collection_cloner.cpp')
-rw-r--r-- src/mongo/db/repl/collection_cloner.cpp | 101
1 files changed, 22 insertions, 79 deletions
diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp
index 08701e697f6..f38cb4e64c6 100644
--- a/src/mongo/db/repl/collection_cloner.cpp
+++ b/src/mongo/db/repl/collection_cloner.cpp
@@ -103,9 +103,6 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss,
invariant(collectionOptions.uuid);
_sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid);
_stats.ns = _sourceNss.ns();
-
- // Find out whether the sync source supports resumable queries.
- _resumeSupported = (getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC);
}
BaseCloner::ClonerStages CollectionCloner::getStages() {
@@ -320,72 +317,32 @@ void CollectionCloner::runQuery() {
// Non-resumable query.
Query query;
- if (_resumeSupported) {
- if (_resumeToken) {
- // Resume the query from where we left off.
- LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query");
- query.requestResumeToken(true).resumeAfter(_resumeToken.get());
- } else {
- // New attempt at a resumable query.
- LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query");
- query.requestResumeToken(true);
- }
- query.hint(BSON("$natural" << 1));
+ if (_resumeToken) {
+ // Resume the query from where we left off.
+ LOGV2_DEBUG(21133, 1, "Collection cloner will resume the last successful query");
+ query.requestResumeToken(true).resumeAfter(_resumeToken.get());
+ } else {
+ // New attempt at a resumable query.
+ LOGV2_DEBUG(21134, 1, "Collection cloner will run a new query");
+ query.requestResumeToken(true);
}
+ query.hint(BSON("$natural" << 1));
// We reset this every time we retry or resume a query.
// We distinguish the first batch from the rest so that we only store the remote cursor id
// the first time we get it.
_firstBatchOfQueryRound = true;
- try {
- getClient()->query_DEPRECATED(
- [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
- _sourceDbAndUuid,
- BSONObj{},
- query,
- nullptr /* fieldsToReturn */,
- QueryOption_NoCursorTimeout | QueryOption_SecondaryOk |
- (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
- _collectionClonerBatchSize,
- ReadConcernArgs::kImplicitDefault);
- } catch (...) {
- auto status = exceptionToStatus();
-
- // If the collection was dropped at any point, we can just move on to the next cloner.
- // This applies to both resumable (4.4) and non-resumable (4.2) queries.
- if (status == ErrorCodes::NamespaceNotFound) {
- throw; // This will re-throw the NamespaceNotFound, resulting in a clean exit.
- }
-
- // Wire version 4.2 only.
- if (!_resumeSupported) {
- // If we lost our cursor last round, the only time we can can continue is if we find out
- // this round that the collection was dropped on the source (that scenario is covered
- // right above). If that is not the case, then the cloner would have more work to do,
- // but since we cannot resume the query, we must abort initial sync.
- if (_lostNonResumableCursor) {
- abortNonResumableClone(status);
- }
-
- // Collection has changed upstream. This will trigger the code block above next round,
- // (unless we find out the collection was dropped via getting a NamespaceNotFound).
- if (_queryStage.isCursorError(status)) {
- LOGV2(21135,
- "Lost cursor during non-resumable query: {error}",
- "Lost cursor during non-resumable query",
- "error"_attr = status);
- _lostNonResumableCursor = true;
- throw;
- }
- // Any other errors (including network errors, but excluding NamespaceNotFound) result
- // in immediate failure.
- abortNonResumableClone(status);
- }
-
- // Re-throw all query errors for resumable (4.4) queries.
- throw;
- }
+ getClient()->query_DEPRECATED(
+ [this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
+ _sourceDbAndUuid,
+ BSONObj{},
+ query,
+ nullptr /* fieldsToReturn */,
+ QueryOption_NoCursorTimeout | QueryOption_SecondaryOk |
+ (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
+ _collectionClonerBatchSize,
+ ReadConcernArgs::kImplicitDefault);
}
void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
@@ -408,7 +365,7 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
uasserted(ErrorCodes::InitialSyncFailure, "Lost remote cursor");
}
- if (_firstBatchOfQueryRound && _resumeSupported) {
+ if (_firstBatchOfQueryRound) {
// Store the cursorId of the remote cursor.
_remoteCursorId = iter.getCursorId();
}
@@ -433,10 +390,8 @@ void CollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
uassertStatusOK(newStatus);
}
- if (_resumeSupported) {
- // Store the resume token for this batch.
- _resumeToken = iter.getPostBatchResumeToken();
- }
+ // Store the resume token for this batch.
+ _resumeToken = iter.getPostBatchResumeToken();
initialSyncHangCollectionClonerAfterHandlingBatchResponse.executeIf(
[&](const BSONObj&) {
@@ -511,18 +466,6 @@ void CollectionCloner::waitForDatabaseWorkToComplete() {
_dbWorkTaskRunner.join();
}
-// Throws.
-void CollectionCloner::abortNonResumableClone(const Status& status) {
- invariant(!_resumeSupported);
- LOGV2(21141,
- "Error during non-resumable clone: {error}",
- "Error during non-resumable clone",
- "error"_attr = status);
- std::string message = str::stream()
- << "Collection clone failed and is not resumable. nss: " << _sourceNss;
- uasserted(ErrorCodes::InitialSyncFailure, message);
-}
-
CollectionCloner::Stats CollectionCloner::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
return _stats;