diff options
author | Vesselina Ratcheva <vesselina.ratcheva@10gen.com> | 2020-08-03 20:24:06 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-08-12 20:57:45 +0000 |
commit | c999fbe8a0cb200ed0e23e23cddafde4abb028e8 (patch) | |
tree | 4b1e330597d2ff4e462447316a988bf52a06bc5b | |
parent | d86d2f5df9c93da3c9a9d6c7e8852b5c674cb0bb (diff) | |
download | mongo-c999fbe8a0cb200ed0e23e23cddafde4abb028e8.tar.gz |
SERVER-48845 Implement TenantCollectionCloner
-rw-r--r-- | src/mongo/db/repl/SConscript | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/cloner_utils.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/cloner_utils.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 15 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_all_database_cloner.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_all_database_cloner.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_all_database_cloner_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_collection_cloner.cpp | 336 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_collection_cloner.h | 161 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_collection_cloner_test.cpp | 574 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_database_cloner.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_database_cloner.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/tenant_database_cloner_test.cpp | 421 |
16 files changed, 1397 insertions, 261 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index e0185666445..f6b8e2054f2 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1008,10 +1008,14 @@ env.Library( 'base_cloner', 'cloner_utils', 'initial_sync_shared_data', + 'task_runner', '$BUILD_DIR/mongo/base', ], LIBDEPS_PRIVATE=[ + 'repl_server_parameters', '$BUILD_DIR/mongo/db/commands/list_collections_filter', + '$BUILD_DIR/mongo/rpc/metadata', + '$BUILD_DIR/mongo/util/progress_meter', ] ) @@ -1504,6 +1508,7 @@ env.CppUnitTest( 'database_cloner_test.cpp', 'initial_sync_shared_data_test.cpp', 'tenant_all_database_cloner_test.cpp', + 'tenant_collection_cloner_test.cpp', 'tenant_database_cloner_test.cpp' ], LIBDEPS=[ diff --git a/src/mongo/db/repl/cloner_utils.cpp b/src/mongo/db/repl/cloner_utils.cpp index 988259c0b46..184f9107e4d 100644 --- a/src/mongo/db/repl/cloner_utils.cpp +++ b/src/mongo/db/repl/cloner_utils.cpp @@ -51,5 +51,9 @@ BSONObj ClonerUtils::buildMajorityWaitRequest(Timestamp operationTime) { return bob.obj(); } +bool ClonerUtils::isNamespaceForTenant(NamespaceString nss, StringData prefix) { + return nss.db().startsWith(prefix + "_"); +} + } // namespace repl } // namespace mongo
\ No newline at end of file diff --git a/src/mongo/db/repl/cloner_utils.h b/src/mongo/db/repl/cloner_utils.h index 9f37ac91648..b9acd75b2ee 100644 --- a/src/mongo/db/repl/cloner_utils.h +++ b/src/mongo/db/repl/cloner_utils.h @@ -33,6 +33,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/namespace_string.h" namespace mongo { namespace repl { @@ -54,6 +55,11 @@ public: * Assembles a majority read using the operationTime specified as the afterClusterTime. */ static BSONObj buildMajorityWaitRequest(Timestamp operationTime); + + /** + * Checks if the collection belongs to the given tenant. + */ + static bool isNamespaceForTenant(NamespaceString nss, StringData prefix); }; diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index bb722d04fcb..1aea0e40b93 100644 --- a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -160,6 +160,14 @@ public: const CollectionOptions& options) = 0; /** + * Creates all the specified non-_id indexes on a given collection, which must be empty. + */ + virtual Status createIndexesOnEmptyCollection( + OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) = 0; + + /** * Drops a collection. */ virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0; diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 4127ad6b410..a6e9efc5011 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -493,6 +493,30 @@ Status StorageInterfaceImpl::createCollection(OperationContext* opCtx, }); } +Status StorageInterfaceImpl::createIndexesOnEmptyCollection( + OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) { + return writeConflictRetry(opCtx, "createIndexesOnEmptyCollection", nss.ns(), [&] { + AutoGetCollection autoColl(opCtx, nss, fixLockModeForSystemDotViewsChanges(nss, MODE_IX)); + WriteUnitOfWork wunit(opCtx); + + for (auto&& spec : secondaryIndexSpecs) { + // Will error if collection is not empty. + auto secIndexSW = + autoColl.getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(opCtx, + spec); + auto status = secIndexSW.getStatus(); + if (!status.isOK()) { + return status; + } + } + + wunit.commit(); + return Status::OK(); + }); +} + Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) { return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns(), [&] { AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index ac5a4497f9b..6c38fdd5b72 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -83,6 +83,10 @@ public: const NamespaceString& nss, const CollectionOptions& options) override; + Status createIndexesOnEmptyCollection(OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) override; + Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override; Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override; diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 3c4ec8e9176..a57c2298110 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -104,6 +104,8 @@ public: using CreateOplogFn = std::function<Status(OperationContext*, const NamespaceString&)>; using CreateCollectionFn = std::function<Status(OperationContext*, const NamespaceString&, const CollectionOptions&)>; + using CreateIndexesOnEmptyCollectionFn = std::function<Status( + OperationContext*, const NamespaceString&, const std::vector<BSONObj>&)>; using TruncateCollectionFn = std::function<Status(OperationContext*, const NamespaceString& nss)>; using DropCollectionFn = std::function<Status(OperationContext*, const NamespaceString& nss)>; @@ -172,6 +174,13 @@ public: return createCollFn(opCtx, nss, options); } + Status createIndexesOnEmptyCollection( + OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) override { + return createIndexesOnEmptyCollFn(opCtx, nss, secondaryIndexSpecs); + } + Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override { return dropCollFn(opCtx, nss); }; @@ -381,6 +390,12 @@ public: [](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."}; }; + CreateIndexesOnEmptyCollectionFn createIndexesOnEmptyCollFn = [](OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& + secondaryIndexSpecs) { + return Status{ErrorCodes::IllegalOperation, "createIndexesOnEmptyCollFn not implemented."}; + }; TruncateCollectionFn truncateCollFn = [](OperationContext* opCtx, const NamespaceString& nss) { return Status{ErrorCodes::IllegalOperation, "TruncateCollectionFn not implemented."}; }; diff --git a/src/mongo/db/repl/tenant_all_database_cloner.cpp b/src/mongo/db/repl/tenant_all_database_cloner.cpp index 1005ddc5b25..b27e070a4a7 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner.cpp +++ b/src/mongo/db/repl/tenant_all_database_cloner.cpp @@ -52,10 +52,10 @@ TenantAllDatabaseCloner::TenantAllDatabaseCloner(InitialSyncSharedData* sharedDa DBClientConnection* client, StorageInterface* storageInterface, ThreadPool* dbPool, - StringData databasePrefix) + StringData tenantId) : BaseCloner( "TenantAllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), - _databasePrefix(databasePrefix), + _tenantId(tenantId), _listDatabasesStage("listDatabases", this, &TenantAllDatabaseCloner::listDatabasesStage) {} BaseCloner::ClonerStages TenantAllDatabaseCloner::getStages() { @@ -63,19 +63,23 @@ BaseCloner::ClonerStages TenantAllDatabaseCloner::getStages() { } BaseCloner::AfterStageBehavior TenantAllDatabaseCloner::listDatabasesStage() { + // This will be set after a successful listDatabases command. + _operationTime = Timestamp(); + BSONObj res; - const BSONObj filter = ClonerUtils::makeTenantDatabaseFilter(_databasePrefix); + const BSONObj filter = ClonerUtils::makeTenantDatabaseFilter(_tenantId); auto databasesArray = getClient()->getDatabaseInfos(filter, true /* nameOnly */); - // Do a speculative majority read on the sync source to make sure the databases listed - // exist on a majority of nodes in the set. We do not check the rollbackId - rollback - // would lead to the sync source closing connections so the stage would fail. + // Do a majority read on the sync source to make sure the databases listed exist on a majority + // of nodes in the set. We do not check the rollbackId - rollback would lead to the sync source + // closing connections so the stage would fail. _operationTime = getClient()->getOperationTime(); if (MONGO_unlikely(tenantAllDatabaseClonerHangAfterGettingOperationTime.shouldFail())) { LOGV2(4881504, "Failpoint 'tenantAllDatabaseClonerHangAfterGettingOperationTime' enabled. Blocking " - "until it is disabled."); + "until it is disabled.", + "tenantId"_attr = _tenantId); tenantAllDatabaseClonerHangAfterGettingOperationTime.pauseWhileSet(); } @@ -88,7 +92,11 @@ BaseCloner::AfterStageBehavior TenantAllDatabaseCloner::listDatabasesStage() { // Process and verify the listDatabases results. for (const auto& dbBSON : databasesArray) { - LOGV2_DEBUG(4881508, 2, "Cloner received listDatabases entry", "db"_attr = dbBSON); + LOGV2_DEBUG(4881508, + 2, + "Cloner received listDatabases entry", + "db"_attr = dbBSON, + "tenantId"_attr = _tenantId); uassert(4881505, "Result from donor must have 'name' set", dbBSON.hasField("name")); const auto& dbName = dbBSON["name"].str(); @@ -117,7 +125,8 @@ void TenantAllDatabaseCloner::postStage() { getSource(), getClient(), getStorageInterface(), - getDBPool()); + getDBPool(), + _tenantId); } auto dbStatus = _currentDatabaseCloner->run(); if (dbStatus.isOK()) { @@ -125,14 +134,16 @@ void TenantAllDatabaseCloner::postStage() { 1, "Tenant migration database clone finished", "dbName"_attr = dbName, - "status"_attr = dbStatus); + "status"_attr = dbStatus, + "tenantId"_attr = _tenantId); } else { LOGV2_WARNING(4881501, "Tenant migration database clone failed", "dbName"_attr = dbName, "dbNumber"_attr = (_stats.databasesCloned + 1), "totalDbs"_attr = _databases.size(), - "error"_attr = dbStatus.toString()); + "error"_attr = dbStatus.toString(), + "tenantId"_attr = _tenantId); setInitialSyncFailedStatus(dbStatus); return; } @@ -158,7 +169,7 @@ std::string TenantAllDatabaseCloner::toString() const { stdx::lock_guard<Latch> lk(_mutex); return str::stream() << "tenant migration --" << " active:" << isActive(lk) << " status:" << getStatus(lk).toString() - << " source:" << getSource() + << " source:" << getSource() << " tenantId: " << _tenantId << " db cloners completed:" << _stats.databasesCloned; } diff --git a/src/mongo/db/repl/tenant_all_database_cloner.h b/src/mongo/db/repl/tenant_all_database_cloner.h index 75859f95383..c02ec12046b 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner.h +++ b/src/mongo/db/repl/tenant_all_database_cloner.h @@ -53,7 +53,7 @@ public: DBClientConnection* client, StorageInterface* storageInterface, ThreadPool* dbPool, - StringData databasePrefix); + StringData tenantId); virtual ~TenantAllDatabaseCloner() = default; @@ -111,7 +111,7 @@ private: std::unique_ptr<TenantDatabaseCloner> _currentDatabaseCloner; // (MX) // The database name prefix of the tenant associated with this migration. - std::string _databasePrefix; // (R) + std::string _tenantId; // (R) TenantAllDatabaseClonerStage _listDatabasesStage; // (R) diff --git a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp index 9ae8973c5a3..ae853a9f56f 100644 --- a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp +++ b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp @@ -61,7 +61,7 @@ protected: _mockClient.get(), &_storageInterface, _dbWorkThreadPool.get(), - _databasePrefix); + _tenantId); } std::vector<std::string> getDatabasesFromCloner(TenantAllDatabaseCloner* cloner) { @@ -80,7 +80,7 @@ protected: } static Timestamp _operationTime; - static std::string _databasePrefix; + static std::string _tenantId; static std::string _tenantDbA; static std::string _tenantDbAAB; static std::string _tenantDbABC; @@ -89,11 +89,11 @@ protected: /* static */ Timestamp TenantAllDatabaseClonerTest::_operationTime = Timestamp(12345, 67); -std::string TenantAllDatabaseClonerTest::_databasePrefix = "tenant42"; -std::string TenantAllDatabaseClonerTest::_tenantDbA = _databasePrefix + "_a"; -std::string TenantAllDatabaseClonerTest::_tenantDbAAB = _databasePrefix + "_aab"; -std::string TenantAllDatabaseClonerTest::_tenantDbABC = _databasePrefix + "_abc"; -std::string TenantAllDatabaseClonerTest::_tenantDbB = _databasePrefix + "_b"; +std::string TenantAllDatabaseClonerTest::_tenantId = "tenant42"; +std::string TenantAllDatabaseClonerTest::_tenantDbA = _tenantId + "_a"; +std::string TenantAllDatabaseClonerTest::_tenantDbAAB = _tenantId + "_aab"; +std::string TenantAllDatabaseClonerTest::_tenantDbABC = _tenantId + "_abc"; +std::string TenantAllDatabaseClonerTest::_tenantDbB = _tenantId + "_b"; TEST_F(TenantAllDatabaseClonerTest, FailsOnListDatabases) { Status expectedResult{ErrorCodes::BadValue, "foo"}; diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp index be1d6c0a3de..a3948e352d4 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.cpp +++ b/src/mongo/db/repl/tenant_collection_cloner.cpp @@ -33,26 +33,74 @@ #include "mongo/base/string_data.h" #include "mongo/db/commands/list_collections_filter.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/repl/cloner_utils.h" #include "mongo/db/repl/database_cloner_gen.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/db/repl/tenant_collection_cloner.h" #include "mongo/logv2/log.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata/repl_set_metadata.h" #include "mongo/util/assert_util.h" namespace mongo { namespace repl { +namespace { +const int kProgressMeterSecondsBetween = 60; +const int kProgressMeterCheckInterval = 128; +} // namespace + +// Failpoint which causes the tenant database cloner to hang after it has successfully run +// listIndexes and recorded the results and the operationTime. +MONGO_FAIL_POINT_DEFINE(tenantCollectionClonerHangAfterGettingOperationTime); + +// Failpoint which causes tenant migration to hang after handling the next batch of results from the +// DBClientConnection, optionally limited to a specific collection. +MONGO_FAIL_POINT_DEFINE(tenantMigrationHangCollectionClonerAfterHandlingBatchResponse); + +// Failpoint which causes tenant migration to hang when it has cloned 'numDocsToClone' documents to +// collection 'namespace'. +MONGO_FAIL_POINT_DEFINE(tenantMigrationHangDuringCollectionClone); + TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss, const CollectionOptions& collectionOptions, InitialSyncSharedData* sharedData, const HostAndPort& source, DBClientConnection* client, StorageInterface* storageInterface, - ThreadPool* dbPool) + ThreadPool* dbPool, + StringData tenantId) : BaseCloner("TenantCollectionCloner"_sd, sharedData, source, client, storageInterface, dbPool), _sourceNss(sourceNss), _collectionOptions(collectionOptions), _sourceDbAndUuid(NamespaceString("UNINITIALIZED")), - _placeholderStage("placeholder", this, &TenantCollectionCloner::placeholderStage) { + _countStage("count", this, &TenantCollectionCloner::countStage), + _listIndexesStage("listIndexes", this, &TenantCollectionCloner::listIndexesStage), + _createCollectionStage( + "createCollection", this, &TenantCollectionCloner::createCollectionStage), + _queryStage("query", this, &TenantCollectionCloner::queryStage), + _progressMeter(1U, // total will be replaced with count command result. + kProgressMeterSecondsBetween, + kProgressMeterCheckInterval, + "documents copied", + str::stream() << _sourceNss.toString() << " tenant collection clone progress"), + _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) { + auto task = [ this, work = std::move(work) ]( + OperationContext * opCtx, + const Status& status) mutable noexcept->TaskRunner::NextAction { + try { + work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx)); + } catch (const DBException& e) { + setInitialSyncFailedStatus(e.toStatus()); + } + return TaskRunner::NextAction::kDisposeOperationContext; + }; + _dbWorkTaskRunner.schedule(std::move(task)); + return executor::TaskExecutor::CallbackHandle(); + }), + _dbWorkTaskRunner(dbPool), + _tenantId(tenantId) { invariant(sourceNss.isValid()); invariant(collectionOptions.uuid); _sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid); @@ -60,18 +108,293 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss, } BaseCloner::ClonerStages TenantCollectionCloner::getStages() { - return {&_placeholderStage}; + return {&_countStage, &_listIndexesStage, &_createCollectionStage, &_queryStage}; +} + +void TenantCollectionCloner::preStage() { + stdx::lock_guard<Latch> lk(_mutex); + _stats.start = getSharedData()->getClock()->now(); +} + +void TenantCollectionCloner::postStage() { + stdx::lock_guard<Latch> lk(_mutex); + _stats.end = getSharedData()->getClock()->now(); +} + +BaseCloner::AfterStageBehavior TenantCollectionCloner::TenantCollectionClonerStage::run() { + try { + return ClonerStage<TenantCollectionCloner>::run(); + } catch (const DBException&) { + getCloner()->waitForDatabaseWorkToComplete(); + throw; + } +} + +BaseCloner::AfterStageBehavior TenantCollectionCloner::countStage() { + auto count = getClient()->count(_sourceDbAndUuid, + {} /* Query */, + QueryOption_SlaveOk, + 0 /* limit */, + 0 /* skip */, + ReadConcernArgs::kImplicitDefault); + + // The count command may return a negative value after an unclean shutdown, + // so we set it to zero here to avoid aborting the collection clone. + // Note that this count value is only used for reporting purposes. + if (count < 0) { + LOGV2_WARNING(4884502, + "Count command returned negative value. Updating to 0 to allow progress " + "meter to function properly", + "namespace"_attr = _sourceNss.ns(), + "tenantId"_attr = _tenantId); + count = 0; + } + + _progressMeter.setTotalWhileRunning(static_cast<unsigned long long>(count)); + { + stdx::lock_guard<Latch> lk(_mutex); + _stats.documentToCopy = count; + } + return kContinueNormally; } -BaseCloner::AfterStageBehavior TenantCollectionCloner::placeholderStage() { +BaseCloner::AfterStageBehavior TenantCollectionCloner::listIndexesStage() { + // This will be set after a successful listCollections command. + _operationTime = Timestamp(); + + auto indexSpecs = getClient()->getIndexSpecs( + _sourceDbAndUuid, false /* includeBuildUUIDs */, QueryOption_SlaveOk); + + // Do a majority read on the sync source to make sure the indexes listed exist on a majority of + // nodes in the set. We do not check the rollbackId - rollback would lead to the sync source + // closing connections so the stage would fail. + _operationTime = getClient()->getOperationTime(); + + tenantCollectionClonerHangAfterGettingOperationTime.executeIf( + [&](const BSONObj&) { + while ( + MONGO_unlikely(tenantCollectionClonerHangAfterGettingOperationTime.shouldFail()) && + !mustExit()) { + LOGV2(4884509, + "tenantCollectionClonerHangAfterGettingOperationTime fail point " + "enabled. Blocking until fail point is disabled", + "namespace"_attr = _sourceNss.toString(), + "tenantId"_attr = _tenantId); + mongo::sleepsecs(1); + } + }, + [&](const BSONObj& data) { + // Only hang when cloning the specified collection, or if no collection was specified. + auto nss = data["nss"].str(); + return nss.empty() || nss == _sourceNss.toString(); + }); + + BSONObj readResult; + BSONObj cmd = ClonerUtils::buildMajorityWaitRequest(_operationTime); + getClient()->runCommand("admin", cmd, readResult, QueryOption_SlaveOk); + uassertStatusOKWithContext( + getStatusFromCommandResult(readResult), + "TenantCollectionCloner failed to get listIndexes result majority-committed"); + + // Process the listIndexes results for finished indexes only. + if (indexSpecs.empty()) { + LOGV2_WARNING(4884503, + "No indexes found for collection while cloning", + "namespace"_attr = _sourceNss.ns(), + "source"_attr = getSource(), + "tenantId"_attr = _tenantId); + } + for (auto&& spec : indexSpecs) { + if (spec.hasField("name") && spec.getStringField("name") == "_id_"_sd) { + _idIndexSpec = spec.getOwned(); + } else { + _readyIndexSpecs.push_back(spec.getOwned()); + } + } + { + stdx::lock_guard<Latch> lk(_mutex); + _stats.indexes = _readyIndexSpecs.size() + (_idIndexSpec.isEmpty() ? 0 : 1); + }; + + if (!_idIndexSpec.isEmpty() && _collectionOptions.autoIndexId == CollectionOptions::NO) { + LOGV2_WARNING(4884504, + "Found the _id index spec but the collection specified autoIndexId of false", + "namespace"_attr = this->_sourceNss, + "tenantId"_attr = _tenantId); + } + return kContinueNormally; +} + +BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() { + auto opCtx = cc().makeOperationContext(); + + auto status = + getStorageInterface()->createCollection(opCtx.get(), _sourceNss, _collectionOptions); + if (status == ErrorCodes::NamespaceExists) { + uassert(4884501, + "Collection exists but does not belong to tenant", + ClonerUtils::isNamespaceForTenant(_sourceNss, _tenantId)); + } else { + uassertStatusOKWithContext(status, "Tenant collection cloner: create collection"); + } + + // This will start building the indexes whose specs we saved last stage. + status = getStorageInterface()->createIndexesOnEmptyCollection( + opCtx.get(), _sourceNss, _readyIndexSpecs); + + uassertStatusOKWithContext(status, "Tenant collection cloner: create indexes"); + return kContinueNormally; } +BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() { + ON_BLOCK_EXIT([this] { this->unsetMetadataReader(); }); + setMetadataReader(); + runQuery(); + waitForDatabaseWorkToComplete(); + return kContinueNormally; +} + +void TenantCollectionCloner::runQuery() { + auto query = QUERY("query" << BSONObj()); + query.hint(BSON("_id" << 1)); + + getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); }, + _sourceDbAndUuid, + query, + nullptr /* fieldsToReturn */, + QueryOption_NoCursorTimeout | QueryOption_SlaveOk | + (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0), + _collectionClonerBatchSize); + _dbWorkTaskRunner.join(); +} + +void TenantCollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { + { + stdx::lock_guard<Latch> lk(_mutex); + _stats.receivedBatches++; + while (iter.moreInCurrentBatch()) { + _documentsToInsert.emplace_back(InsertStatement(iter.nextSafe())); + } + } + + // Schedule the next document batch insertion. + auto&& scheduleResult = _scheduleDbWorkFn( + [=](const executor::TaskExecutor::CallbackArgs& cbd) { insertDocumentsCallback(cbd); }); + + if (!scheduleResult.isOK()) { + Status newStatus = scheduleResult.getStatus().withContext( + str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'"); + // We must throw an exception to terminate query. + uassertStatusOK(newStatus); + } + + tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.executeIf( + [&](const BSONObj&) { + while ( + MONGO_unlikely( + tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.shouldFail()) && + !mustExit()) { + LOGV2(4884506, + "tenantMigrationHangCollectionClonerAfterHandlingBatchResponse fail point " + "enabled. Blocking until fail point is disabled", + "namespace"_attr = _sourceNss.toString(), + "tenantId"_attr = _tenantId); + mongo::sleepsecs(1); + } + }, + [&](const BSONObj& data) { + // Only hang when cloning the specified collection, or if no collection was specified. + auto nss = data["nss"].str(); + return nss.empty() || nss == _sourceNss.toString(); + }); +} + + +void TenantCollectionCloner::insertDocumentsCallback( + const executor::TaskExecutor::CallbackArgs& cbd) { + uassertStatusOK(cbd.status); + std::vector<InsertStatement> docs; + + { + stdx::lock_guard<Latch> lk(_mutex); + if (_documentsToInsert.size() == 0) { + LOGV2_WARNING(4884507, + "insertDocumentsCallback, but no documents to insert", + "namespace"_attr = _sourceNss, + "tenantId"_attr = _tenantId); + return; + } + _documentsToInsert.swap(docs); + _stats.documentsCopied += docs.size(); + ++_stats.insertedBatches; + _progressMeter.hit(int(docs.size())); + } + + uassertStatusOK(getStorageInterface()->insertDocuments(cbd.opCtx, _sourceDbAndUuid, docs)); + + tenantMigrationHangDuringCollectionClone.executeIf( + [&](const BSONObj&) { + LOGV2(4884508, + "initial sync - tenantMigrationHangDuringCollectionClone fail point " + "enabled. Blocking until fail point is disabled", + "namespace"_attr = _sourceNss.ns(), + "tenantId"_attr = _tenantId); + while (MONGO_unlikely(tenantMigrationHangDuringCollectionClone.shouldFail()) && + !mustExit()) { + mongo::sleepsecs(1); + } + }, + [&](const BSONObj& data) { + return data["namespace"].String() == _sourceNss.ns() && + static_cast<int>(_stats.documentsCopied) >= data["numDocsToClone"].numberInt(); + }); +} + +void TenantCollectionCloner::waitForDatabaseWorkToComplete() { + _dbWorkTaskRunner.join(); +} + +void TenantCollectionCloner::setMetadataReader() { + getClient()->setReplyMetadataReader( + [this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) { + auto readResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj); + if (!readResult.isOK()) { + return readResult.getStatus().withContext( + "tenant collection cloner failed to read repl set metadata"); + } + this->setLastVisibleOpTime(readResult.getValue().getLastOpVisible()); + return Status::OK(); + }); +} + +void TenantCollectionCloner::unsetMetadataReader() { + getClient()->setReplyMetadataReader([this](OperationContext* opCtx, + const BSONObj& metadataObj, + StringData source) { return Status::OK(); }); +} + +bool TenantCollectionCloner::isMyFailPoint(const BSONObj& data) const { + auto nss = data["nss"].str(); + return (nss.empty() || nss == _sourceNss.toString()) && BaseCloner::isMyFailPoint(data); +} + TenantCollectionCloner::Stats TenantCollectionCloner::getStats() const { stdx::lock_guard<Latch> lk(_mutex); return _stats; } +std::string TenantCollectionCloner::Stats::toString() const { + return toBSON().toString(); +} + +BSONObj TenantCollectionCloner::Stats::toBSON() const { + BSONObjBuilder bob; + bob.append("ns", ns); + append(&bob); + return bob.obj(); +} + void TenantCollectionCloner::Stats::append(BSONObjBuilder* builder) const { builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy); builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied); @@ -89,5 +412,10 @@ void TenantCollectionCloner::Stats::append(BSONObjBuilder* builder) const { builder->appendNumber("receivedBatches", receivedBatches); } +Timestamp TenantCollectionCloner::getOperationTime_forTest() { + return _operationTime; +} + + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h index ee8c9bf2ced..2dd94cb2191 100644 --- a/src/mongo/db/repl/tenant_collection_cloner.h +++ b/src/mongo/db/repl/tenant_collection_cloner.h @@ -59,31 +59,152 @@ public: void append(BSONObjBuilder* builder) const; }; + /** + * Type of function to schedule storage interface tasks with the executor. + * + * Used for testing only. + */ + using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; + TenantCollectionCloner(const NamespaceString& ns, const CollectionOptions& collectionOptions, InitialSyncSharedData* sharedData, const HostAndPort& source, DBClientConnection* client, StorageInterface* storageInterface, - ThreadPool* dbPool); + ThreadPool* dbPool, + StringData tenantId); virtual ~TenantCollectionCloner() = default; Stats getStats() const; + std::string toString() const; + + NamespaceString getSourceNss() const { + return _sourceNss; + } + UUID getSourceUuid() const { + return *_sourceDbAndUuid.uuid(); + } + + /** + * Set the cloner batch size. + * + * Used for testing only. Set by server parameter 'collectionClonerBatchSize' in normal + * operation. + */ + void setBatchSize_forTest(int batchSize) { + _collectionClonerBatchSize = batchSize; + } + + /** + * Overrides how executor schedules database work. + * + * For testing only. + */ + void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) { + _scheduleDbWorkFn = std::move(scheduleDbWorkFn); + } + + Timestamp getOperationTime_forTest(); + protected: ClonerStages getStages() final; + bool isMyFailPoint(const BSONObj& data) const final; + private: + friend class TenantCollectionClonerTest; + friend class TenantCollectionClonerStage; + + class TenantCollectionClonerStage : public ClonerStage<TenantCollectionCloner> { + public: + TenantCollectionClonerStage(std::string name, + TenantCollectionCloner* cloner, + ClonerRunFn stageFunc) + : ClonerStage<TenantCollectionCloner>(name, cloner, stageFunc) {} + AfterStageBehavior run() override; + + bool isTransientError(const Status& status) override { + // Always abort on error. + return false; + } + }; + std::string describeForFuzzer(BaseClonerStage* stage) const final { return _sourceNss.db() + " db: { " + stage->getName() + ": UUID(\"" + _sourceDbAndUuid.uuid()->toString() + "\") coll: " + _sourceNss.coll() + " }"; } /** - * Temporary no-op stage. + * The preStage sets the start time in _stats. */ - AfterStageBehavior placeholderStage(); + void preStage() final; + + /** + * The postStage sets the end time in _stats. + */ + void postStage() final; + + /** + * Stage function that counts the number of documents in the collection on the source in order + * to generate progress information. + */ + AfterStageBehavior countStage(); + + /** + * Stage function that gets the index information of the collection on the source to re-create + * it. + */ + AfterStageBehavior listIndexesStage(); + + /** + * Stage function that creates the collection using the storageInterface. This stage does not + * actually contact the sync source. + */ + AfterStageBehavior createCollectionStage(); + + /** + * Stage function that executes a query to retrieve all documents in the collection. For each + * batch returned by the upstream node, handleNextBatch will be called with the data. This + * stage will finish when the entire query is finished or failed. + */ + AfterStageBehavior queryStage(); + + /** + * Put all results from a query batch into a buffer to be inserted, and schedule + * it to be inserted. + */ + void handleNextBatch(DBClientCursorBatchIterator& iter); + + /** + * Called whenever there is a new batch of documents ready from the DBClientConnection. + */ + void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd); + + /** + * Sends a query command to the source. + */ + void runQuery(); + + /** + * Waits for any database work to finish or fail. + */ + void waitForDatabaseWorkToComplete(); + + /** + * Sets up tracking the lastVisibleOpTime from response metadata. + */ + void setMetadataReader(); + void unsetMetadataReader(); + void setLastVisibleOpTime(OpTime opTime) { + _lastVisibleOpTime = opTime; + } + OpTime getLastVisibleOpTime() { + return _lastVisibleOpTime; + } // All member variables are labeled with one of the following codes indicating the // synchronization rules for accessing them. @@ -96,10 +217,36 @@ private: const CollectionOptions _collectionOptions; // (R) // Despite the type name, this member must always contain a UUID. NamespaceStringOrUUID _sourceDbAndUuid; // (R) - - ClonerStage<TenantCollectionCloner> _placeholderStage; // (R) - - Stats _stats; // (M) + // The size of the batches of documents returned in collection cloning. + int _collectionClonerBatchSize; // (R) + + TenantCollectionClonerStage _countStage; // (R) + TenantCollectionClonerStage _listIndexesStage; // (R) + TenantCollectionClonerStage _createCollectionStage; // (R) + TenantCollectionClonerStage _queryStage; // (R) + + ProgressMeter _progressMeter; // (X) progress meter for this instance. + std::vector<BSONObj> _readyIndexSpecs; // (X) Except for _id_ + BSONObj _idIndexSpec; // (X) + // Function for scheduling database work using the executor. + ScheduleDbWorkFn _scheduleDbWorkFn; // (R) + // Documents read from source to insert. + std::vector<InsertStatement> _documentsToInsert; // (M) + Stats _stats; // (M) + // We put _dbWorkTaskRunner after anything the database threads depend on to ensure it is + // only destroyed after those threads exit. + TaskRunner _dbWorkTaskRunner; // (R) + + // TODO(SERVER-49780): Move this into TenantMigrationSharedData. + OpTime _lastVisibleOpTime; // (X) + + // The database name prefix of the tenant associated with this migration. + // TODO(SERVER-49780): Consider moving this into TenantMigrationSharedData. + std::string _tenantId; // (R) + + // The operationTime returned with the listIndexes result. + // TODO(SERVER-49780): Consider moving this into TenantMigrationSharedData. + Timestamp _operationTime; // (X) }; } // namespace repl diff --git a/src/mongo/db/repl/tenant_collection_cloner_test.cpp b/src/mongo/db/repl/tenant_collection_cloner_test.cpp new file mode 100644 index 00000000000..d365f098178 --- /dev/null +++ b/src/mongo/db/repl/tenant_collection_cloner_test.cpp @@ -0,0 +1,574 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <vector> + +#include "mongo/bson/bsonmisc.h" +#include "mongo/db/repl/cloner_test_fixture.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/db/repl/tenant_collection_cloner.h" +#include "mongo/db/service_context_test_fixture.h" +#include "mongo/dbtests/mock/mock_dbclient_connection.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { +namespace repl { + +class MockCallbackState final : public mongo::executor::TaskExecutor::CallbackState { +public: + MockCallbackState() = default; + void cancel() override {} + void waitForCompletion() override {} + bool isCanceled() const override { + return false; + } +}; + +class TenantCollectionClonerTest : public ClonerTestFixture { +public: + TenantCollectionClonerTest() {} + +protected: + void setUp() override { + ClonerTestFixture::setUp(); + _standardCreateCollectionFn = [this](OperationContext* opCtx, + const NamespaceString& nss, + const CollectionOptions& options) -> Status { + this->_collCreated = true; + return Status::OK(); + }; + _storageInterface.createCollFn = _standardCreateCollectionFn; + _standardCreateIndexesOnEmptyCollectionFn = + [this](OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) -> Status { + this->_numSecondaryIndexesCreated += secondaryIndexSpecs.size(); + return Status::OK(); + }; + _storageInterface.createIndexesOnEmptyCollFn = _standardCreateIndexesOnEmptyCollectionFn; + _storageInterface.insertDocumentsFn = [this](OperationContext* opCtx, + const NamespaceStringOrUUID& nsOrUUID, + const std::vector<InsertStatement>& ops) { + this->_numDocsInserted += ops.size(); + return Status::OK(); + }; + + _mockServer->assignCollectionUuid(_nss.ns(), _collUuid); + _mockServer->setCommandReply("replSetGetRBID", + BSON("ok" << 1 << "rbid" << _sharedData->getRollBackId())); + _mockClient->setOperationTime(_operationTime); + } + std::unique_ptr<TenantCollectionCloner> makeCollectionCloner( + CollectionOptions options = CollectionOptions()) { + options.uuid = _collUuid; + _options = options; + return std::make_unique<TenantCollectionCloner>(_nss, + options, + _sharedData.get(), + _source, + _mockClient.get(), + &_storageInterface, + _dbWorkThreadPool.get(), + _tenantId); + } + + BSONObj createFindResponse(ErrorCodes::Error code = ErrorCodes::OK) { + BSONObjBuilder bob; + if (code != ErrorCodes::OK) { + bob.append("ok", 0); + bob.append("code", code); + } else { + bob.append("ok", 1); + } + return bob.obj(); + } + + ProgressMeter& getProgressMeter(TenantCollectionCloner* cloner) { + return cloner->_progressMeter; + } + + std::vector<BSONObj> getIndexSpecs(TenantCollectionCloner* cloner) { + return cloner->_readyIndexSpecs; + } + + BSONObj& getIdIndexSpec(TenantCollectionCloner* cloner) { + return cloner->_idIndexSpec; + } + + StorageInterfaceMock::CreateCollectionFn _standardCreateCollectionFn; + StorageInterfaceMock::CreateIndexesOnEmptyCollectionFn + _standardCreateIndexesOnEmptyCollectionFn; + bool _collCreated = false; + size_t _numSecondaryIndexesCreated{0}; + size_t _numDocsInserted{0}; + CollectionOptions _options; + + UUID _collUuid = UUID::gen(); + BSONObj _idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + + std::vector<BSONObj> _secondaryIndexSpecs{BSON("v" << 1 << "key" << BSON("a" << 1) << "name" + << "a_1"), + BSON("v" << 1 << "key" << BSON("b" << 1) << "name" + << "b_1")}; + static std::string _tenantId; + static NamespaceString _nss; + static Timestamp _operationTime; +}; + +/* static */ +std::string TenantCollectionClonerTest::_tenantId = "tenant42"; +NamespaceString TenantCollectionClonerTest::_nss = {_tenantId + "_testDb", "testcoll"}; +Timestamp TenantCollectionClonerTest::_operationTime = Timestamp(12345, 42); + + +TEST_F(TenantCollectionClonerTest, CountStage) { + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("count"); + _mockServer->setCommandReply("count", createCountResponse(100)); + ASSERT_OK(cloner->run()); + ASSERT_EQ(100, getProgressMeter(cloner.get()).total()); +} + +// On a negative count, the CollectionCloner should use a zero count. +TEST_F(TenantCollectionClonerTest, CountStageNegativeCount) { + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("count"); + _mockServer->setCommandReply("count", createCountResponse(-100)); + ASSERT_OK(cloner->run()); + ASSERT_EQ(0, getProgressMeter(cloner.get()).total()); +} + +TEST_F(TenantCollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) { + auto cloner = makeCollectionCloner(); + _mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, "")); + ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); +} + +TEST_F(TenantCollectionClonerTest, + CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) { + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("count"); + _mockServer->setCommandReply("count", BSON("ok" << 1)); + auto status = cloner->run(); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, status); +} + +TEST_F(TenantCollectionClonerTest, ListIndexesReturnedNoIndexes) { + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("listIndexes"); + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray())); + _mockServer->setCommandReply("find", createFindResponse()); + ASSERT_OK(cloner->run()); + ASSERT(getIdIndexSpec(cloner.get()).isEmpty()); + ASSERT(getIndexSpecs(cloner.get()).empty()); + ASSERT_EQ(0, cloner->getStats().indexes); +} + +TEST_F(TenantCollectionClonerTest, ListIndexesHasResults) { + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("listIndexes"); + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply( + "listIndexes", + createCursorResponse( + _nss.ns(), + BSON_ARRAY(_secondaryIndexSpecs[0] << _idIndexSpec << _secondaryIndexSpecs[1]))); + _mockServer->setCommandReply("find", createFindResponse()); + ASSERT_OK(cloner->run()); + ASSERT_BSONOBJ_EQ(_idIndexSpec, getIdIndexSpec(cloner.get())); + ASSERT_EQ(2, getIndexSpecs(cloner.get()).size()); + ASSERT_BSONOBJ_EQ(_secondaryIndexSpecs[0], getIndexSpecs(cloner.get())[0]); + ASSERT_BSONOBJ_EQ(_secondaryIndexSpecs[1], getIndexSpecs(cloner.get())[1]); + ASSERT_EQ(3, cloner->getStats().indexes); +} + +TEST_F(TenantCollectionClonerTest, ListIndexesNonRetriableError) { + auto cloner = makeCollectionCloner(); + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply("listIndexes", Status(ErrorCodes::OperationFailed, "")); + ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); +} + +TEST_F(TenantCollectionClonerTest, ListIndexesRemoteUnreachableBeforeMajorityFind) { + auto cloner = makeCollectionCloner(); + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray())); + + auto clonerOperationTimeFP = + globalFailPointRegistry().find("tenantCollectionClonerHangAfterGettingOperationTime"); + auto timesEntered = clonerOperationTimeFP->setMode(FailPoint::alwaysOn, 0); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + // Wait for the failpoint to be reached + clonerOperationTimeFP->waitForTimesEntered(timesEntered + 1); + _mockServer->shutdown(); + + // Finish test + clonerOperationTimeFP->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +TEST_F(TenantCollectionClonerTest, ListIndexesRecordsCorrectOperationTime) { + auto cloner = makeCollectionCloner(); + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray())); + _mockServer->setCommandReply("find", createFindResponse()); + + auto clonerOperationTimeFP = + globalFailPointRegistry().find("tenantCollectionClonerHangAfterGettingOperationTime"); + auto timesEntered = clonerOperationTimeFP->setMode(FailPoint::alwaysOn, 0); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + // Wait for the failpoint to be reached + clonerOperationTimeFP->waitForTimesEntered(timesEntered + 1); + ASSERT_EQUALS(_operationTime, cloner->getOperationTime_forTest()); + + // Finish test + clonerOperationTimeFP->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +TEST_F(TenantCollectionClonerTest, BeginCollection) { + NamespaceString collNss; + CollectionOptions collOptions; + BSONObj collIdIndexSpec; + std::vector<BSONObj> collSecondaryIndexSpecs; + + _storageInterface.createCollFn = + [&](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { + collNss = nss; + collOptions = options; + return _standardCreateCollectionFn(opCtx, nss, options); + }; + + _storageInterface.createIndexesOnEmptyCollFn = + [&](OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) { + collSecondaryIndexSpecs = secondaryIndexSpecs; + return _standardCreateIndexesOnEmptyCollectionFn(opCtx, nss, secondaryIndexSpecs); + }; + + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("createCollection"); + _mockServer->setCommandReply("count", createCountResponse(1)); + BSONArrayBuilder indexSpecs; + indexSpecs.append(_idIndexSpec); + for (const auto& secondaryIndexSpec : _secondaryIndexSpecs) { + indexSpecs.append(secondaryIndexSpec); + } + _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), indexSpecs.arr())); + _mockServer->setCommandReply("find", createFindResponse()); + + ASSERT_EQUALS(Status::OK(), cloner->run()); + + ASSERT_EQUALS(_nss.ns(), collNss.ns()); + ASSERT_BSONOBJ_EQ(_options.toBSON(), collOptions.toBSON()); + ASSERT_EQUALS(_secondaryIndexSpecs.size(), collSecondaryIndexSpecs.size()); + for (std::vector<BSONObj>::size_type i = 0; i < _secondaryIndexSpecs.size(); ++i) { + ASSERT_BSONOBJ_EQ(_secondaryIndexSpecs[i], collSecondaryIndexSpecs[i]); + } +} + +TEST_F(TenantCollectionClonerTest, BeginCollectionFailed) { + _storageInterface.createCollFn = + [&](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) { + return Status(ErrorCodes::OperationFailed, ""); + }; + + auto cloner = makeCollectionCloner(); + cloner->setStopAfterStage_forTest("createCollection"); + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray())); + _mockServer->setCommandReply("find", createFindResponse()); + ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); +} + +TEST_F(TenantCollectionClonerTest, InsertDocumentsSingleBatch) { + // Set up data for preliminary stages + _mockServer->setCommandReply("count", createCountResponse(2)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); + _mockServer->setCommandReply("find", createFindResponse()); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + + auto cloner = makeCollectionCloner(); + ASSERT_OK(cloner->run()); + + ASSERT_EQUALS(2, _numDocsInserted); + + auto stats = cloner->getStats(); + ASSERT_EQUALS(1u, stats.receivedBatches); +} + +TEST_F(TenantCollectionClonerTest, InsertDocumentsMultipleBatches) { + // Set up data for preliminary stages + _mockServer->setCommandReply("count", createCountResponse(5)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); + _mockServer->setCommandReply("find", createFindResponse()); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + _mockServer->insert(_nss.ns(), BSON("_id" << 4)); + _mockServer->insert(_nss.ns(), BSON("_id" << 5)); + + auto cloner = makeCollectionCloner(); + cloner->setBatchSize_forTest(2); + ASSERT_OK(cloner->run()); + + ASSERT_EQUALS(5, _numDocsInserted); + + auto stats = cloner->getStats(); + ASSERT_EQUALS(3u, stats.receivedBatches); +} + +TEST_F(TenantCollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) { + // Set up data for preliminary stages + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); + _mockServer->setCommandReply("find", createFindResponse()); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + // Stop before running the query to set up the failure. + auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEntered = collClonerBeforeFailPoint->setMode( + FailPoint::alwaysOn, + 0, + fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}")); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_EQUALS(ErrorCodes::UnknownError, cloner->run()); + }); + // Wait for the failpoint to be reached + collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); + // Replace scheduleDbWork function so that cloner will fail to schedule DB work after + // getting documents. + cloner->setScheduleDbWorkFn_forTest([](const executor::TaskExecutor::CallbackFn& workFn) { + return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, ""); + }); + + // Continue and finish. Final status is checked in the thread. + collClonerBeforeFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +TEST_F(TenantCollectionClonerTest, InsertDocumentsCallbackCanceled) { + // Set up data for preliminary stages + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); + _mockServer->setCommandReply("find", createFindResponse()); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + // Stop before running the query to set up the failure. + auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEntered = collClonerBeforeFailPoint->setMode( + FailPoint::alwaysOn, + 0, + fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}")); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_EQUALS(ErrorCodes::CallbackCanceled, cloner->run()); + }); + // Wait for the failpoint to be reached + collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); + // Replace scheduleDbWork function so that cloner will fail to schedule DB work after + // getting documents. + cloner->setScheduleDbWorkFn_forTest([&](const executor::TaskExecutor::CallbackFn& workFn) { + executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>()); + mongo::executor::TaskExecutor::CallbackArgs args{ + nullptr, + handle, + {ErrorCodes::CallbackCanceled, "Never run, but treat like cancelled."}}; + workFn(args); + return StatusWith<executor::TaskExecutor::CallbackHandle>(handle); + }); + + // Continue and finish. Final status is checked in the thread. + collClonerBeforeFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +TEST_F(TenantCollectionClonerTest, InsertDocumentsFailed) { + // Set up data for preliminary stages + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec))); + _mockServer->setCommandReply("find", createFindResponse()); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + // Stop before running the query to set up the failure. + auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEntered = collClonerBeforeFailPoint->setMode( + FailPoint::alwaysOn, + 0, + fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}")); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run()); + }); + + // Wait for the failpoint to be reached + collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); + + // Make the insertDocuments fail. + _storageInterface.insertDocumentsFn = [this](OperationContext* opCtx, + const NamespaceStringOrUUID& nsOrUUID, + const std::vector<InsertStatement>& ops) { + return Status(ErrorCodes::OperationFailed, ""); + }; + + + // Continue and finish. Final status is checked in the thread. + collClonerBeforeFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); +} + +TEST_F(TenantCollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { + NamespaceString collNss; + CollectionOptions collOptions; + // We initialize collIndexSpecs with fake information to ensure it is overwritten by an empty + // vector. + std::vector<BSONObj> collIndexSpecs{BSON("fakeindexkeys" << 1)}; + _storageInterface.createCollFn = [&, this](OperationContext* opCtx, + const NamespaceString& nss, + const CollectionOptions& options) -> Status { + collNss = nss; + collOptions = options; + return _standardCreateCollectionFn(opCtx, nss, options); + }; + + _storageInterface.createIndexesOnEmptyCollFn = + [&](OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) { + collIndexSpecs = secondaryIndexSpecs; + return _standardCreateIndexesOnEmptyCollectionFn(opCtx, nss, secondaryIndexSpecs); + }; + + const BSONObj doc = BSON("_id" << 1); + _mockServer->insert(_nss.ns(), doc); + + _mockServer->setCommandReply("count", createCountResponse(1)); + _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray())); + _mockServer->setCommandReply("find", createFindResponse()); + + CollectionOptions options; + options.autoIndexId = CollectionOptions::NO; + auto cloner = makeCollectionCloner(options); + ASSERT_OK(cloner->run()); + ASSERT_EQUALS(1, _numDocsInserted); + ASSERT_TRUE(_collCreated); + ASSERT_EQ(collOptions.autoIndexId, CollectionOptions::NO); + ASSERT_EQ(0UL, collIndexSpecs.size()); + ASSERT_EQ(collNss, _nss); +} + +TEST_F(TenantCollectionClonerTest, QueryFailure) { + // Set up data for preliminary stages + auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + _mockServer->setCommandReply("count", createCountResponse(3)); + _mockServer->setCommandReply("listIndexes", + createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec))); + _mockServer->setCommandReply("find", createFindResponse()); + + auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto timesEnteredBeforeStage = beforeStageFailPoint->setMode( + FailPoint::alwaysOn, 0, fromjson("{cloner: 'TenantCollectionCloner', stage: 'query'}")); + + // Set up documents to be returned from upstream node. + _mockServer->insert(_nss.ns(), BSON("_id" << 1)); + _mockServer->insert(_nss.ns(), BSON("_id" << 2)); + _mockServer->insert(_nss.ns(), BSON("_id" << 3)); + + auto cloner = makeCollectionCloner(); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_NOT_OK(cloner->run()); + }); + + // Wait until we get to the query stage. + beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1); + + // Bring the server down. + _mockServer->shutdown(); + + // Let us begin with the query stage. + beforeStageFailPoint->setMode(FailPoint::off, 0); + + clonerThread.join(); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/tenant_database_cloner.cpp b/src/mongo/db/repl/tenant_database_cloner.cpp index 8baee18daca..462b6b042da 100644 --- a/src/mongo/db/repl/tenant_database_cloner.cpp +++ b/src/mongo/db/repl/tenant_database_cloner.cpp @@ -53,10 +53,12 @@ TenantDatabaseCloner::TenantDatabaseCloner(const std::string& dbName, const HostAndPort& source, DBClientConnection* client, StorageInterface* storageInterface, - ThreadPool* dbPool) + ThreadPool* dbPool, + StringData tenantId) : BaseCloner("TenantDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool), _dbName(dbName), - _listCollectionsStage("listCollections", this, &TenantDatabaseCloner::listCollectionsStage) { + _listCollectionsStage("listCollections", this, &TenantDatabaseCloner::listCollectionsStage), + _tenantId(tenantId) { invariant(!dbName.empty()); _stats.dbname = dbName; } @@ -72,15 +74,14 @@ void TenantDatabaseCloner::preStage() { BaseCloner::AfterStageBehavior TenantDatabaseCloner::listCollectionsStage() { // This will be set after a successful listCollections command. - _operationTime = Timestamp(0, 0); + _operationTime = Timestamp(); - BSONObj res; auto collectionInfos = getClient()->getCollectionInfos(_dbName, ListCollectionsFilter::makeTypeCollectionFilter()); - // Do a speculative majority read on the sync source to make sure the collections listed - // exist on a majority of nodes in the set. We do not check the rollbackId - rollback - // would lead to the sync source closing connections so the stage would fail. + // Do a majority read on the sync source to make sure the collections listed exist on a majority + // of nodes in the set. We do not check the rollbackId - rollback would lead to the sync source + // closing connections so the stage would fail. _operationTime = getClient()->getOperationTime(); tenantDatabaseClonerHangAfterGettingOperationTime.executeIf( @@ -90,7 +91,8 @@ BaseCloner::AfterStageBehavior TenantDatabaseCloner::listCollectionsStage() { LOGV2(4881605, "tenantDatabaseClonerHangAfterGettingOperationTime fail point " "enabled. Blocking until fail point is disabled", - "dbName"_attr = _dbName); + "dbName"_attr = _dbName, + "tenantId"_attr = _tenantId); mongo::sleepsecs(1); } }, @@ -126,10 +128,16 @@ BaseCloner::AfterStageBehavior TenantDatabaseCloner::listCollectionsStage() { LOGV2_DEBUG(4881602, 1, "Database cloner skipping 'system' collection", - "namespace"_attr = collectionNamespace.ns()); + "namespace"_attr = collectionNamespace.ns(), + "tenantId"_attr = _tenantId); continue; } - LOGV2_DEBUG(4881603, 2, "Allowing cloning of collectionInfo", "info"_attr = info); + LOGV2_DEBUG(4881603, + 2, + "Allowing cloning of collectionInfo", + "info"_attr = info, + "db"_attr = _dbName, + "tenantId"_attr = _tenantId); bool isDuplicate = seen.insert(result.getName().toString()).second; uassert(4881604, @@ -173,17 +181,22 @@ void TenantDatabaseCloner::postStage() { getSource(), getClient(), getStorageInterface(), - getDBPool()); + getDBPool(), + _tenantId); } auto collStatus = _currentCollectionCloner->run(); if (collStatus.isOK()) { - LOGV2_DEBUG( - 4881600, 1, "Tenant collection clone finished", "namespace"_attr = sourceNss); + LOGV2_DEBUG(4881600, + 1, + "Tenant collection clone finished", + "namespace"_attr = sourceNss, + "tenantId"_attr = _tenantId); } else { LOGV2_ERROR(4881601, "Tenant collection clone failed", "namespace"_attr = sourceNss, - "error"_attr = collStatus.toString()); + "error"_attr = collStatus.toString(), + "tenantId"_attr = _tenantId); setInitialSyncFailedStatus( {collStatus.code(), collStatus diff --git a/src/mongo/db/repl/tenant_database_cloner.h b/src/mongo/db/repl/tenant_database_cloner.h index 1684231ab0d..e0bf8f98ca9 100644 --- a/src/mongo/db/repl/tenant_database_cloner.h +++ b/src/mongo/db/repl/tenant_database_cloner.h @@ -57,7 +57,8 @@ public: const HostAndPort& source, DBClientConnection* client, StorageInterface* storageInterface, - ThreadPool* dbPool); + ThreadPool* dbPool, + StringData tenantId); virtual ~TenantDatabaseCloner() = default; @@ -123,6 +124,9 @@ private: TenantDatabaseClonerStage _listCollectionsStage; // (R) + // The database name prefix of the tenant associated with this migration. + std::string _tenantId; // (R) + // The operationTime returned with the listCollections result. Timestamp _operationTime; // (X) diff --git a/src/mongo/db/repl/tenant_database_cloner_test.cpp b/src/mongo/db/repl/tenant_database_cloner_test.cpp index 7342c368db9..3331178e080 100644 --- a/src/mongo/db/repl/tenant_database_cloner_test.cpp +++ b/src/mongo/db/repl/tenant_database_cloner_test.cpp @@ -43,9 +43,9 @@ namespace mongo { namespace repl { -struct CollectionCloneInfo { - std::shared_ptr<CollectionMockStats> stats = std::make_shared<CollectionMockStats>(); - CollectionBulkLoaderMock* loader = nullptr; +struct TenantCollectionCloneInfo { + size_t numDocsInserted{0}; + bool collCreated = false; }; class TenantDatabaseClonerTest : public ClonerTestFixture { @@ -55,21 +55,26 @@ public: protected: void setUp() override { ClonerTestFixture::setUp(); - _storageInterface.createCollectionForBulkFn = - [this](const NamespaceString& nss, - const CollectionOptions& options, - const BSONObj& idIndexSpec, - const std::vector<BSONObj>& secondaryIndexSpecs) - -> StatusWith<std::unique_ptr<CollectionBulkLoaderMock>> { + _storageInterface.createCollFn = [this](OperationContext* opCtx, + const NamespaceString& nss, + const CollectionOptions& options) -> Status { const auto collInfo = &_collections[nss]; - - auto localLoader = std::make_unique<CollectionBulkLoaderMock>(collInfo->stats); - auto status = localLoader->init(secondaryIndexSpecs); - if (!status.isOK()) - return status; - collInfo->loader = localLoader.get(); - - return std::move(localLoader); + collInfo->collCreated = true; + collInfo->numDocsInserted = 0; + return Status::OK(); + }; + _storageInterface.createIndexesOnEmptyCollFn = + [this](OperationContext* opCtx, + const NamespaceString& nss, + const std::vector<BSONObj>& secondaryIndexSpecs) -> Status { + return Status::OK(); + }; + _storageInterface.insertDocumentsFn = [this](OperationContext* opCtx, + const NamespaceStringOrUUID& nsOrUUID, + const std::vector<InsertStatement>& ops) { + const auto collInfo = &_collections[nsOrUUID.nss().get()]; + collInfo->numDocsInserted += ops.size(); + return Status::OK(); }; setInitialSyncId(); _mockClient->setOperationTime(_operationTime); @@ -81,7 +86,8 @@ protected: _source, _mockClient.get(), &_storageInterface, - _dbWorkThreadPool.get()); + _dbWorkThreadPool.get(), + _tenantId); } BSONObj createListCollectionsResponse(const std::vector<BSONObj>& collections) { @@ -119,14 +125,16 @@ protected: return cloner->_collections; } - std::map<NamespaceString, CollectionCloneInfo> _collections; + std::map<NamespaceString, TenantCollectionCloneInfo> _collections; + static std::string _tenantId; static std::string _dbName; static Timestamp _operationTime; }; /* static */ -std::string TenantDatabaseClonerTest::_dbName = "testDb"; +std::string TenantDatabaseClonerTest::_tenantId = "tenant42"; +std::string TenantDatabaseClonerTest::_dbName = _tenantId + "_testDb"; Timestamp TenantDatabaseClonerTest::_operationTime = Timestamp(12345, 42); // A database may have no collections. Nothing to do for the tenant database cloner. @@ -454,200 +462,185 @@ TEST_F(TenantDatabaseClonerTest, ListCollectionsRecordsCorrectOperationTime) { clonerThread.join(); } -// TODO(SERVER-48845): Restore the below tests. - -// TEST_F(TenantDatabaseClonerTest, FirstCollectionListIndexesFailed) { -// auto uuid1 = UUID::gen(); -// auto uuid2 = UUID::gen(); -// const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" -// << "_id_"); -// const std::vector<BSONObj> sourceInfos = {BSON("name" -// << "a" -// << "type" -// << "collection" -// << "options" << BSONObj() << "info" -// << BSON("readOnly" << false << "uuid" << -// uuid1)), -// BSON( -// "name" -// << "b" -// << "type" -// << "collection" -// << "options" << BSONObj() << "info" -// << BSON("readOnly" << false << "uuid" << -// uuid2))}; -// _mockServer->setCommandReply("listCollections", -// createListCollectionsResponse({sourceInfos[0], -// sourceInfos[1]})); -// _mockServer->setCommandReply("find", createFindResponse()); -// _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)}); -// _mockServer->setCommandReply("listIndexes", -// {BSON("ok" << 0 << "errmsg" -// << "fake message" -// << "code" << ErrorCodes::CursorNotFound), -// createCursorResponse(_dbName + ".b", -// BSON_ARRAY(idIndexSpec))}); -// auto cloner = makeDatabaseCloner(); -// auto status = cloner->run(); -// ASSERT_NOT_OK(status); - -// ASSERT_EQ(status.code(), ErrorCodes::InitialSyncFailure); -// ASSERT_EQUALS(0u, _collections.size()); -// } - -// TEST_F(TenantDatabaseClonerTest, CreateCollections) { -// auto uuid1 = UUID::gen(); -// auto uuid2 = UUID::gen(); -// const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" -// << "_id_"); -// const std::vector<BSONObj> sourceInfos = {BSON("name" -// << "a" -// << "type" -// << "collection" -// << "options" << BSONObj() << "info" -// << BSON("readOnly" << false << "uuid" << -// uuid1)), -// BSON( -// "name" -// << "b" -// << "type" -// << "collection" -// << "options" << BSONObj() << "info" -// << BSON("readOnly" << false << "uuid" << -// uuid2))}; -// _mockServer->setCommandReply("listCollections", -// createListCollectionsResponse({sourceInfos[0], -// sourceInfos[1]})); -// _mockServer->setCommandReply("find", createFindResponse()); -// _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)}); -// _mockServer->setCommandReply("listIndexes", -// {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec)), -// createCursorResponse(_dbName + ".b", -// BSON_ARRAY(idIndexSpec))}); -// auto cloner = makeDatabaseCloner(); -// auto status = cloner->run(); -// ASSERT_OK(status); - -// ASSERT_EQUALS(2U, _collections.size()); - -// auto collInfo = _collections[NamespaceString{_dbName, "a"}]; -// auto stats = *collInfo.stats; -// ASSERT_EQUALS(0, stats.insertCount); -// ASSERT(stats.commitCalled); - -// collInfo = _collections[NamespaceString{_dbName, "b"}]; -// stats = *collInfo.stats; -// ASSERT_EQUALS(0, stats.insertCount); -// ASSERT(stats.commitCalled); -// } - -// TEST_F(TenantDatabaseClonerTest, DatabaseAndCollectionStats) { -// auto uuid1 = UUID::gen(); -// auto uuid2 = UUID::gen(); -// const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" -// << "_id_"); -// const BSONObj extraIndexSpec = BSON("v" << 1 << "key" << BSON("x" << 1) << "name" -// << "_extra_"); -// const std::vector<BSONObj> sourceInfos = {BSON("name" -// << "a" -// << "type" -// << "collection" -// << "options" << BSONObj() << "info" -// << BSON("readOnly" << false << "uuid" << -// uuid1)), -// BSON( -// "name" -// << "b" -// << "type" -// << "collection" -// << "options" << BSONObj() << "info" -// << BSON("readOnly" << false << "uuid" << -// uuid2))}; -// _mockServer->setCommandReply("listCollections", -// createListCollectionsResponse({sourceInfos[0], -// sourceInfos[1]})); -// _mockServer->setCommandReply("find", createFindResponse()); -// _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)}); -// _mockServer->setCommandReply( -// "listIndexes", -// {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec << extraIndexSpec)), -// createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))}); -// auto cloner = makeDatabaseCloner(); - -// auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); -// auto collClonerAfterFailPoint = globalFailPointRegistry().find("hangAfterClonerStage"); -// auto timesEntered = collClonerBeforeFailPoint->setMode( -// FailPoint::alwaysOn, -// 0, -// fromjson("{cloner: 'CollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}")); -// collClonerAfterFailPoint->setMode( -// FailPoint::alwaysOn, -// 0, -// fromjson("{cloner: 'CollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}")); - -// // Run the cloner in a separate thread. -// stdx::thread clonerThread([&] { -// Client::initThread("ClonerRunner"); -// ASSERT_OK(cloner->run()); -// }); -// // Wait for the failpoint to be reached -// collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); - -// // Collection stats should be set up with namespace. -// auto stats = cloner->getStats(); -// ASSERT_EQ(_dbName, stats.dbname); -// ASSERT_EQ(_clock.now(), stats.start); -// ASSERT_EQ(2, stats.collections); -// ASSERT_EQ(0, stats.clonedCollections); -// ASSERT_EQ(2, stats.collectionStats.size()); -// ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns); -// ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns); -// ASSERT_EQ(_clock.now(), stats.collectionStats[0].start); -// ASSERT_EQ(Date_t(), stats.collectionStats[0].end); -// ASSERT_EQ(Date_t(), stats.collectionStats[1].start); -// ASSERT_EQ(0, stats.collectionStats[0].indexes); -// ASSERT_EQ(0, stats.collectionStats[1].indexes); -// _clock.advance(Minutes(1)); - -// // Move to the next collection -// timesEntered = collClonerBeforeFailPoint->setMode( -// FailPoint::alwaysOn, -// 0, -// fromjson("{cloner: 'CollectionCloner', stage: 'count', nss: '" + _dbName + ".b'}")); -// collClonerAfterFailPoint->setMode(FailPoint::off); - -// // Wait for the failpoint to be reached -// collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); - -// stats = cloner->getStats(); -// ASSERT_EQ(2, stats.collections); -// ASSERT_EQ(1, stats.clonedCollections); -// ASSERT_EQ(2, stats.collectionStats.size()); -// ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns); -// ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns); -// ASSERT_EQ(2, stats.collectionStats[0].indexes); -// ASSERT_EQ(0, stats.collectionStats[1].indexes); -// ASSERT_EQ(_clock.now(), stats.collectionStats[0].end); -// ASSERT_EQ(_clock.now(), stats.collectionStats[1].start); -// ASSERT_EQ(Date_t(), stats.collectionStats[1].end); -// _clock.advance(Minutes(1)); - -// // Finish -// collClonerBeforeFailPoint->setMode(FailPoint::off, 0); -// clonerThread.join(); - -// stats = cloner->getStats(); -// ASSERT_EQ(_dbName, stats.dbname); -// ASSERT_EQ(_clock.now(), stats.end); -// ASSERT_EQ(2, stats.collections); -// ASSERT_EQ(2, stats.clonedCollections); -// ASSERT_EQ(2, stats.collectionStats.size()); -// ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns); -// ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns); -// ASSERT_EQ(2, stats.collectionStats[0].indexes); -// ASSERT_EQ(1, stats.collectionStats[1].indexes); -// ASSERT_EQ(_clock.now(), stats.collectionStats[1].end); -// } +TEST_F(TenantDatabaseClonerTest, FirstCollectionListIndexesFailed) { + auto uuid1 = UUID::gen(); + auto uuid2 = UUID::gen(); + const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + const std::vector<BSONObj> sourceInfos = {BSON("name" + << "a" + << "type" + << "collection" + << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << uuid1)), + BSON( + "name" + << "b" + << "type" + << "collection" + << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << uuid2))}; + _mockServer->setCommandReply("listCollections", + createListCollectionsResponse({sourceInfos[0], sourceInfos[1]})); + _mockServer->setCommandReply("find", createFindResponse()); + _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)}); + _mockServer->setCommandReply("listIndexes", + {BSON("ok" << 0 << "errmsg" + << "fake message" + << "code" << ErrorCodes::CursorNotFound), + createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))}); + auto cloner = makeDatabaseCloner(); + auto status = cloner->run(); + ASSERT_NOT_OK(status); + + ASSERT_EQ(status.code(), ErrorCodes::CursorNotFound); + ASSERT_EQUALS(0u, _collections.size()); +} + +TEST_F(TenantDatabaseClonerTest, CreateCollections) { + auto uuid1 = UUID::gen(); + auto uuid2 = UUID::gen(); + const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + const std::vector<BSONObj> sourceInfos = {BSON("name" + << "a" + << "type" + << "collection" + << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << uuid1)), + BSON( + "name" + << "b" + << "type" + << "collection" + << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << uuid2))}; + _mockServer->setCommandReply("listCollections", + createListCollectionsResponse({sourceInfos[0], sourceInfos[1]})); + _mockServer->setCommandReply("find", createFindResponse()); + _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)}); + _mockServer->setCommandReply("listIndexes", + {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec)), + createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))}); + auto cloner = makeDatabaseCloner(); + auto status = cloner->run(); + ASSERT_OK(status); + + ASSERT_EQUALS(2U, _collections.size()); + + auto collInfo = _collections[NamespaceString{_dbName, "a"}]; + ASSERT(collInfo.collCreated); + ASSERT_EQUALS(0, collInfo.numDocsInserted); + + collInfo = _collections[NamespaceString{_dbName, "b"}]; + ASSERT(collInfo.collCreated); + ASSERT_EQUALS(0, collInfo.numDocsInserted); +} + +TEST_F(TenantDatabaseClonerTest, DatabaseAndCollectionStats) { + auto uuid1 = UUID::gen(); + auto uuid2 = UUID::gen(); + const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name" + << "_id_"); + const BSONObj extraIndexSpec = BSON("v" << 1 << "key" << BSON("x" << 1) << "name" + << "_extra_"); + const std::vector<BSONObj> sourceInfos = {BSON("name" + << "a" + << "type" + << "collection" + << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << uuid1)), + BSON( + "name" + << "b" + << "type" + << "collection" + << "options" << BSONObj() << "info" + << BSON("readOnly" << false << "uuid" << uuid2))}; + _mockServer->setCommandReply("listCollections", + createListCollectionsResponse({sourceInfos[0], sourceInfos[1]})); + _mockServer->setCommandReply("find", createFindResponse()); + _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)}); + _mockServer->setCommandReply( + "listIndexes", + {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec << extraIndexSpec)), + createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))}); + auto cloner = makeDatabaseCloner(); + + auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage"); + auto collClonerAfterFailPoint = globalFailPointRegistry().find("hangAfterClonerStage"); + auto timesEntered = collClonerBeforeFailPoint->setMode( + FailPoint::alwaysOn, + 0, + fromjson("{cloner: 'TenantCollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}")); + collClonerAfterFailPoint->setMode( + FailPoint::alwaysOn, + 0, + fromjson("{cloner: 'TenantCollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}")); + + // Run the cloner in a separate thread. + stdx::thread clonerThread([&] { + Client::initThread("ClonerRunner"); + ASSERT_OK(cloner->run()); + }); + // Wait for the failpoint to be reached + collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); + + // Collection stats should be set up with namespace. + auto stats = cloner->getStats(); + ASSERT_EQ(_dbName, stats.dbname); + ASSERT_EQ(_clock.now(), stats.start); + ASSERT_EQ(2, stats.collections); + ASSERT_EQ(0, stats.clonedCollections); + ASSERT_EQ(2, stats.collectionStats.size()); + ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns); + ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns); + ASSERT_EQ(_clock.now(), stats.collectionStats[0].start); + ASSERT_EQ(Date_t(), stats.collectionStats[0].end); + ASSERT_EQ(Date_t(), stats.collectionStats[1].start); + ASSERT_EQ(0, stats.collectionStats[0].indexes); + ASSERT_EQ(0, stats.collectionStats[1].indexes); + _clock.advance(Minutes(1)); + + // Move to the next collection + timesEntered = collClonerBeforeFailPoint->setMode( + FailPoint::alwaysOn, + 0, + fromjson("{cloner: 'TenantCollectionCloner', stage: 'count', nss: '" + _dbName + ".b'}")); + collClonerAfterFailPoint->setMode(FailPoint::off); + + // Wait for the failpoint to be reached + collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1); + + stats = cloner->getStats(); + ASSERT_EQ(2, stats.collections); + ASSERT_EQ(1, stats.clonedCollections); + ASSERT_EQ(2, stats.collectionStats.size()); + ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns); + ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns); + ASSERT_EQ(2, stats.collectionStats[0].indexes); + ASSERT_EQ(0, stats.collectionStats[1].indexes); + ASSERT_EQ(_clock.now(), stats.collectionStats[0].end); + ASSERT_EQ(_clock.now(), stats.collectionStats[1].start); + ASSERT_EQ(Date_t(), stats.collectionStats[1].end); + _clock.advance(Minutes(1)); + + // Finish + collClonerBeforeFailPoint->setMode(FailPoint::off, 0); + clonerThread.join(); + + stats = cloner->getStats(); + ASSERT_EQ(_dbName, stats.dbname); + ASSERT_EQ(_clock.now(), stats.end); + ASSERT_EQ(2, stats.collections); + ASSERT_EQ(2, stats.clonedCollections); + ASSERT_EQ(2, stats.collectionStats.size()); + ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns); + ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns); + ASSERT_EQ(2, stats.collectionStats[0].indexes); + ASSERT_EQ(1, stats.collectionStats[1].indexes); + ASSERT_EQ(_clock.now(), stats.collectionStats[1].end); +} } // namespace repl } // namespace mongo |