diff options
author | Vesselina Ratcheva <vesselina.ratcheva@10gen.com> | 2020-08-27 01:17:01 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-09-02 07:06:59 +0000 |
commit | 424f5664acf946638f1b66f007d96bc20a559d4c (patch) | |
tree | 10344fe44cba4ef505ca18a4790cfeda53063102 | |
parent | b6cfd0bfc1670da1c570fad4da9c28e99e69857e (diff) | |
download | mongo-424f5664acf946638f1b66f007d96bc20a559d4c.tar.gz |
SERVER-50492 Split BaseCloner into InitialSyncBaseCloner and TenantMigrationBaseCloner
20 files changed, 476 insertions, 248 deletions
diff --git a/jstests/replsets/initial_sync_rename_collection.js b/jstests/replsets/initial_sync_rename_collection.js index 82576400c60..d571e83c869 100644 --- a/jstests/replsets/initial_sync_rename_collection.js +++ b/jstests/replsets/initial_sync_rename_collection.js @@ -161,8 +161,8 @@ runRenameTest({ }); const expectedLogFor5and7 = isJsonLogNoConn() - ? '`Initial Sync retrying cloner stage due to error","attr":{"cloner":"CollectionCloner","stage":"query","error":{"code":175,"codeName":"QueryPlanKilled","errmsg":"collection renamed from \'${nss}\' to \'${rnss}\'. UUID ${uuid}"}}}`' - : "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`"; + ? '`Sync process retrying cloner stage due to error","attr":{"cloner":"CollectionCloner","stage":"query","error":{"code":175,"codeName":"QueryPlanKilled","errmsg":"collection renamed from \'${nss}\' to \'${rnss}\'. UUID ${uuid}"}}}`' + : "`Sync process retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${rnss}'. UUID ${uuid}`"; jsTestLog("[5] Testing rename between getMores."); runRenameTest({ @@ -188,10 +188,10 @@ if (isJsonLogNoConn()) { if (TwoPhaseDropCollectionTest.supportsDropPendingNamespaces(replTest)) { if (isJsonLogNoConn()) { expectedLogFor6and8 = - '`Initial Sync retrying cloner stage due to error","attr":{"cloner":"CollectionCloner","stage":"query","error":{"code":175,"codeName":"QueryPlanKilled","errmsg":"collection renamed from \'${nss}\' to \'${dropPendingNss}\'. UUID ${uuid}`'; + '`Sync process retrying cloner stage due to error","attr":{"cloner":"CollectionCloner","stage":"query","error":{"code":175,"codeName":"QueryPlanKilled","errmsg":"collection renamed from \'${nss}\' to \'${dropPendingNss}\'. UUID ${uuid}`'; } else { expectedLogFor6and8 = - "`Initial Sync retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${dropPendingNss}'. UUID ${uuid}`"; + "`Sync process retrying CollectionCloner stage query due to QueryPlanKilled: collection renamed from '${nss}' to '${dropPendingNss}'. UUID ${uuid}`"; } } diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 479434c95a5..6bb2fc91044 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -982,6 +982,7 @@ env.Library( 'all_database_cloner.cpp', 'collection_cloner.cpp', 'database_cloner.cpp', + 'initial_sync_base_cloner.cpp', ], LIBDEPS = [ '$BUILD_DIR/mongo/base', @@ -1011,6 +1012,7 @@ env.Library( 'tenant_all_database_cloner.cpp', 'tenant_collection_cloner.cpp', 'tenant_database_cloner.cpp', + 'tenant_migration_base_cloner.cpp', ], LIBDEPS = [ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/repl/all_database_cloner.cpp b/src/mongo/db/repl/all_database_cloner.cpp index 527f7a64837..1e961f306d1 100644 --- a/src/mongo/db/repl/all_database_cloner.cpp +++ b/src/mongo/db/repl/all_database_cloner.cpp @@ -47,7 +47,8 @@ AllDatabaseCloner::AllDatabaseCloner(InitialSyncSharedData* sharedData, DBClientConnection* client, StorageInterface* storageInterface, ThreadPool* dbPool) - : BaseCloner("AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), + : InitialSyncBaseCloner( + "AllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _connectStage("connect", this, &AllDatabaseCloner::connectStage), _getInitialSyncIdStage("getInitialSyncId", this, &AllDatabaseCloner::getInitialSyncIdStage), _listDatabasesStage("listDatabases", this, &AllDatabaseCloner::listDatabasesStage) {} diff --git a/src/mongo/db/repl/all_database_cloner.h b/src/mongo/db/repl/all_database_cloner.h index 434a5e189b0..c6a9f9f126c 100644 --- a/src/mongo/db/repl/all_database_cloner.h +++ b/src/mongo/db/repl/all_database_cloner.h @@ -31,15 +31,15 @@ #include <vector> -#include "mongo/base/checked_cast.h" #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/database_cloner.h" +#include "mongo/db/repl/initial_sync_base_cloner.h" #include "mongo/db/repl/initial_sync_shared_data.h" namespace mongo { namespace repl { -class AllDatabaseCloner final : public BaseCloner { +class AllDatabaseCloner final : public InitialSyncBaseCloner { public: struct Stats { size_t databasesCloned{0}; @@ -65,10 +65,6 @@ public: protected: ClonerStages getStages() final; - InitialSyncSharedData* getSharedData() const final { - return checked_cast<InitialSyncSharedData*>(BaseCloner::getSharedData()); - } - private: friend class AllDatabaseClonerTest; class ConnectStage : public ClonerStage<AllDatabaseCloner> { diff --git a/src/mongo/db/repl/base_cloner.cpp b/src/mongo/db/repl/base_cloner.cpp index 521fcf05e3c..271b950c865 100644 --- a/src/mongo/db/repl/base_cloner.cpp +++ b/src/mongo/db/repl/base_cloner.cpp @@ -32,8 +32,6 @@ #include "mongo/platform/basic.h" #include "mongo/db/repl/base_cloner.h" -#include "mongo/db/repl/replication_consistency_markers_gen.h" -#include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/logv2/log.h" #include "mongo/util/scopeguard.h" @@ -41,15 +39,11 @@ namespace mongo { namespace { MONGO_FAIL_POINT_DEFINE(hangBeforeClonerStage); MONGO_FAIL_POINT_DEFINE(hangBeforeRetryingClonerStage); -MONGO_FAIL_POINT_DEFINE(hangBeforeCheckingRollBackIdClonerStage); MONGO_FAIL_POINT_DEFINE(hangAfterClonerStage); } // namespace using executor::TaskExecutor; namespace repl { -// These failpoints are shared with initial_syncer and so must not be in the unnamed namespace. -MONGO_FAIL_POINT_DEFINE(initialSyncFuzzerSynchronizationPoint1); -MONGO_FAIL_POINT_DEFINE(initialSyncFuzzerSynchronizationPoint2); BaseCloner::BaseCloner(StringData clonerName, ReplSyncSharedData* sharedData, @@ -106,36 +100,6 @@ bool BaseCloner::isMyFailPoint(const BSONObj& data) const { return data["cloner"].str() == getClonerName(); } -void BaseCloner::pauseForFuzzer(BaseClonerStage* stage) { - // These are the stages that the initial sync fuzzer expects to be able to pause on using the - // syncronization fail points. - static const auto initialSyncPauseStages = - std::vector<std::string>{"listCollections", "listIndexes", "listDatabases"}; - - if (MONGO_unlikely(initialSyncFuzzerSynchronizationPoint1.shouldFail())) { - if (std::find(initialSyncPauseStages.begin(), - initialSyncPauseStages.end(), - stage->getName()) != initialSyncPauseStages.end()) { - // These failpoints are set and unset by the InitialSyncTest fixture to cause initial - // sync to pause so that the Initial Sync Fuzzer can run commands on the sync source. - // nb: This log message is specifically checked for in - // initial_sync_test_fixture_test.js, so if you change it here you will need to change - // it there. - LOGV2(21066, - "Collection Cloner scheduled a remote command on the {stage}", - "Collection Cloner scheduled a remote command", - "stage"_attr = describeForFuzzer(stage)); - LOGV2(21067, "initialSyncFuzzerSynchronizationPoint1 fail point enabled"); - initialSyncFuzzerSynchronizationPoint1.pauseWhileSet(); - - if (MONGO_unlikely(initialSyncFuzzerSynchronizationPoint2.shouldFail())) { - LOGV2(21068, "initialSyncFuzzerSynchronizationPoint2 fail point enabled"); - initialSyncFuzzerSynchronizationPoint2.pauseWhileSet(); - } - } - } -} - BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) { LOGV2_DEBUG(21069, 1, @@ -181,81 +145,6 @@ BaseCloner::AfterStageBehavior BaseCloner::runStage(BaseClonerStage* stage) { return afterStageBehavior; } -void BaseCloner::clearRetryingState() { - _retryableOp = boost::none; -} - -Status BaseCloner::checkSyncSourceIsStillValid() { - WireVersion wireVersion; - { - stdx::lock_guard<ReplSyncSharedData> lk(*_sharedData); - auto wireVersionOpt = _sharedData->getSyncSourceWireVersion(lk); - // The wire version should always have been set by the time this is called. - invariant(wireVersionOpt); - wireVersion = *wireVersionOpt; - } - if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) { - auto status = checkInitialSyncIdIsUnchanged(); - if (!status.isOK()) - return status; - } - return checkRollBackIdIsUnchanged(); -} - -Status BaseCloner::checkInitialSyncIdIsUnchanged() { - uassert(ErrorCodes::InitialSyncFailure, - "Sync source was downgraded and no longer supports resumable initial sync", - getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); - BSONObj initialSyncId; - try { - initialSyncId = getClient()->findOne( - ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); - } catch (DBException& e) { - if (ErrorCodes::isRetriableError(e)) { - auto status = e.toStatus().withContext( - ": failed while attempting to retrieve initial sync ID after re-connect"); - LOGV2_DEBUG( - 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status); - return status; - } - throw; - } - uassert(ErrorCodes::InitialSyncFailure, - "Cannot retrieve sync source initial sync ID", - !initialSyncId.isEmpty()); - InitialSyncIdDocument initialSyncIdDoc = - InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); - - stdx::lock_guard<ReplSyncSharedData> lk(*_sharedData); - uassert(ErrorCodes::InitialSyncFailure, - "Sync source has been resynced since we started syncing from it", - _sharedData->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id()); - return Status::OK(); -} - -Status BaseCloner::checkRollBackIdIsUnchanged() { - BSONObj info; - try { - getClient()->simpleCommand("admin", &info, "replSetGetRBID"); - } catch (DBException& e) { - if (ErrorCodes::isRetriableError(e)) { - static constexpr char errorMsg[] = - "Failed while attempting to retrieve rollBackId after re-connect"; - LOGV2_DEBUG(21073, 1, errorMsg, "error"_attr = e); - return e.toStatus().withContext(errorMsg); - } - throw; - } - uassert( - 31298, "Sync source returned invalid result from replSetGetRBID", info["rbid"].isNumber()); - auto rollBackId = info["rbid"].numberInt(); - uassert(ErrorCodes::UnrecoverableRollbackError, - str::stream() << "Rollback occurred on our sync source " << getSource() - << " during initial sync", - rollBackId == _sharedData->getRollBackId()); - return Status::OK(); -} - BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* stage) { ON_BLOCK_EXIT([this] { clearRetryingState(); }); Status lastError = Status::OK(); @@ -283,48 +172,14 @@ BaseCloner::AfterStageBehavior BaseCloner::runStageWithRetries(BaseClonerStage* }, isThisStageFailPoint); LOGV2(21075, - "Initial Sync retrying {cloner} stage {stage} due to " + "Sync process retrying {cloner} stage {stage} due to " "{error}", - "Initial Sync retrying cloner stage due to error", + "Sync process retrying cloner stage due to error", "cloner"_attr = getClonerName(), "stage"_attr = stage->getName(), "error"_attr = lastError); - bool shouldRetry = [&] { - stdx::lock_guard<ReplSyncSharedData> lk(*_sharedData); - return _sharedData->shouldRetryOperation(lk, &_retryableOp); - }(); - if (!shouldRetry) { - auto status = lastError.withContext( - str::stream() << ": Exceeded initialSyncTransientErrorRetryPeriodSeconds " - << _sharedData->getAllowedOutageDuration( - stdx::lock_guard<ReplSyncSharedData>(*_sharedData))); - setSyncFailedStatus(status); - uassertStatusOK(status); - } - hangBeforeCheckingRollBackIdClonerStage.executeIf( - [&](const BSONObj& data) { - LOGV2(21076, - "Cloner {cloner} hanging before checking rollBackId for stage " - "{stage}", - "Cloner hanging before checking rollBackId", - "cloner"_attr = getClonerName(), - "stage"_attr = stage->getName()); - while (!mustExit() && - hangBeforeCheckingRollBackIdClonerStage.shouldFail( - isThisStageFailPoint)) { - sleepmillis(100); - } - }, - isThisStageFailPoint); - if (stage->checkSyncSourceValidityOnRetry()) { - // If checkSyncSourceIsStillValid fails without throwing, it means a network - // error occurred and it's safe to continue (which will cause another retry). - if (!checkSyncSourceIsStillValid().isOK()) - continue; - // After successfully checking the sync source validity, the client should - // always be OK. - invariant(!getClient()->isFailed()); - } + // Execute any per-retry logic needed by the cloner. + handleStageAttemptFailed(stage, lastError); } return stage->run(); } catch (DBException& e) { diff --git a/src/mongo/db/repl/base_cloner.h b/src/mongo/db/repl/base_cloner.h index f20eac12433..4ff4a5f7346 100644 --- a/src/mongo/db/repl/base_cloner.h +++ b/src/mongo/db/repl/base_cloner.h @@ -59,7 +59,7 @@ public: /** * run() catches all database exceptions and stores them in _status, to simplify error * handling in the caller above. It returns its own _status if that is not OK, otherwise - * the shared initial sync status. + * the shared sync status. */ Status run(); @@ -97,7 +97,7 @@ protected: // isTransientError() method determines whether the exception is retryable or not; usually // network errors will be retryable and other errors will not. If the error is retryable, // the BaseCloner framework will attempt to reconnect the client and run the stage again. If - // it is not, the exception will be propagated up and fail the initial sync attempt entirely. + // it is not, the exception will be propagated up and fail the sync attempt entirely. class BaseClonerStage { public: BaseClonerStage(std::string name) : _name(name){}; @@ -201,7 +201,7 @@ protected: void setSyncFailedStatus(Status status); /** - * Takes the initial sync status lock and checks the initial sync status. + * Takes the sync status lock and checks the sync status. * Used to make sure failpoints exit on process shutdown. */ bool mustExit(); @@ -210,54 +210,52 @@ protected: * A stage may, but is not required, to call this when we should clear the retrying state * because the operation has at least partially succeeded. If the stage does not call this, * the retrying state is cleared upon successful completion of the entire stage. + * + * Left blank here but may be overriden. */ - void clearRetryingState(); - -private: - virtual ClonerStages getStages() = 0; + virtual void clearRetryingState() {} /** - * Code to be run before and after the stages respectively. This code is not subject to the - * retry logic used in the cloner stages. + * Called every time the base cloner receives an error from a stage. Use this to + * execute any cloner-specific logic such as evaluating retry eligibility, running + * checks on the sync source, etc. + * + * Left blank here but may be overriden. */ - virtual void preStage() {} - virtual void postStage() {} - - AfterStageBehavior runStage(BaseClonerStage* stage); - - AfterStageBehavior runStageWithRetries(BaseClonerStage* stage); + virtual void handleStageAttemptFailed(BaseClonerStage* stage, Status lastError) {} /** - * Make sure the initial sync ID on the sync source has not changed. Throws an exception - * if it has. Returns a not-OK status if a network error occurs. + * Supports pausing at certain stages for a fuzzer test framework. + * + * Left blank but may be overriden. */ - Status checkInitialSyncIdIsUnchanged(); + virtual void pauseForFuzzer(BaseClonerStage* stage) {} /** - * Make sure the rollback ID has not changed. Throws an exception if it has. Returns - * a not-OK status if a network error occurs. + * Provides part of a log message for the sync process describing the namespace the + * cloner is operating on. It must start with the database name, followed by the + * string ' db: { ', followed by the stage name, followed by ': ' and the collection UUID + * if known. + * + * Left blank but may be overriden. */ - Status checkRollBackIdIsUnchanged(); + virtual std::string describeForFuzzer(BaseClonerStage*) const { + return ""; + } - /** - * Does validity checks on the sync source. If the sync source is now no longer usable, - * throws an exception. Returns a not-OK status if a network error occurs or if the sync - * source is temporarily unusable (e.g. restarting). - */ - Status checkSyncSourceIsStillValid(); +private: + virtual ClonerStages getStages() = 0; /** - * Supports pausing at certain stages for the initial sync fuzzer test framework. + * Code to be run before and after the stages respectively. This code is not subject to the + * retry logic used in the cloner stages. */ - void pauseForFuzzer(BaseClonerStage* stage); + virtual void preStage() {} + virtual void postStage() {} - /** - * Provides part of a log message for the initial sync describing the namespace the - * cloner is operating on. It must start with the database name, followed by the - * string ' db: { ', followed by the stage name, followed by ': ' and the collection UUID - * if known. - */ - virtual std::string describeForFuzzer(BaseClonerStage*) const = 0; + AfterStageBehavior runStage(BaseClonerStage* stage); + + AfterStageBehavior runStageWithRetries(BaseClonerStage* stage); AfterStageBehavior runStages(); @@ -287,9 +285,6 @@ private: // _stopAfterStage is used for unit testing and causes the cloner to exit after a given // stage. std::string _stopAfterStage; // (X) - - // Operation that may currently be retrying. - ReplSyncSharedData::RetryableOperation _retryableOp; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/collection_cloner.cpp b/src/mongo/db/repl/collection_cloner.cpp index f5d4c6a0847..77a37ee1a38 100644 --- a/src/mongo/db/repl/collection_cloner.cpp +++ b/src/mongo/db/repl/collection_cloner.cpp @@ -61,7 +61,8 @@ CollectionCloner::CollectionCloner(const NamespaceString& sourceNss, DBClientConnection* client, StorageInterface* storageInterface, ThreadPool* dbPool) - : BaseCloner("CollectionCloner"_sd, sharedData, source, client, storageInterface, dbPool), + : InitialSyncBaseCloner( + "CollectionCloner"_sd, sharedData, source, client, storageInterface, dbPool), _sourceNss(sourceNss), _collectionOptions(collectionOptions), _sourceDbAndUuid(NamespaceString("UNINITIALIZED")), diff --git a/src/mongo/db/repl/collection_cloner.h b/src/mongo/db/repl/collection_cloner.h index 8c171c4021a..7a2033d0192 100644 --- a/src/mongo/db/repl/collection_cloner.h +++ b/src/mongo/db/repl/collection_cloner.h @@ -32,8 +32,8 @@ #include <memory> #include <vector> -#include "mongo/base/checked_cast.h" #include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/initial_sync_base_cloner.h" #include "mongo/db/repl/initial_sync_shared_data.h" #include "mongo/db/repl/task_runner.h" #include "mongo/util/progress_meter.h" @@ -46,7 +46,7 @@ const int kProgressMeterSecondsBetween = 60; const int kProgressMeterCheckInterval = 128; } // namespace -class CollectionCloner final : public BaseCloner { +class CollectionCloner final : public InitialSyncBaseCloner { public: struct Stats { static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd; @@ -124,10 +124,6 @@ protected: bool isMyFailPoint(const BSONObj& data) const final; - InitialSyncSharedData* getSharedData() const override { - return checked_cast<InitialSyncSharedData*>(BaseCloner::getSharedData()); - } - private: friend class CollectionClonerTest; diff --git a/src/mongo/db/repl/database_cloner.cpp b/src/mongo/db/repl/database_cloner.cpp index 26c0bcb2a40..3686d5a2cb1 100644 --- a/src/mongo/db/repl/database_cloner.cpp +++ b/src/mongo/db/repl/database_cloner.cpp @@ -48,7 +48,8 @@ DatabaseCloner::DatabaseCloner(const std::string& dbName, DBClientConnection* client, StorageInterface* storageInterface, ThreadPool* dbPool) - : BaseCloner("DatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), + : InitialSyncBaseCloner( + "DatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _dbName(dbName), _listCollectionsStage("listCollections", this, &DatabaseCloner::listCollectionsStage) { invariant(!dbName.empty()); diff --git a/src/mongo/db/repl/database_cloner.h b/src/mongo/db/repl/database_cloner.h index e1787ad7a5e..c172c80876a 100644 --- a/src/mongo/db/repl/database_cloner.h +++ b/src/mongo/db/repl/database_cloner.h @@ -31,15 +31,15 @@ #include <vector> -#include "mongo/base/checked_cast.h" #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/collection_cloner.h" +#include "mongo/db/repl/initial_sync_base_cloner.h" #include "mongo/db/repl/initial_sync_shared_data.h" namespace mongo { namespace repl { -class DatabaseCloner final : public BaseCloner { +class DatabaseCloner final : public InitialSyncBaseCloner { public: struct Stats { std::string dbname; @@ -74,11 +74,6 @@ protected: bool isMyFailPoint(const BSONObj& data) const final; - InitialSyncSharedData* getSharedData() const override { - return checked_cast<InitialSyncSharedData*>(BaseCloner::getSharedData()); - } - - private: friend class DatabaseClonerTest; diff --git a/src/mongo/db/repl/initial_sync_base_cloner.cpp b/src/mongo/db/repl/initial_sync_base_cloner.cpp new file mode 100644 index 00000000000..2840191d985 --- /dev/null +++ b/src/mongo/db/repl/initial_sync_base_cloner.cpp @@ -0,0 +1,209 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/initial_sync_base_cloner.h" +#include "mongo/db/repl/replication_consistency_markers_gen.h" +#include "mongo/db/repl/replication_consistency_markers_impl.h" +#include "mongo/logv2/log.h" + +namespace mongo { +namespace { +MONGO_FAIL_POINT_DEFINE(hangBeforeCheckingRollBackIdClonerStage); +} // namespace +namespace repl { + +// These failpoints are shared with initial_syncer and so must not be in the unnamed namespace. +MONGO_FAIL_POINT_DEFINE(initialSyncFuzzerSynchronizationPoint1); +MONGO_FAIL_POINT_DEFINE(initialSyncFuzzerSynchronizationPoint2); + +InitialSyncBaseCloner::InitialSyncBaseCloner(StringData clonerName, + InitialSyncSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool) + : BaseCloner(clonerName, sharedData, source, client, storageInterface, dbPool) {} + +void InitialSyncBaseCloner::clearRetryingState() { + _retryableOp = boost::none; +} + +void InitialSyncBaseCloner::handleStageAttemptFailed(BaseClonerStage* stage, Status lastError) { + auto isThisStageFailPoint = [this, stage](const BSONObj& data) { + return data["stage"].str() == stage->getName() && isMyFailPoint(data); + }; + + bool shouldRetry = [&] { + stdx::lock_guard<InitialSyncSharedData> lk(*getSharedData()); + return getSharedData()->shouldRetryOperation(lk, &_retryableOp); + }(); + if (!shouldRetry) { + auto status = lastError.withContext( + str::stream() << ": Exceeded initialSyncTransientErrorRetryPeriodSeconds " + << getSharedData()->getAllowedOutageDuration( + stdx::lock_guard<InitialSyncSharedData>(*getSharedData()))); + setSyncFailedStatus(status); + uassertStatusOK(status); + } + hangBeforeCheckingRollBackIdClonerStage.executeIf( + [&](const BSONObj& data) { + LOGV2(21076, + "Initial sync cloner {cloner} hanging before checking rollBackId for stage " + "{stage}", + "Initial sync cloner hanging before checking rollBackId", + "cloner"_attr = getClonerName(), + "stage"_attr = stage->getName()); + while (!mustExit() && + hangBeforeCheckingRollBackIdClonerStage.shouldFail(isThisStageFailPoint)) { + sleepmillis(100); + } + }, + isThisStageFailPoint); + + // This includes checking the sync source member state, checking the rollback ID, + // and checking the sync source initial sync ID. + if (stage->checkSyncSourceValidityOnRetry()) { + // If checkSyncSourceIsStillValid fails without throwing, it means a network + // error occurred and it's safe to continue (which will cause another retry). + if (!checkSyncSourceIsStillValid().isOK()) + return; + // After successfully checking the sync source validity, the client should + // always be OK. + invariant(!getClient()->isFailed()); + } +} + +Status InitialSyncBaseCloner::checkSyncSourceIsStillValid() { + + WireVersion wireVersion; + { + stdx::lock_guard<ReplSyncSharedData> lk(*getSharedData()); + auto wireVersionOpt = getSharedData()->getSyncSourceWireVersion(lk); + // The wire version should always have been set by the time this is called. + invariant(wireVersionOpt); + wireVersion = *wireVersionOpt; + } + if (wireVersion >= WireVersion::RESUMABLE_INITIAL_SYNC) { + auto status = checkInitialSyncIdIsUnchanged(); + if (!status.isOK()) + return status; + } + return checkRollBackIdIsUnchanged(); +} + +Status InitialSyncBaseCloner::checkInitialSyncIdIsUnchanged() { + uassert(ErrorCodes::InitialSyncFailure, + "Sync source was downgraded and no longer supports resumable initial sync", + getClient()->getMaxWireVersion() >= WireVersion::RESUMABLE_INITIAL_SYNC); + BSONObj initialSyncId; + try { + initialSyncId = getClient()->findOne( + ReplicationConsistencyMarkersImpl::kDefaultInitialSyncIdNamespace.toString(), Query()); + } catch (DBException& e) { + if (ErrorCodes::isRetriableError(e)) { + auto status = e.toStatus().withContext( + ": failed while attempting to retrieve initial sync ID after re-connect"); + LOGV2_DEBUG( + 4608505, 1, "Retrieving Initial Sync ID retriable error", "error"_attr = status); + return status; + } + throw; + } + uassert(ErrorCodes::InitialSyncFailure, + "Cannot retrieve sync source initial sync ID", + !initialSyncId.isEmpty()); + InitialSyncIdDocument initialSyncIdDoc = + InitialSyncIdDocument::parse(IDLParserErrorContext("initialSyncId"), initialSyncId); + + stdx::lock_guard<ReplSyncSharedData> lk(*getSharedData()); + uassert(ErrorCodes::InitialSyncFailure, + "Sync source has been resynced since we started syncing from it", + getSharedData()->getInitialSyncSourceId(lk) == initialSyncIdDoc.get_id()); + return Status::OK(); +} + +Status InitialSyncBaseCloner::checkRollBackIdIsUnchanged() { + BSONObj info; + try { + getClient()->simpleCommand("admin", &info, "replSetGetRBID"); + } catch (DBException& e) { + if (ErrorCodes::isRetriableError(e)) { + static constexpr char errorMsg[] = + "Failed while attempting to retrieve rollBackId after re-connect"; + LOGV2_DEBUG(21073, 1, errorMsg, "error"_attr = e); + return e.toStatus().withContext(errorMsg); + } + throw; + } + uassert( + 31298, "Sync source returned invalid result from replSetGetRBID", info["rbid"].isNumber()); + auto rollBackId = info["rbid"].numberInt(); + uassert(ErrorCodes::UnrecoverableRollbackError, + str::stream() << "Rollback occurred on our sync source " << getSource() + << " during initial sync", + rollBackId == getSharedData()->getRollBackId()); + return Status::OK(); +} + +void InitialSyncBaseCloner::pauseForFuzzer(BaseClonerStage* stage) { + // These are the stages that the initial sync fuzzer expects to be able to pause on using the + // syncronization fail points. + static const auto initialSyncPauseStages = + std::vector<std::string>{"listCollections", "listIndexes", "listDatabases"}; + + if (MONGO_unlikely(initialSyncFuzzerSynchronizationPoint1.shouldFail())) { + if (std::find(initialSyncPauseStages.begin(), + initialSyncPauseStages.end(), + stage->getName()) != initialSyncPauseStages.end()) { + // These failpoints are set and unset by the InitialSyncTest fixture to cause initial + // sync to pause so that the Initial Sync Fuzzer can run commands on the sync source. + // nb: This log message is specifically checked for in + // initial_sync_test_fixture_test.js, so if you change it here you will need to change + // it there. + LOGV2(21066, + "Collection Cloner scheduled a remote command on the {stage}", + "Collection Cloner scheduled a remote command", + "stage"_attr = describeForFuzzer(stage)); + LOGV2(21067, "initialSyncFuzzerSynchronizationPoint1 fail point enabled"); + initialSyncFuzzerSynchronizationPoint1.pauseWhileSet(); + + if (MONGO_unlikely(initialSyncFuzzerSynchronizationPoint2.shouldFail())) { + LOGV2(21068, "initialSyncFuzzerSynchronizationPoint2 fail point enabled"); + initialSyncFuzzerSynchronizationPoint2.pauseWhileSet(); + } + } + } +} + +} // namespace repl +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/repl/initial_sync_base_cloner.h b/src/mongo/db/repl/initial_sync_base_cloner.h new file mode 100644 index 00000000000..ff1f28142a9 --- /dev/null +++ b/src/mongo/db/repl/initial_sync_base_cloner.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/checked_cast.h" +#include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/initial_sync_shared_data.h" + +namespace mongo { +namespace repl { + +class InitialSyncBaseCloner : public BaseCloner { +public: + InitialSyncBaseCloner(StringData clonerName, + InitialSyncSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool); + virtual ~InitialSyncBaseCloner() = default; + +protected: + InitialSyncSharedData* getSharedData() const final { + return checked_cast<InitialSyncSharedData*>(BaseCloner::getSharedData()); + } + +private: + /** + * Make sure the initial sync ID on the sync source has not changed. Throws an exception + * if it has. Returns a not-OK status if a network error occurs. + */ + Status checkInitialSyncIdIsUnchanged(); + + /** + * Make sure the rollback ID has not changed. Throws an exception if it has. Returns + * a not-OK status if a network error occurs. + */ + Status checkRollBackIdIsUnchanged(); + + /** + * Does validity checks on the sync source. If the sync source is now no longer usable, + * throws an exception. Returns a not-OK status if a network error occurs or if the sync + * source is temporarily unusable (e.g. restarting). + */ + Status checkSyncSourceIsStillValid(); + + /** + * Clears _retryableOp. + */ + void clearRetryingState() final; + + /** + * Checks to see if we are still within our allowed outage duration. + * Also probes the sync source for clone-fatal conditions, such as rollback. + */ + void handleStageAttemptFailed(BaseClonerStage* stage, Status lastError); + + /** + * Allows the initial sync fuzzer to pause cloner execution at specific points. + */ + void pauseForFuzzer(BaseClonerStage* stage) final; + + // Operation that may currently be retrying. + ReplSyncSharedData::RetryableOperation _retryableOp; +}; + +} // namespace repl +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/repl/tenant_all_database_cloner.cpp b/src/mongo/db/repl/tenant_all_database_cloner.cpp index 6a4533bf8f2..5aaf3a116c7 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner.cpp +++ b/src/mongo/db/repl/tenant_all_database_cloner.cpp @@ -53,7 +53,7 @@ TenantAllDatabaseCloner::TenantAllDatabaseCloner(TenantMigrationSharedData* shar StorageInterface* storageInterface, ThreadPool* dbPool, StringData tenantId) - : BaseCloner( + : TenantMigrationBaseCloner( "TenantAllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _tenantId(tenantId), _listDatabasesStage("listDatabases", this, &TenantAllDatabaseCloner::listDatabasesStage) {} diff --git a/src/mongo/db/repl/tenant_all_database_cloner.h b/src/mongo/db/repl/tenant_all_database_cloner.h index 07405375e46..681e7a2bf4e 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner.h +++ b/src/mongo/db/repl/tenant_all_database_cloner.h @@ -31,15 +31,15 @@ #include <vector> -#include "mongo/base/checked_cast.h" #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/tenant_database_cloner.h" +#include "mongo/db/repl/tenant_migration_base_cloner.h" #include "mongo/db/repl/tenant_migration_shared_data.h" namespace mongo { namespace repl { -class TenantAllDatabaseCloner final : public BaseCloner { +class TenantAllDatabaseCloner final : public TenantMigrationBaseCloner { public: struct Stats { size_t databasesCloned{0}; @@ -68,10 +68,6 @@ public: protected: ClonerStages getStages() final; - TenantMigrationSharedData* getSharedData() const override { - return checked_cast<TenantMigrationSharedData*>(BaseCloner::getSharedData()); - } - private: friend class TenantAllDatabaseClonerTest; @@ -100,10 +96,6 @@ private: */ void postStage() final; - std::string describeForFuzzer(BaseClonerStage* stage) const final { - return "admin db: { " + stage->getName() + ": 1 }"; - } - // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp index 5f70673ed3b..8d90c77a2d3 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.cpp +++ b/src/mongo/db/repl/tenant_collection_cloner.cpp @@ -71,7 +71,8 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss, StorageInterface* storageInterface, ThreadPool* dbPool, StringData tenantId) - : BaseCloner("TenantCollectionCloner"_sd, sharedData, source, client, storageInterface, dbPool), + : TenantMigrationBaseCloner( + "TenantCollectionCloner"_sd, sharedData, source, client, storageInterface, dbPool), _sourceNss(sourceNss), _collectionOptions(collectionOptions), _sourceDbAndUuid(NamespaceString("UNINITIALIZED")), diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h index b86a259972b..76b44cbfca3 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.h +++ b/src/mongo/db/repl/tenant_collection_cloner.h @@ -32,16 +32,16 @@ #include <memory> #include <vector> -#include "mongo/base/checked_cast.h" #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/task_runner.h" +#include "mongo/db/repl/tenant_migration_base_cloner.h" #include "mongo/db/repl/tenant_migration_shared_data.h" #include "mongo/util/progress_meter.h" namespace mongo { namespace repl { -class TenantCollectionCloner : public BaseCloner { +class TenantCollectionCloner : public TenantMigrationBaseCloner { public: struct Stats { static constexpr StringData kDocumentsToCopyFieldName = "documentsToCopy"_sd; @@ -117,10 +117,6 @@ protected: bool isMyFailPoint(const BSONObj& data) const final; - TenantMigrationSharedData* getSharedData() const override { - return checked_cast<TenantMigrationSharedData*>(BaseCloner::getSharedData()); - } - private: friend class TenantCollectionClonerTest; friend class TenantCollectionClonerStage; @@ -139,11 +135,6 @@ private: } }; - std::string describeForFuzzer(BaseClonerStage* stage) const final { - return _sourceNss.db() + " db: { " + stage->getName() + ": UUID(\"" + - _sourceDbAndUuid.uuid()->toString() + "\") coll: " + _sourceNss.coll() + " }"; - } - /** * The preStage sets the start time in _stats. */ diff --git a/src/mongo/db/repl/tenant_database_cloner.cpp b/src/mongo/db/repl/tenant_database_cloner.cpp index dfa6ef52d0b..2d83fa28018 100644 --- a/src/mongo/db/repl/tenant_database_cloner.cpp +++ b/src/mongo/db/repl/tenant_database_cloner.cpp @@ -55,7 +55,8 @@ TenantDatabaseCloner::TenantDatabaseCloner(const std::string& dbName, StorageInterface* storageInterface, ThreadPool* dbPool, StringData tenantId) - : BaseCloner("TenantDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), + : TenantMigrationBaseCloner( + "TenantDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _dbName(dbName), _listCollectionsStage("listCollections", this, &TenantDatabaseCloner::listCollectionsStage), _tenantId(tenantId) { diff --git a/src/mongo/db/repl/tenant_database_cloner.h b/src/mongo/db/repl/tenant_database_cloner.h index 4730381f485..87ba04ea3a5 100644 --- a/src/mongo/db/repl/tenant_database_cloner.h +++ b/src/mongo/db/repl/tenant_database_cloner.h @@ -31,15 +31,15 @@ #include <vector> -#include "mongo/base/checked_cast.h" #include "mongo/db/repl/base_cloner.h" #include "mongo/db/repl/tenant_collection_cloner.h" +#include "mongo/db/repl/tenant_migration_base_cloner.h" #include "mongo/db/repl/tenant_migration_shared_data.h" namespace mongo { namespace repl { -class TenantDatabaseCloner final : public BaseCloner { +class TenantDatabaseCloner final : public TenantMigrationBaseCloner { public: struct Stats { std::string dbname; @@ -75,10 +75,6 @@ protected: bool isMyFailPoint(const BSONObj& data) const final; - TenantMigrationSharedData* getSharedData() const override { - return checked_cast<TenantMigrationSharedData*>(BaseCloner::getSharedData()); - } - private: friend class TenantDatabaseClonerTest; @@ -111,10 +107,6 @@ private: */ void postStage() final; - std::string describeForFuzzer(BaseClonerStage* stage) const final { - return _dbName + " db: { " + stage->getName() + ": 1 } "; - } - // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. // diff --git a/src/mongo/db/repl/tenant_migration_base_cloner.cpp b/src/mongo/db/repl/tenant_migration_base_cloner.cpp new file mode 100644 index 00000000000..977ddb5247f --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_base_cloner.cpp @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kReplicationInitialSync + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/tenant_migration_base_cloner.h" +#include "mongo/logv2/log.h" + +namespace mongo { +namespace repl { + +TenantMigrationBaseCloner::TenantMigrationBaseCloner(StringData clonerName, + TenantMigrationSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool) + : BaseCloner(clonerName, sharedData, source, client, storageInterface, dbPool) {} + +} // namespace repl +} // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/repl/tenant_migration_base_cloner.h b/src/mongo/db/repl/tenant_migration_base_cloner.h new file mode 100644 index 00000000000..9f9d6509a75 --- /dev/null +++ b/src/mongo/db/repl/tenant_migration_base_cloner.h @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/checked_cast.h" +#include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/tenant_migration_shared_data.h" + +namespace mongo { +namespace repl { + +class TenantMigrationBaseCloner : public BaseCloner { +public: + TenantMigrationBaseCloner(StringData clonerName, + TenantMigrationSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool); + virtual ~TenantMigrationBaseCloner() = default; + +protected: + TenantMigrationSharedData* getSharedData() const override { + return checked_cast<TenantMigrationSharedData*>(BaseCloner::getSharedData()); + } +}; + +} // namespace repl +} // namespace mongo
\ No newline at end of file |