summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVesselina Ratcheva <vesselina.ratcheva@10gen.com>2020-08-03 20:24:06 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-12 20:57:45 +0000
commitc999fbe8a0cb200ed0e23e23cddafde4abb028e8 (patch)
tree4b1e330597d2ff4e462447316a988bf52a06bc5b
parentd86d2f5df9c93da3c9a9d6c7e8852b5c674cb0bb (diff)
downloadmongo-c999fbe8a0cb200ed0e23e23cddafde4abb028e8.tar.gz
SERVER-48845 Implement TenantCollectionCloner
-rw-r--r--src/mongo/db/repl/SConscript5
-rw-r--r--src/mongo/db/repl/cloner_utils.cpp4
-rw-r--r--src/mongo/db/repl/cloner_utils.h6
-rw-r--r--src/mongo/db/repl/storage_interface.h8
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp24
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h4
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h15
-rw-r--r--src/mongo/db/repl/tenant_all_database_cloner.cpp35
-rw-r--r--src/mongo/db/repl/tenant_all_database_cloner.h4
-rw-r--r--src/mongo/db/repl/tenant_all_database_cloner_test.cpp14
-rw-r--r--src/mongo/db/repl/tenant_collection_cloner.cpp336
-rw-r--r--src/mongo/db/repl/tenant_collection_cloner.h161
-rw-r--r--src/mongo/db/repl/tenant_collection_cloner_test.cpp574
-rw-r--r--src/mongo/db/repl/tenant_database_cloner.cpp41
-rw-r--r--src/mongo/db/repl/tenant_database_cloner.h6
-rw-r--r--src/mongo/db/repl/tenant_database_cloner_test.cpp421
16 files changed, 1397 insertions, 261 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index e0185666445..f6b8e2054f2 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1008,10 +1008,14 @@ env.Library(
'base_cloner',
'cloner_utils',
'initial_sync_shared_data',
+ 'task_runner',
'$BUILD_DIR/mongo/base',
],
LIBDEPS_PRIVATE=[
+ 'repl_server_parameters',
'$BUILD_DIR/mongo/db/commands/list_collections_filter',
+ '$BUILD_DIR/mongo/rpc/metadata',
+ '$BUILD_DIR/mongo/util/progress_meter',
]
)
@@ -1504,6 +1508,7 @@ env.CppUnitTest(
'database_cloner_test.cpp',
'initial_sync_shared_data_test.cpp',
'tenant_all_database_cloner_test.cpp',
+ 'tenant_collection_cloner_test.cpp',
'tenant_database_cloner_test.cpp'
],
LIBDEPS=[
diff --git a/src/mongo/db/repl/cloner_utils.cpp b/src/mongo/db/repl/cloner_utils.cpp
index 988259c0b46..184f9107e4d 100644
--- a/src/mongo/db/repl/cloner_utils.cpp
+++ b/src/mongo/db/repl/cloner_utils.cpp
@@ -51,5 +51,9 @@ BSONObj ClonerUtils::buildMajorityWaitRequest(Timestamp operationTime) {
return bob.obj();
}
+bool ClonerUtils::isNamespaceForTenant(NamespaceString nss, StringData prefix) {
+ return nss.db().startsWith(prefix + "_");
+}
+
} // namespace repl
} // namespace mongo \ No newline at end of file
diff --git a/src/mongo/db/repl/cloner_utils.h b/src/mongo/db/repl/cloner_utils.h
index 9f37ac91648..b9acd75b2ee 100644
--- a/src/mongo/db/repl/cloner_utils.h
+++ b/src/mongo/db/repl/cloner_utils.h
@@ -33,6 +33,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/namespace_string.h"
namespace mongo {
namespace repl {
@@ -54,6 +55,11 @@ public:
* Assembles a majority read using the operationTime specified as the afterClusterTime.
*/
static BSONObj buildMajorityWaitRequest(Timestamp operationTime);
+
+ /**
+ * Checks if the collection belongs to the given tenant.
+ */
+ static bool isNamespaceForTenant(NamespaceString nss, StringData prefix);
};
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index bb722d04fcb..1aea0e40b93 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -160,6 +160,14 @@ public:
const CollectionOptions& options) = 0;
/**
+ * Creates all the specified non-_id indexes on a given collection, which must be empty.
+ */
+ virtual Status createIndexesOnEmptyCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) = 0;
+
+ /**
* Drops a collection.
*/
virtual Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) = 0;
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 4127ad6b410..a6e9efc5011 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -493,6 +493,30 @@ Status StorageInterfaceImpl::createCollection(OperationContext* opCtx,
});
}
+Status StorageInterfaceImpl::createIndexesOnEmptyCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+ return writeConflictRetry(opCtx, "createIndexesOnEmptyCollection", nss.ns(), [&] {
+ AutoGetCollection autoColl(opCtx, nss, fixLockModeForSystemDotViewsChanges(nss, MODE_IX));
+ WriteUnitOfWork wunit(opCtx);
+
+ for (auto&& spec : secondaryIndexSpecs) {
+ // Will error if collection is not empty.
+ auto secIndexSW =
+ autoColl.getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(opCtx,
+ spec);
+ auto status = secIndexSW.getStatus();
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ wunit.commit();
+ return Status::OK();
+ });
+}
+
Status StorageInterfaceImpl::dropCollection(OperationContext* opCtx, const NamespaceString& nss) {
return writeConflictRetry(opCtx, "StorageInterfaceImpl::dropCollection", nss.ns(), [&] {
AutoGetDb autoDb(opCtx, nss.db(), MODE_IX);
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index ac5a4497f9b..6c38fdd5b72 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -83,6 +83,10 @@ public:
const NamespaceString& nss,
const CollectionOptions& options) override;
+ Status createIndexesOnEmptyCollection(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) override;
+
Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override;
Status truncateCollection(OperationContext* opCtx, const NamespaceString& nss) override;
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 3c4ec8e9176..a57c2298110 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -104,6 +104,8 @@ public:
using CreateOplogFn = std::function<Status(OperationContext*, const NamespaceString&)>;
using CreateCollectionFn =
std::function<Status(OperationContext*, const NamespaceString&, const CollectionOptions&)>;
+ using CreateIndexesOnEmptyCollectionFn = std::function<Status(
+ OperationContext*, const NamespaceString&, const std::vector<BSONObj>&)>;
using TruncateCollectionFn =
std::function<Status(OperationContext*, const NamespaceString& nss)>;
using DropCollectionFn = std::function<Status(OperationContext*, const NamespaceString& nss)>;
@@ -172,6 +174,13 @@ public:
return createCollFn(opCtx, nss, options);
}
+ Status createIndexesOnEmptyCollection(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) override {
+ return createIndexesOnEmptyCollFn(opCtx, nss, secondaryIndexSpecs);
+ }
+
Status dropCollection(OperationContext* opCtx, const NamespaceString& nss) override {
return dropCollFn(opCtx, nss);
};
@@ -381,6 +390,12 @@ public:
[](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) {
return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."};
};
+ CreateIndexesOnEmptyCollectionFn createIndexesOnEmptyCollFn = [](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>&
+ secondaryIndexSpecs) {
+ return Status{ErrorCodes::IllegalOperation, "createIndexesOnEmptyCollFn not implemented."};
+ };
TruncateCollectionFn truncateCollFn = [](OperationContext* opCtx, const NamespaceString& nss) {
return Status{ErrorCodes::IllegalOperation, "TruncateCollectionFn not implemented."};
};
diff --git a/src/mongo/db/repl/tenant_all_database_cloner.cpp b/src/mongo/db/repl/tenant_all_database_cloner.cpp
index 1005ddc5b25..b27e070a4a7 100644
--- a/src/mongo/db/repl/tenant_all_database_cloner.cpp
+++ b/src/mongo/db/repl/tenant_all_database_cloner.cpp
@@ -52,10 +52,10 @@ TenantAllDatabaseCloner::TenantAllDatabaseCloner(InitialSyncSharedData* sharedDa
DBClientConnection* client,
StorageInterface* storageInterface,
ThreadPool* dbPool,
- StringData databasePrefix)
+ StringData tenantId)
: BaseCloner(
"TenantAllDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool),
- _databasePrefix(databasePrefix),
+ _tenantId(tenantId),
_listDatabasesStage("listDatabases", this, &TenantAllDatabaseCloner::listDatabasesStage) {}
BaseCloner::ClonerStages TenantAllDatabaseCloner::getStages() {
@@ -63,19 +63,23 @@ BaseCloner::ClonerStages TenantAllDatabaseCloner::getStages() {
}
BaseCloner::AfterStageBehavior TenantAllDatabaseCloner::listDatabasesStage() {
+ // This will be set after a successful listDatabases command.
+ _operationTime = Timestamp();
+
BSONObj res;
- const BSONObj filter = ClonerUtils::makeTenantDatabaseFilter(_databasePrefix);
+ const BSONObj filter = ClonerUtils::makeTenantDatabaseFilter(_tenantId);
auto databasesArray = getClient()->getDatabaseInfos(filter, true /* nameOnly */);
- // Do a speculative majority read on the sync source to make sure the databases listed
- // exist on a majority of nodes in the set. We do not check the rollbackId - rollback
- // would lead to the sync source closing connections so the stage would fail.
+ // Do a majority read on the sync source to make sure the databases listed exist on a majority
+ // of nodes in the set. We do not check the rollbackId - rollback would lead to the sync source
+ // closing connections so the stage would fail.
_operationTime = getClient()->getOperationTime();
if (MONGO_unlikely(tenantAllDatabaseClonerHangAfterGettingOperationTime.shouldFail())) {
LOGV2(4881504,
"Failpoint 'tenantAllDatabaseClonerHangAfterGettingOperationTime' enabled. Blocking "
- "until it is disabled.");
+ "until it is disabled.",
+ "tenantId"_attr = _tenantId);
tenantAllDatabaseClonerHangAfterGettingOperationTime.pauseWhileSet();
}
@@ -88,7 +92,11 @@ BaseCloner::AfterStageBehavior TenantAllDatabaseCloner::listDatabasesStage() {
// Process and verify the listDatabases results.
for (const auto& dbBSON : databasesArray) {
- LOGV2_DEBUG(4881508, 2, "Cloner received listDatabases entry", "db"_attr = dbBSON);
+ LOGV2_DEBUG(4881508,
+ 2,
+ "Cloner received listDatabases entry",
+ "db"_attr = dbBSON,
+ "tenantId"_attr = _tenantId);
uassert(4881505, "Result from donor must have 'name' set", dbBSON.hasField("name"));
const auto& dbName = dbBSON["name"].str();
@@ -117,7 +125,8 @@ void TenantAllDatabaseCloner::postStage() {
getSource(),
getClient(),
getStorageInterface(),
- getDBPool());
+ getDBPool(),
+ _tenantId);
}
auto dbStatus = _currentDatabaseCloner->run();
if (dbStatus.isOK()) {
@@ -125,14 +134,16 @@ void TenantAllDatabaseCloner::postStage() {
1,
"Tenant migration database clone finished",
"dbName"_attr = dbName,
- "status"_attr = dbStatus);
+ "status"_attr = dbStatus,
+ "tenantId"_attr = _tenantId);
} else {
LOGV2_WARNING(4881501,
"Tenant migration database clone failed",
"dbName"_attr = dbName,
"dbNumber"_attr = (_stats.databasesCloned + 1),
"totalDbs"_attr = _databases.size(),
- "error"_attr = dbStatus.toString());
+ "error"_attr = dbStatus.toString(),
+ "tenantId"_attr = _tenantId);
setInitialSyncFailedStatus(dbStatus);
return;
}
@@ -158,7 +169,7 @@ std::string TenantAllDatabaseCloner::toString() const {
stdx::lock_guard<Latch> lk(_mutex);
return str::stream() << "tenant migration --"
<< " active:" << isActive(lk) << " status:" << getStatus(lk).toString()
- << " source:" << getSource()
+ << " source:" << getSource() << " tenantId: " << _tenantId
<< " db cloners completed:" << _stats.databasesCloned;
}
diff --git a/src/mongo/db/repl/tenant_all_database_cloner.h b/src/mongo/db/repl/tenant_all_database_cloner.h
index 75859f95383..c02ec12046b 100644
--- a/src/mongo/db/repl/tenant_all_database_cloner.h
+++ b/src/mongo/db/repl/tenant_all_database_cloner.h
@@ -53,7 +53,7 @@ public:
DBClientConnection* client,
StorageInterface* storageInterface,
ThreadPool* dbPool,
- StringData databasePrefix);
+ StringData tenantId);
virtual ~TenantAllDatabaseCloner() = default;
@@ -111,7 +111,7 @@ private:
std::unique_ptr<TenantDatabaseCloner> _currentDatabaseCloner; // (MX)
// The database name prefix of the tenant associated with this migration.
- std::string _databasePrefix; // (R)
+ std::string _tenantId; // (R)
TenantAllDatabaseClonerStage _listDatabasesStage; // (R)
diff --git a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp
index 9ae8973c5a3..ae853a9f56f 100644
--- a/src/mongo/db/repl/tenant_all_database_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_all_database_cloner_test.cpp
@@ -61,7 +61,7 @@ protected:
_mockClient.get(),
&_storageInterface,
_dbWorkThreadPool.get(),
- _databasePrefix);
+ _tenantId);
}
std::vector<std::string> getDatabasesFromCloner(TenantAllDatabaseCloner* cloner) {
@@ -80,7 +80,7 @@ protected:
}
static Timestamp _operationTime;
- static std::string _databasePrefix;
+ static std::string _tenantId;
static std::string _tenantDbA;
static std::string _tenantDbAAB;
static std::string _tenantDbABC;
@@ -89,11 +89,11 @@ protected:
/* static */
Timestamp TenantAllDatabaseClonerTest::_operationTime = Timestamp(12345, 67);
-std::string TenantAllDatabaseClonerTest::_databasePrefix = "tenant42";
-std::string TenantAllDatabaseClonerTest::_tenantDbA = _databasePrefix + "_a";
-std::string TenantAllDatabaseClonerTest::_tenantDbAAB = _databasePrefix + "_aab";
-std::string TenantAllDatabaseClonerTest::_tenantDbABC = _databasePrefix + "_abc";
-std::string TenantAllDatabaseClonerTest::_tenantDbB = _databasePrefix + "_b";
+std::string TenantAllDatabaseClonerTest::_tenantId = "tenant42";
+std::string TenantAllDatabaseClonerTest::_tenantDbA = _tenantId + "_a";
+std::string TenantAllDatabaseClonerTest::_tenantDbAAB = _tenantId + "_aab";
+std::string TenantAllDatabaseClonerTest::_tenantDbABC = _tenantId + "_abc";
+std::string TenantAllDatabaseClonerTest::_tenantDbB = _tenantId + "_b";
TEST_F(TenantAllDatabaseClonerTest, FailsOnListDatabases) {
Status expectedResult{ErrorCodes::BadValue, "foo"};
diff --git a/src/mongo/db/repl/tenant_collection_cloner.cpp b/src/mongo/db/repl/tenant_collection_cloner.cpp
index be1d6c0a3de..a3948e352d4 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.cpp
+++ b/src/mongo/db/repl/tenant_collection_cloner.cpp
@@ -33,26 +33,74 @@
#include "mongo/base/string_data.h"
#include "mongo/db/commands/list_collections_filter.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/repl/cloner_utils.h"
#include "mongo/db/repl/database_cloner_gen.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/tenant_collection_cloner.h"
#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/rpc/metadata/repl_set_metadata.h"
#include "mongo/util/assert_util.h"
namespace mongo {
namespace repl {
+namespace {
+const int kProgressMeterSecondsBetween = 60;
+const int kProgressMeterCheckInterval = 128;
+} // namespace
+
+// Failpoint which causes the tenant collection cloner to hang after it has successfully run
+// listIndexes and recorded the results and the operationTime.
+MONGO_FAIL_POINT_DEFINE(tenantCollectionClonerHangAfterGettingOperationTime);
+
+// Failpoint which causes tenant migration to hang after handling the next batch of results from the
+// DBClientConnection, optionally limited to a specific collection.
+MONGO_FAIL_POINT_DEFINE(tenantMigrationHangCollectionClonerAfterHandlingBatchResponse);
+
+// Failpoint which causes tenant migration to hang when it has cloned 'numDocsToClone' documents to
+// collection 'namespace'.
+MONGO_FAIL_POINT_DEFINE(tenantMigrationHangDuringCollectionClone);
+
TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss,
const CollectionOptions& collectionOptions,
InitialSyncSharedData* sharedData,
const HostAndPort& source,
DBClientConnection* client,
StorageInterface* storageInterface,
- ThreadPool* dbPool)
+ ThreadPool* dbPool,
+ StringData tenantId)
: BaseCloner("TenantCollectionCloner"_sd, sharedData, source, client, storageInterface, dbPool),
_sourceNss(sourceNss),
_collectionOptions(collectionOptions),
_sourceDbAndUuid(NamespaceString("UNINITIALIZED")),
- _placeholderStage("placeholder", this, &TenantCollectionCloner::placeholderStage) {
+ _countStage("count", this, &TenantCollectionCloner::countStage),
+ _listIndexesStage("listIndexes", this, &TenantCollectionCloner::listIndexesStage),
+ _createCollectionStage(
+ "createCollection", this, &TenantCollectionCloner::createCollectionStage),
+ _queryStage("query", this, &TenantCollectionCloner::queryStage),
+ _progressMeter(1U, // total will be replaced with count command result.
+ kProgressMeterSecondsBetween,
+ kProgressMeterCheckInterval,
+ "documents copied",
+ str::stream() << _sourceNss.toString() << " tenant collection clone progress"),
+ _scheduleDbWorkFn([this](executor::TaskExecutor::CallbackFn work) {
+ auto task = [ this, work = std::move(work) ](
+ OperationContext * opCtx,
+ const Status& status) mutable noexcept->TaskRunner::NextAction {
+ try {
+ work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
+ } catch (const DBException& e) {
+ setInitialSyncFailedStatus(e.toStatus());
+ }
+ return TaskRunner::NextAction::kDisposeOperationContext;
+ };
+ _dbWorkTaskRunner.schedule(std::move(task));
+ return executor::TaskExecutor::CallbackHandle();
+ }),
+ _dbWorkTaskRunner(dbPool),
+ _tenantId(tenantId) {
invariant(sourceNss.isValid());
invariant(collectionOptions.uuid);
_sourceDbAndUuid = NamespaceStringOrUUID(sourceNss.db().toString(), *collectionOptions.uuid);
@@ -60,18 +108,293 @@ TenantCollectionCloner::TenantCollectionCloner(const NamespaceString& sourceNss,
}
BaseCloner::ClonerStages TenantCollectionCloner::getStages() {
- return {&_placeholderStage};
+ return {&_countStage, &_listIndexesStage, &_createCollectionStage, &_queryStage};
+}
+
+void TenantCollectionCloner::preStage() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.start = getSharedData()->getClock()->now();
+}
+
+void TenantCollectionCloner::postStage() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.end = getSharedData()->getClock()->now();
+}
+
+BaseCloner::AfterStageBehavior TenantCollectionCloner::TenantCollectionClonerStage::run() {
+ try {
+ return ClonerStage<TenantCollectionCloner>::run();
+ } catch (const DBException&) {
+ getCloner()->waitForDatabaseWorkToComplete();
+ throw;
+ }
+}
+
+BaseCloner::AfterStageBehavior TenantCollectionCloner::countStage() {
+ auto count = getClient()->count(_sourceDbAndUuid,
+ {} /* Query */,
+ QueryOption_SlaveOk,
+ 0 /* limit */,
+ 0 /* skip */,
+ ReadConcernArgs::kImplicitDefault);
+
+ // The count command may return a negative value after an unclean shutdown,
+ // so we set it to zero here to avoid aborting the collection clone.
+ // Note that this count value is only used for reporting purposes.
+ if (count < 0) {
+ LOGV2_WARNING(4884502,
+ "Count command returned negative value. Updating to 0 to allow progress "
+ "meter to function properly",
+ "namespace"_attr = _sourceNss.ns(),
+ "tenantId"_attr = _tenantId);
+ count = 0;
+ }
+
+ _progressMeter.setTotalWhileRunning(static_cast<unsigned long long>(count));
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.documentToCopy = count;
+ }
+ return kContinueNormally;
}
-BaseCloner::AfterStageBehavior TenantCollectionCloner::placeholderStage() {
+BaseCloner::AfterStageBehavior TenantCollectionCloner::listIndexesStage() {
+ // This will be set after a successful listIndexes command.
+ _operationTime = Timestamp();
+
+ auto indexSpecs = getClient()->getIndexSpecs(
+ _sourceDbAndUuid, false /* includeBuildUUIDs */, QueryOption_SlaveOk);
+
+ // Do a majority read on the sync source to make sure the indexes listed exist on a majority of
+ // nodes in the set. We do not check the rollbackId - rollback would lead to the sync source
+ // closing connections so the stage would fail.
+ _operationTime = getClient()->getOperationTime();
+
+ tenantCollectionClonerHangAfterGettingOperationTime.executeIf(
+ [&](const BSONObj&) {
+ while (
+ MONGO_unlikely(tenantCollectionClonerHangAfterGettingOperationTime.shouldFail()) &&
+ !mustExit()) {
+ LOGV2(4884509,
+ "tenantCollectionClonerHangAfterGettingOperationTime fail point "
+ "enabled. Blocking until fail point is disabled",
+ "namespace"_attr = _sourceNss.toString(),
+ "tenantId"_attr = _tenantId);
+ mongo::sleepsecs(1);
+ }
+ },
+ [&](const BSONObj& data) {
+ // Only hang when cloning the specified collection, or if no collection was specified.
+ auto nss = data["nss"].str();
+ return nss.empty() || nss == _sourceNss.toString();
+ });
+
+ BSONObj readResult;
+ BSONObj cmd = ClonerUtils::buildMajorityWaitRequest(_operationTime);
+ getClient()->runCommand("admin", cmd, readResult, QueryOption_SlaveOk);
+ uassertStatusOKWithContext(
+ getStatusFromCommandResult(readResult),
+ "TenantCollectionCloner failed to get listIndexes result majority-committed");
+
+ // Process the listIndexes results for finished indexes only.
+ if (indexSpecs.empty()) {
+ LOGV2_WARNING(4884503,
+ "No indexes found for collection while cloning",
+ "namespace"_attr = _sourceNss.ns(),
+ "source"_attr = getSource(),
+ "tenantId"_attr = _tenantId);
+ }
+ for (auto&& spec : indexSpecs) {
+ if (spec.hasField("name") && spec.getStringField("name") == "_id_"_sd) {
+ _idIndexSpec = spec.getOwned();
+ } else {
+ _readyIndexSpecs.push_back(spec.getOwned());
+ }
+ }
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.indexes = _readyIndexSpecs.size() + (_idIndexSpec.isEmpty() ? 0 : 1);
+ };
+
+ if (!_idIndexSpec.isEmpty() && _collectionOptions.autoIndexId == CollectionOptions::NO) {
+ LOGV2_WARNING(4884504,
+ "Found the _id index spec but the collection specified autoIndexId of false",
+ "namespace"_attr = this->_sourceNss,
+ "tenantId"_attr = _tenantId);
+ }
+ return kContinueNormally;
+}
+
+BaseCloner::AfterStageBehavior TenantCollectionCloner::createCollectionStage() {
+ auto opCtx = cc().makeOperationContext();
+
+ auto status =
+ getStorageInterface()->createCollection(opCtx.get(), _sourceNss, _collectionOptions);
+ if (status == ErrorCodes::NamespaceExists) {
+ uassert(4884501,
+ "Collection exists but does not belong to tenant",
+ ClonerUtils::isNamespaceForTenant(_sourceNss, _tenantId));
+ } else {
+ uassertStatusOKWithContext(status, "Tenant collection cloner: create collection");
+ }
+
+ // This will start building the indexes whose specs we saved last stage.
+ status = getStorageInterface()->createIndexesOnEmptyCollection(
+ opCtx.get(), _sourceNss, _readyIndexSpecs);
+
+ uassertStatusOKWithContext(status, "Tenant collection cloner: create indexes");
+
return kContinueNormally;
}
+BaseCloner::AfterStageBehavior TenantCollectionCloner::queryStage() {
+ ON_BLOCK_EXIT([this] { this->unsetMetadataReader(); });
+ setMetadataReader();
+ runQuery();
+ waitForDatabaseWorkToComplete();
+ return kContinueNormally;
+}
+
+void TenantCollectionCloner::runQuery() {
+ auto query = QUERY("query" << BSONObj());
+ query.hint(BSON("_id" << 1));
+
+ getClient()->query([this](DBClientCursorBatchIterator& iter) { handleNextBatch(iter); },
+ _sourceDbAndUuid,
+ query,
+ nullptr /* fieldsToReturn */,
+ QueryOption_NoCursorTimeout | QueryOption_SlaveOk |
+ (collectionClonerUsesExhaust ? QueryOption_Exhaust : 0),
+ _collectionClonerBatchSize);
+ _dbWorkTaskRunner.join();
+}
+
+void TenantCollectionCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.receivedBatches++;
+ while (iter.moreInCurrentBatch()) {
+ _documentsToInsert.emplace_back(InsertStatement(iter.nextSafe()));
+ }
+ }
+
+ // Schedule the next document batch insertion.
+ auto&& scheduleResult = _scheduleDbWorkFn(
+ [=](const executor::TaskExecutor::CallbackArgs& cbd) { insertDocumentsCallback(cbd); });
+
+ if (!scheduleResult.isOK()) {
+ Status newStatus = scheduleResult.getStatus().withContext(
+ str::stream() << "Error cloning collection '" << _sourceNss.ns() << "'");
+ // We must throw an exception to terminate query.
+ uassertStatusOK(newStatus);
+ }
+
+ tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.executeIf(
+ [&](const BSONObj&) {
+ while (
+ MONGO_unlikely(
+ tenantMigrationHangCollectionClonerAfterHandlingBatchResponse.shouldFail()) &&
+ !mustExit()) {
+ LOGV2(4884506,
+ "tenantMigrationHangCollectionClonerAfterHandlingBatchResponse fail point "
+ "enabled. Blocking until fail point is disabled",
+ "namespace"_attr = _sourceNss.toString(),
+ "tenantId"_attr = _tenantId);
+ mongo::sleepsecs(1);
+ }
+ },
+ [&](const BSONObj& data) {
+ // Only hang when cloning the specified collection, or if no collection was specified.
+ auto nss = data["nss"].str();
+ return nss.empty() || nss == _sourceNss.toString();
+ });
+}
+
+
+void TenantCollectionCloner::insertDocumentsCallback(
+ const executor::TaskExecutor::CallbackArgs& cbd) {
+ uassertStatusOK(cbd.status);
+ std::vector<InsertStatement> docs;
+
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_documentsToInsert.size() == 0) {
+ LOGV2_WARNING(4884507,
+ "insertDocumentsCallback, but no documents to insert",
+ "namespace"_attr = _sourceNss,
+ "tenantId"_attr = _tenantId);
+ return;
+ }
+ _documentsToInsert.swap(docs);
+ _stats.documentsCopied += docs.size();
+ ++_stats.insertedBatches;
+ _progressMeter.hit(int(docs.size()));
+ }
+
+ uassertStatusOK(getStorageInterface()->insertDocuments(cbd.opCtx, _sourceDbAndUuid, docs));
+
+ tenantMigrationHangDuringCollectionClone.executeIf(
+ [&](const BSONObj&) {
+ LOGV2(4884508,
+ "initial sync - tenantMigrationHangDuringCollectionClone fail point "
+ "enabled. Blocking until fail point is disabled",
+ "namespace"_attr = _sourceNss.ns(),
+ "tenantId"_attr = _tenantId);
+ while (MONGO_unlikely(tenantMigrationHangDuringCollectionClone.shouldFail()) &&
+ !mustExit()) {
+ mongo::sleepsecs(1);
+ }
+ },
+ [&](const BSONObj& data) {
+ return data["namespace"].String() == _sourceNss.ns() &&
+ static_cast<int>(_stats.documentsCopied) >= data["numDocsToClone"].numberInt();
+ });
+}
+
+void TenantCollectionCloner::waitForDatabaseWorkToComplete() {
+ _dbWorkTaskRunner.join();
+}
+
+void TenantCollectionCloner::setMetadataReader() {
+ getClient()->setReplyMetadataReader(
+ [this](OperationContext* opCtx, const BSONObj& metadataObj, StringData source) {
+ auto readResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
+ if (!readResult.isOK()) {
+ return readResult.getStatus().withContext(
+ "tenant collection cloner failed to read repl set metadata");
+ }
+ this->setLastVisibleOpTime(readResult.getValue().getLastOpVisible());
+ return Status::OK();
+ });
+}
+
+void TenantCollectionCloner::unsetMetadataReader() {
+ getClient()->setReplyMetadataReader([this](OperationContext* opCtx,
+ const BSONObj& metadataObj,
+ StringData source) { return Status::OK(); });
+}
+
+bool TenantCollectionCloner::isMyFailPoint(const BSONObj& data) const {
+ auto nss = data["nss"].str();
+ return (nss.empty() || nss == _sourceNss.toString()) && BaseCloner::isMyFailPoint(data);
+}
+
TenantCollectionCloner::Stats TenantCollectionCloner::getStats() const {
stdx::lock_guard<Latch> lk(_mutex);
return _stats;
}
+std::string TenantCollectionCloner::Stats::toString() const {
+ return toBSON().toString();
+}
+
+BSONObj TenantCollectionCloner::Stats::toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("ns", ns);
+ append(&bob);
+ return bob.obj();
+}
+
void TenantCollectionCloner::Stats::append(BSONObjBuilder* builder) const {
builder->appendNumber(kDocumentsToCopyFieldName, documentToCopy);
builder->appendNumber(kDocumentsCopiedFieldName, documentsCopied);
@@ -89,5 +412,10 @@ void TenantCollectionCloner::Stats::append(BSONObjBuilder* builder) const {
builder->appendNumber("receivedBatches", receivedBatches);
}
+Timestamp TenantCollectionCloner::getOperationTime_forTest() {
+ return _operationTime;
+}
+
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_collection_cloner.h b/src/mongo/db/repl/tenant_collection_cloner.h
index ee8c9bf2ced..2dd94cb2191 100644
--- a/src/mongo/db/repl/tenant_collection_cloner.h
+++ b/src/mongo/db/repl/tenant_collection_cloner.h
@@ -59,31 +59,152 @@ public:
void append(BSONObjBuilder* builder) const;
};
+ /**
+ * Type of function to schedule storage interface tasks with the executor.
+ *
+ * Used for testing only.
+ */
+ using ScheduleDbWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
+
TenantCollectionCloner(const NamespaceString& ns,
const CollectionOptions& collectionOptions,
InitialSyncSharedData* sharedData,
const HostAndPort& source,
DBClientConnection* client,
StorageInterface* storageInterface,
- ThreadPool* dbPool);
+ ThreadPool* dbPool,
+ StringData tenantId);
virtual ~TenantCollectionCloner() = default;
Stats getStats() const;
+ std::string toString() const;
+
+ NamespaceString getSourceNss() const {
+ return _sourceNss;
+ }
+ UUID getSourceUuid() const {
+ return *_sourceDbAndUuid.uuid();
+ }
+
+ /**
+ * Set the cloner batch size.
+ *
+ * Used for testing only. Set by server parameter 'collectionClonerBatchSize' in normal
+ * operation.
+ */
+ void setBatchSize_forTest(int batchSize) {
+ _collectionClonerBatchSize = batchSize;
+ }
+
+ /**
+ * Overrides how executor schedules database work.
+ *
+ * For testing only.
+ */
+ void setScheduleDbWorkFn_forTest(ScheduleDbWorkFn scheduleDbWorkFn) {
+ _scheduleDbWorkFn = std::move(scheduleDbWorkFn);
+ }
+
+ Timestamp getOperationTime_forTest();
+
protected:
ClonerStages getStages() final;
+ bool isMyFailPoint(const BSONObj& data) const final;
+
private:
+ friend class TenantCollectionClonerTest;
+ friend class TenantCollectionClonerStage;
+
+ class TenantCollectionClonerStage : public ClonerStage<TenantCollectionCloner> {
+ public:
+ TenantCollectionClonerStage(std::string name,
+ TenantCollectionCloner* cloner,
+ ClonerRunFn stageFunc)
+ : ClonerStage<TenantCollectionCloner>(name, cloner, stageFunc) {}
+ AfterStageBehavior run() override;
+
+ bool isTransientError(const Status& status) override {
+ // Always abort on error.
+ return false;
+ }
+ };
+
std::string describeForFuzzer(BaseClonerStage* stage) const final {
return _sourceNss.db() + " db: { " + stage->getName() + ": UUID(\"" +
_sourceDbAndUuid.uuid()->toString() + "\") coll: " + _sourceNss.coll() + " }";
}
/**
- * Temporary no-op stage.
+ * The preStage sets the start time in _stats.
*/
- AfterStageBehavior placeholderStage();
+ void preStage() final;
+
+ /**
+ * The postStage sets the end time in _stats.
+ */
+ void postStage() final;
+
+ /**
+ * Stage function that counts the number of documents in the collection on the source in order
+ * to generate progress information.
+ */
+ AfterStageBehavior countStage();
+
+ /**
+ * Stage function that gets the index information of the collection on the source to re-create
+ * it.
+ */
+ AfterStageBehavior listIndexesStage();
+
+ /**
+ * Stage function that creates the collection using the storageInterface. This stage does not
+ * actually contact the sync source.
+ */
+ AfterStageBehavior createCollectionStage();
+
+ /**
+ * Stage function that executes a query to retrieve all documents in the collection. For each
+ * batch returned by the upstream node, handleNextBatch will be called with the data. This
+ * stage will finish when the entire query is finished or failed.
+ */
+ AfterStageBehavior queryStage();
+
+ /**
+ * Put all results from a query batch into a buffer to be inserted, and schedule
+ * it to be inserted.
+ */
+ void handleNextBatch(DBClientCursorBatchIterator& iter);
+
+ /**
+ * Called whenever there is a new batch of documents ready from the DBClientConnection.
+ */
+ void insertDocumentsCallback(const executor::TaskExecutor::CallbackArgs& cbd);
+
+ /**
+ * Sends a query command to the source.
+ */
+ void runQuery();
+
+ /**
+ * Waits for any database work to finish or fail.
+ */
+ void waitForDatabaseWorkToComplete();
+
+ /**
+ * Sets up tracking the lastVisibleOpTime from response metadata.
+ */
+ void setMetadataReader();
+ void unsetMetadataReader();
+ void setLastVisibleOpTime(OpTime opTime) {
+ _lastVisibleOpTime = opTime;
+ }
+ OpTime getLastVisibleOpTime() {
+ return _lastVisibleOpTime;
+ }
// All member variables are labeled with one of the following codes indicating the
// synchronization rules for accessing them.
@@ -96,10 +217,36 @@ private:
const CollectionOptions _collectionOptions; // (R)
// Despite the type name, this member must always contain a UUID.
NamespaceStringOrUUID _sourceDbAndUuid; // (R)
-
- ClonerStage<TenantCollectionCloner> _placeholderStage; // (R)
-
- Stats _stats; // (M)
+ // The size of the batches of documents returned in collection cloning.
+ int _collectionClonerBatchSize; // (R)
+
+ TenantCollectionClonerStage _countStage; // (R)
+ TenantCollectionClonerStage _listIndexesStage; // (R)
+ TenantCollectionClonerStage _createCollectionStage; // (R)
+ TenantCollectionClonerStage _queryStage; // (R)
+
+ ProgressMeter _progressMeter; // (X) progress meter for this instance.
+ std::vector<BSONObj> _readyIndexSpecs; // (X) Except for _id_
+ BSONObj _idIndexSpec; // (X)
+ // Function for scheduling database work using the executor.
+ ScheduleDbWorkFn _scheduleDbWorkFn; // (R)
+ // Documents read from source to insert.
+ std::vector<InsertStatement> _documentsToInsert; // (M)
+ Stats _stats; // (M)
+ // We put _dbWorkTaskRunner after anything the database threads depend on to ensure it is
+ // only destroyed after those threads exit.
+ TaskRunner _dbWorkTaskRunner; // (R)
+
+ // TODO(SERVER-49780): Move this into TenantMigrationSharedData.
+ OpTime _lastVisibleOpTime; // (X)
+
+ // The database name prefix of the tenant associated with this migration.
+ // TODO(SERVER-49780): Consider moving this into TenantMigrationSharedData.
+ std::string _tenantId; // (R)
+
+ // The operationTime returned with the listIndexes result.
+ // TODO(SERVER-49780): Consider moving this into TenantMigrationSharedData.
+ Timestamp _operationTime; // (X)
};
} // namespace repl
diff --git a/src/mongo/db/repl/tenant_collection_cloner_test.cpp b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
new file mode 100644
index 00000000000..d365f098178
--- /dev/null
+++ b/src/mongo/db/repl/tenant_collection_cloner_test.cpp
@@ -0,0 +1,574 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/repl/cloner_test_fixture.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/db/repl/tenant_collection_cloner.h"
+#include "mongo/db/service_context_test_fixture.h"
+#include "mongo/dbtests/mock/mock_dbclient_connection.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+namespace repl {
+
+class MockCallbackState final : public mongo::executor::TaskExecutor::CallbackState {
+public:
+ MockCallbackState() = default;
+ void cancel() override {}
+ void waitForCompletion() override {}
+ bool isCanceled() const override {
+ return false;
+ }
+};
+
+class TenantCollectionClonerTest : public ClonerTestFixture {
+public:
+ TenantCollectionClonerTest() {}
+
+protected:
+ void setUp() override {
+ ClonerTestFixture::setUp();
+ _standardCreateCollectionFn = [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionOptions& options) -> Status {
+ this->_collCreated = true;
+ return Status::OK();
+ };
+ _storageInterface.createCollFn = _standardCreateCollectionFn;
+ _standardCreateIndexesOnEmptyCollectionFn =
+ [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) -> Status {
+ this->_numSecondaryIndexesCreated += secondaryIndexSpecs.size();
+ return Status::OK();
+ };
+ _storageInterface.createIndexesOnEmptyCollFn = _standardCreateIndexesOnEmptyCollectionFn;
+ _storageInterface.insertDocumentsFn = [this](OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ const std::vector<InsertStatement>& ops) {
+ this->_numDocsInserted += ops.size();
+ return Status::OK();
+ };
+
+ _mockServer->assignCollectionUuid(_nss.ns(), _collUuid);
+ _mockServer->setCommandReply("replSetGetRBID",
+ BSON("ok" << 1 << "rbid" << _sharedData->getRollBackId()));
+ _mockClient->setOperationTime(_operationTime);
+ }
+ std::unique_ptr<TenantCollectionCloner> makeCollectionCloner(
+ CollectionOptions options = CollectionOptions()) {
+ options.uuid = _collUuid;
+ _options = options;
+ return std::make_unique<TenantCollectionCloner>(_nss,
+ options,
+ _sharedData.get(),
+ _source,
+ _mockClient.get(),
+ &_storageInterface,
+ _dbWorkThreadPool.get(),
+ _tenantId);
+ }
+
+ BSONObj createFindResponse(ErrorCodes::Error code = ErrorCodes::OK) {
+ BSONObjBuilder bob;
+ if (code != ErrorCodes::OK) {
+ bob.append("ok", 0);
+ bob.append("code", code);
+ } else {
+ bob.append("ok", 1);
+ }
+ return bob.obj();
+ }
+
+ ProgressMeter& getProgressMeter(TenantCollectionCloner* cloner) {
+ return cloner->_progressMeter;
+ }
+
+ std::vector<BSONObj> getIndexSpecs(TenantCollectionCloner* cloner) {
+ return cloner->_readyIndexSpecs;
+ }
+
+ BSONObj& getIdIndexSpec(TenantCollectionCloner* cloner) {
+ return cloner->_idIndexSpec;
+ }
+
+ StorageInterfaceMock::CreateCollectionFn _standardCreateCollectionFn;
+ StorageInterfaceMock::CreateIndexesOnEmptyCollectionFn
+ _standardCreateIndexesOnEmptyCollectionFn;
+ bool _collCreated = false;
+ size_t _numSecondaryIndexesCreated{0};
+ size_t _numDocsInserted{0};
+ CollectionOptions _options;
+
+ UUID _collUuid = UUID::gen();
+ BSONObj _idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+
+ std::vector<BSONObj> _secondaryIndexSpecs{BSON("v" << 1 << "key" << BSON("a" << 1) << "name"
+ << "a_1"),
+ BSON("v" << 1 << "key" << BSON("b" << 1) << "name"
+ << "b_1")};
+ static std::string _tenantId;
+ static NamespaceString _nss;
+ static Timestamp _operationTime;
+};
+
+/* static */
+std::string TenantCollectionClonerTest::_tenantId = "tenant42";
+NamespaceString TenantCollectionClonerTest::_nss = {_tenantId + "_testDb", "testcoll"};
+Timestamp TenantCollectionClonerTest::_operationTime = Timestamp(12345, 42);
+
+
+TEST_F(TenantCollectionClonerTest, CountStage) {
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("count");
+ _mockServer->setCommandReply("count", createCountResponse(100));
+ ASSERT_OK(cloner->run());
+ ASSERT_EQ(100, getProgressMeter(cloner.get()).total());
+}
+
+// On a negative count, the TenantCollectionCloner should use a zero count.
+TEST_F(TenantCollectionClonerTest, CountStageNegativeCount) {
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("count");
+ _mockServer->setCommandReply("count", createCountResponse(-100));
+ ASSERT_OK(cloner->run());
+ ASSERT_EQ(0, getProgressMeter(cloner.get()).total());
+}
+
+TEST_F(TenantCollectionClonerTest, CollectionClonerPassesThroughNonRetriableErrorFromCountCommand) {
+ auto cloner = makeCollectionCloner();
+ _mockServer->setCommandReply("count", Status(ErrorCodes::OperationFailed, ""));
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run());
+}
+
+TEST_F(TenantCollectionClonerTest,
+ CollectionClonerReturnsNoSuchKeyOnMissingDocumentCountFieldName) {
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("count");
+ _mockServer->setCommandReply("count", BSON("ok" << 1));
+ auto status = cloner->run();
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, status);
+}
+
+TEST_F(TenantCollectionClonerTest, ListIndexesReturnedNoIndexes) {
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("listIndexes");
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray()));
+ _mockServer->setCommandReply("find", createFindResponse());
+ ASSERT_OK(cloner->run());
+ ASSERT(getIdIndexSpec(cloner.get()).isEmpty());
+ ASSERT(getIndexSpecs(cloner.get()).empty());
+ ASSERT_EQ(0, cloner->getStats().indexes);
+}
+
+TEST_F(TenantCollectionClonerTest, ListIndexesHasResults) {
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("listIndexes");
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply(
+ "listIndexes",
+ createCursorResponse(
+ _nss.ns(),
+ BSON_ARRAY(_secondaryIndexSpecs[0] << _idIndexSpec << _secondaryIndexSpecs[1])));
+ _mockServer->setCommandReply("find", createFindResponse());
+ ASSERT_OK(cloner->run());
+ ASSERT_BSONOBJ_EQ(_idIndexSpec, getIdIndexSpec(cloner.get()));
+ ASSERT_EQ(2, getIndexSpecs(cloner.get()).size());
+ ASSERT_BSONOBJ_EQ(_secondaryIndexSpecs[0], getIndexSpecs(cloner.get())[0]);
+ ASSERT_BSONOBJ_EQ(_secondaryIndexSpecs[1], getIndexSpecs(cloner.get())[1]);
+ ASSERT_EQ(3, cloner->getStats().indexes);
+}
+
+TEST_F(TenantCollectionClonerTest, ListIndexesNonRetriableError) {
+ auto cloner = makeCollectionCloner();
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply("listIndexes", Status(ErrorCodes::OperationFailed, ""));
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run());
+}
+
+TEST_F(TenantCollectionClonerTest, ListIndexesRemoteUnreachableBeforeMajorityFind) {
+ auto cloner = makeCollectionCloner();
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray()));
+
+ auto clonerOperationTimeFP =
+ globalFailPointRegistry().find("tenantCollectionClonerHangAfterGettingOperationTime");
+ auto timesEntered = clonerOperationTimeFP->setMode(FailPoint::alwaysOn, 0);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+ // Wait for the failpoint to be reached
+ clonerOperationTimeFP->waitForTimesEntered(timesEntered + 1);
+ _mockServer->shutdown();
+
+ // Finish test
+ clonerOperationTimeFP->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+TEST_F(TenantCollectionClonerTest, ListIndexesRecordsCorrectOperationTime) {
+ auto cloner = makeCollectionCloner();
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray()));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ auto clonerOperationTimeFP =
+ globalFailPointRegistry().find("tenantCollectionClonerHangAfterGettingOperationTime");
+ auto timesEntered = clonerOperationTimeFP->setMode(FailPoint::alwaysOn, 0);
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+ // Wait for the failpoint to be reached
+ clonerOperationTimeFP->waitForTimesEntered(timesEntered + 1);
+ ASSERT_EQUALS(_operationTime, cloner->getOperationTime_forTest());
+
+ // Finish test
+ clonerOperationTimeFP->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+TEST_F(TenantCollectionClonerTest, BeginCollection) {
+ NamespaceString collNss;
+ CollectionOptions collOptions;
+ BSONObj collIdIndexSpec;
+ std::vector<BSONObj> collSecondaryIndexSpecs;
+
+ _storageInterface.createCollFn =
+ [&](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) {
+ collNss = nss;
+ collOptions = options;
+ return _standardCreateCollectionFn(opCtx, nss, options);
+ };
+
+ _storageInterface.createIndexesOnEmptyCollFn =
+ [&](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+ collSecondaryIndexSpecs = secondaryIndexSpecs;
+ return _standardCreateIndexesOnEmptyCollectionFn(opCtx, nss, secondaryIndexSpecs);
+ };
+
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("createCollection");
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ BSONArrayBuilder indexSpecs;
+ indexSpecs.append(_idIndexSpec);
+ for (const auto& secondaryIndexSpec : _secondaryIndexSpecs) {
+ indexSpecs.append(secondaryIndexSpec);
+ }
+ _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), indexSpecs.arr()));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ ASSERT_EQUALS(Status::OK(), cloner->run());
+
+ ASSERT_EQUALS(_nss.ns(), collNss.ns());
+ ASSERT_BSONOBJ_EQ(_options.toBSON(), collOptions.toBSON());
+ ASSERT_EQUALS(_secondaryIndexSpecs.size(), collSecondaryIndexSpecs.size());
+ for (std::vector<BSONObj>::size_type i = 0; i < _secondaryIndexSpecs.size(); ++i) {
+ ASSERT_BSONOBJ_EQ(_secondaryIndexSpecs[i], collSecondaryIndexSpecs[i]);
+ }
+}
+
+TEST_F(TenantCollectionClonerTest, BeginCollectionFailed) {
+ _storageInterface.createCollFn =
+ [&](OperationContext* opCtx, const NamespaceString& nss, const CollectionOptions& options) {
+ return Status(ErrorCodes::OperationFailed, "");
+ };
+
+ auto cloner = makeCollectionCloner();
+ cloner->setStopAfterStage_forTest("createCollection");
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray()));
+ _mockServer->setCommandReply("find", createFindResponse());
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run());
+}
+
+TEST_F(TenantCollectionClonerTest, InsertDocumentsSingleBatch) {
+ // Set up data for preliminary stages
+ _mockServer->setCommandReply("count", createCountResponse(2));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+
+ auto cloner = makeCollectionCloner();
+ ASSERT_OK(cloner->run());
+
+ ASSERT_EQUALS(2, _numDocsInserted);
+
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(1u, stats.receivedBatches);
+}
+
+TEST_F(TenantCollectionClonerTest, InsertDocumentsMultipleBatches) {
+ // Set up data for preliminary stages
+ _mockServer->setCommandReply("count", createCountResponse(5));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 4));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 5));
+
+ auto cloner = makeCollectionCloner();
+ cloner->setBatchSize_forTest(2);
+ ASSERT_OK(cloner->run());
+
+ ASSERT_EQUALS(5, _numDocsInserted);
+
+ auto stats = cloner->getStats();
+ ASSERT_EQUALS(3u, stats.receivedBatches);
+}
+
+TEST_F(TenantCollectionClonerTest, InsertDocumentsScheduleDBWorkFailed) {
+ // Set up data for preliminary stages
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+ // Stop before running the query to set up the failure.
+ auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEntered = collClonerBeforeFailPoint->setMode(
+ FailPoint::alwaysOn,
+ 0,
+ fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_EQUALS(ErrorCodes::UnknownError, cloner->run());
+ });
+ // Wait for the failpoint to be reached
+ collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
+ // Replace scheduleDbWork function so that cloner will fail to schedule DB work after
+ // getting documents.
+ cloner->setScheduleDbWorkFn_forTest([](const executor::TaskExecutor::CallbackFn& workFn) {
+ return StatusWith<executor::TaskExecutor::CallbackHandle>(ErrorCodes::UnknownError, "");
+ });
+
+ // Continue and finish. Final status is checked in the thread.
+ collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+TEST_F(TenantCollectionClonerTest, InsertDocumentsCallbackCanceled) {
+ // Set up data for preliminary stages
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+ // Stop before running the query to set up the failure.
+ auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEntered = collClonerBeforeFailPoint->setMode(
+ FailPoint::alwaysOn,
+ 0,
+ fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_EQUALS(ErrorCodes::CallbackCanceled, cloner->run());
+ });
+ // Wait for the failpoint to be reached
+ collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
+    // Replace scheduleDbWork function so that the insert callback is run with a
+    // CallbackCanceled status after getting documents.
+ cloner->setScheduleDbWorkFn_forTest([&](const executor::TaskExecutor::CallbackFn& workFn) {
+ executor::TaskExecutor::CallbackHandle handle(std::make_shared<MockCallbackState>());
+ mongo::executor::TaskExecutor::CallbackArgs args{
+ nullptr,
+ handle,
+ {ErrorCodes::CallbackCanceled, "Never run, but treat like cancelled."}};
+ workFn(args);
+ return StatusWith<executor::TaskExecutor::CallbackHandle>(handle);
+ });
+
+ // Continue and finish. Final status is checked in the thread.
+ collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+TEST_F(TenantCollectionClonerTest, InsertDocumentsFailed) {
+ // Set up data for preliminary stages
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(_idIndexSpec)));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+ // Stop before running the query to set up the failure.
+ auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEntered = collClonerBeforeFailPoint->setMode(
+ FailPoint::alwaysOn,
+ 0,
+ fromjson("{cloner: 'TenantCollectionCloner', stage: 'query', nss: '" + _nss.ns() + "'}"));
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, cloner->run());
+ });
+
+ // Wait for the failpoint to be reached
+ collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
+
+ // Make the insertDocuments fail.
+ _storageInterface.insertDocumentsFn = [this](OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ const std::vector<InsertStatement>& ops) {
+ return Status(ErrorCodes::OperationFailed, "");
+ };
+
+
+ // Continue and finish. Final status is checked in the thread.
+ collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+}
+
+TEST_F(TenantCollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
+ NamespaceString collNss;
+ CollectionOptions collOptions;
+ // We initialize collIndexSpecs with fake information to ensure it is overwritten by an empty
+ // vector.
+ std::vector<BSONObj> collIndexSpecs{BSON("fakeindexkeys" << 1)};
+ _storageInterface.createCollFn = [&, this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionOptions& options) -> Status {
+ collNss = nss;
+ collOptions = options;
+ return _standardCreateCollectionFn(opCtx, nss, options);
+ };
+
+ _storageInterface.createIndexesOnEmptyCollFn =
+ [&](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+ collIndexSpecs = secondaryIndexSpecs;
+ return _standardCreateIndexesOnEmptyCollectionFn(opCtx, nss, secondaryIndexSpecs);
+ };
+
+ const BSONObj doc = BSON("_id" << 1);
+ _mockServer->insert(_nss.ns(), doc);
+
+ _mockServer->setCommandReply("count", createCountResponse(1));
+ _mockServer->setCommandReply("listIndexes", createCursorResponse(_nss.ns(), BSONArray()));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ CollectionOptions options;
+ options.autoIndexId = CollectionOptions::NO;
+ auto cloner = makeCollectionCloner(options);
+ ASSERT_OK(cloner->run());
+ ASSERT_EQUALS(1, _numDocsInserted);
+ ASSERT_TRUE(_collCreated);
+ ASSERT_EQ(collOptions.autoIndexId, CollectionOptions::NO);
+ ASSERT_EQ(0UL, collIndexSpecs.size());
+ ASSERT_EQ(collNss, _nss);
+}
+
+TEST_F(TenantCollectionClonerTest, QueryFailure) {
+ // Set up data for preliminary stages
+ auto idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ _mockServer->setCommandReply("count", createCountResponse(3));
+ _mockServer->setCommandReply("listIndexes",
+ createCursorResponse(_nss.ns(), BSON_ARRAY(idIndexSpec)));
+ _mockServer->setCommandReply("find", createFindResponse());
+
+ auto beforeStageFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto timesEnteredBeforeStage = beforeStageFailPoint->setMode(
+ FailPoint::alwaysOn, 0, fromjson("{cloner: 'TenantCollectionCloner', stage: 'query'}"));
+
+ // Set up documents to be returned from upstream node.
+ _mockServer->insert(_nss.ns(), BSON("_id" << 1));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 2));
+ _mockServer->insert(_nss.ns(), BSON("_id" << 3));
+
+ auto cloner = makeCollectionCloner();
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_NOT_OK(cloner->run());
+ });
+
+ // Wait until we get to the query stage.
+ beforeStageFailPoint->waitForTimesEntered(timesEnteredBeforeStage + 1);
+
+ // Bring the server down.
+ _mockServer->shutdown();
+
+ // Let us begin with the query stage.
+ beforeStageFailPoint->setMode(FailPoint::off, 0);
+
+ clonerThread.join();
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/tenant_database_cloner.cpp b/src/mongo/db/repl/tenant_database_cloner.cpp
index 8baee18daca..462b6b042da 100644
--- a/src/mongo/db/repl/tenant_database_cloner.cpp
+++ b/src/mongo/db/repl/tenant_database_cloner.cpp
@@ -53,10 +53,12 @@ TenantDatabaseCloner::TenantDatabaseCloner(const std::string& dbName,
const HostAndPort& source,
DBClientConnection* client,
StorageInterface* storageInterface,
- ThreadPool* dbPool)
+ ThreadPool* dbPool,
+ StringData tenantId)
: BaseCloner("TenantDatabaseCloner"_sd, sharedData, source, client, storageInterface, dbPool),
_dbName(dbName),
- _listCollectionsStage("listCollections", this, &TenantDatabaseCloner::listCollectionsStage) {
+ _listCollectionsStage("listCollections", this, &TenantDatabaseCloner::listCollectionsStage),
+ _tenantId(tenantId) {
invariant(!dbName.empty());
_stats.dbname = dbName;
}
@@ -72,15 +74,14 @@ void TenantDatabaseCloner::preStage() {
BaseCloner::AfterStageBehavior TenantDatabaseCloner::listCollectionsStage() {
// This will be set after a successful listCollections command.
- _operationTime = Timestamp(0, 0);
+ _operationTime = Timestamp();
- BSONObj res;
auto collectionInfos =
getClient()->getCollectionInfos(_dbName, ListCollectionsFilter::makeTypeCollectionFilter());
- // Do a speculative majority read on the sync source to make sure the collections listed
- // exist on a majority of nodes in the set. We do not check the rollbackId - rollback
- // would lead to the sync source closing connections so the stage would fail.
+ // Do a majority read on the sync source to make sure the collections listed exist on a majority
+ // of nodes in the set. We do not check the rollbackId - rollback would lead to the sync source
+ // closing connections so the stage would fail.
_operationTime = getClient()->getOperationTime();
tenantDatabaseClonerHangAfterGettingOperationTime.executeIf(
@@ -90,7 +91,8 @@ BaseCloner::AfterStageBehavior TenantDatabaseCloner::listCollectionsStage() {
LOGV2(4881605,
"tenantDatabaseClonerHangAfterGettingOperationTime fail point "
"enabled. Blocking until fail point is disabled",
- "dbName"_attr = _dbName);
+ "dbName"_attr = _dbName,
+ "tenantId"_attr = _tenantId);
mongo::sleepsecs(1);
}
},
@@ -126,10 +128,16 @@ BaseCloner::AfterStageBehavior TenantDatabaseCloner::listCollectionsStage() {
LOGV2_DEBUG(4881602,
1,
"Database cloner skipping 'system' collection",
- "namespace"_attr = collectionNamespace.ns());
+ "namespace"_attr = collectionNamespace.ns(),
+ "tenantId"_attr = _tenantId);
continue;
}
- LOGV2_DEBUG(4881603, 2, "Allowing cloning of collectionInfo", "info"_attr = info);
+ LOGV2_DEBUG(4881603,
+ 2,
+ "Allowing cloning of collectionInfo",
+ "info"_attr = info,
+ "db"_attr = _dbName,
+ "tenantId"_attr = _tenantId);
bool isDuplicate = seen.insert(result.getName().toString()).second;
uassert(4881604,
@@ -173,17 +181,22 @@ void TenantDatabaseCloner::postStage() {
getSource(),
getClient(),
getStorageInterface(),
- getDBPool());
+ getDBPool(),
+ _tenantId);
}
auto collStatus = _currentCollectionCloner->run();
if (collStatus.isOK()) {
- LOGV2_DEBUG(
- 4881600, 1, "Tenant collection clone finished", "namespace"_attr = sourceNss);
+ LOGV2_DEBUG(4881600,
+ 1,
+ "Tenant collection clone finished",
+ "namespace"_attr = sourceNss,
+ "tenantId"_attr = _tenantId);
} else {
LOGV2_ERROR(4881601,
"Tenant collection clone failed",
"namespace"_attr = sourceNss,
- "error"_attr = collStatus.toString());
+ "error"_attr = collStatus.toString(),
+ "tenantId"_attr = _tenantId);
setInitialSyncFailedStatus(
{collStatus.code(),
collStatus
diff --git a/src/mongo/db/repl/tenant_database_cloner.h b/src/mongo/db/repl/tenant_database_cloner.h
index 1684231ab0d..e0bf8f98ca9 100644
--- a/src/mongo/db/repl/tenant_database_cloner.h
+++ b/src/mongo/db/repl/tenant_database_cloner.h
@@ -57,7 +57,8 @@ public:
const HostAndPort& source,
DBClientConnection* client,
StorageInterface* storageInterface,
- ThreadPool* dbPool);
+ ThreadPool* dbPool,
+ StringData tenantId);
virtual ~TenantDatabaseCloner() = default;
@@ -123,6 +124,9 @@ private:
TenantDatabaseClonerStage _listCollectionsStage; // (R)
+ // The database name prefix of the tenant associated with this migration.
+ std::string _tenantId; // (R)
+
// The operationTime returned with the listCollections result.
Timestamp _operationTime; // (X)
diff --git a/src/mongo/db/repl/tenant_database_cloner_test.cpp b/src/mongo/db/repl/tenant_database_cloner_test.cpp
index 7342c368db9..3331178e080 100644
--- a/src/mongo/db/repl/tenant_database_cloner_test.cpp
+++ b/src/mongo/db/repl/tenant_database_cloner_test.cpp
@@ -43,9 +43,9 @@
namespace mongo {
namespace repl {
-struct CollectionCloneInfo {
- std::shared_ptr<CollectionMockStats> stats = std::make_shared<CollectionMockStats>();
- CollectionBulkLoaderMock* loader = nullptr;
+struct TenantCollectionCloneInfo {
+ size_t numDocsInserted{0};
+ bool collCreated = false;
};
class TenantDatabaseClonerTest : public ClonerTestFixture {
@@ -55,21 +55,26 @@ public:
protected:
void setUp() override {
ClonerTestFixture::setUp();
- _storageInterface.createCollectionForBulkFn =
- [this](const NamespaceString& nss,
- const CollectionOptions& options,
- const BSONObj& idIndexSpec,
- const std::vector<BSONObj>& secondaryIndexSpecs)
- -> StatusWith<std::unique_ptr<CollectionBulkLoaderMock>> {
+ _storageInterface.createCollFn = [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CollectionOptions& options) -> Status {
const auto collInfo = &_collections[nss];
-
- auto localLoader = std::make_unique<CollectionBulkLoaderMock>(collInfo->stats);
- auto status = localLoader->init(secondaryIndexSpecs);
- if (!status.isOK())
- return status;
- collInfo->loader = localLoader.get();
-
- return std::move(localLoader);
+ collInfo->collCreated = true;
+ collInfo->numDocsInserted = 0;
+ return Status::OK();
+ };
+ _storageInterface.createIndexesOnEmptyCollFn =
+ [this](OperationContext* opCtx,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& secondaryIndexSpecs) -> Status {
+ return Status::OK();
+ };
+ _storageInterface.insertDocumentsFn = [this](OperationContext* opCtx,
+ const NamespaceStringOrUUID& nsOrUUID,
+ const std::vector<InsertStatement>& ops) {
+ const auto collInfo = &_collections[nsOrUUID.nss().get()];
+ collInfo->numDocsInserted += ops.size();
+ return Status::OK();
};
setInitialSyncId();
_mockClient->setOperationTime(_operationTime);
@@ -81,7 +86,8 @@ protected:
_source,
_mockClient.get(),
&_storageInterface,
- _dbWorkThreadPool.get());
+ _dbWorkThreadPool.get(),
+ _tenantId);
}
BSONObj createListCollectionsResponse(const std::vector<BSONObj>& collections) {
@@ -119,14 +125,16 @@ protected:
return cloner->_collections;
}
- std::map<NamespaceString, CollectionCloneInfo> _collections;
+ std::map<NamespaceString, TenantCollectionCloneInfo> _collections;
+ static std::string _tenantId;
static std::string _dbName;
static Timestamp _operationTime;
};
/* static */
-std::string TenantDatabaseClonerTest::_dbName = "testDb";
+std::string TenantDatabaseClonerTest::_tenantId = "tenant42";
+std::string TenantDatabaseClonerTest::_dbName = _tenantId + "_testDb";
Timestamp TenantDatabaseClonerTest::_operationTime = Timestamp(12345, 42);
// A database may have no collections. Nothing to do for the tenant database cloner.
@@ -454,200 +462,185 @@ TEST_F(TenantDatabaseClonerTest, ListCollectionsRecordsCorrectOperationTime) {
clonerThread.join();
}
-// TODO(SERVER-48845): Restore the below tests.
-
-// TEST_F(TenantDatabaseClonerTest, FirstCollectionListIndexesFailed) {
-// auto uuid1 = UUID::gen();
-// auto uuid2 = UUID::gen();
-// const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
-// << "_id_");
-// const std::vector<BSONObj> sourceInfos = {BSON("name"
-// << "a"
-// << "type"
-// << "collection"
-// << "options" << BSONObj() << "info"
-// << BSON("readOnly" << false << "uuid" <<
-// uuid1)),
-// BSON(
-// "name"
-// << "b"
-// << "type"
-// << "collection"
-// << "options" << BSONObj() << "info"
-// << BSON("readOnly" << false << "uuid" <<
-// uuid2))};
-// _mockServer->setCommandReply("listCollections",
-// createListCollectionsResponse({sourceInfos[0],
-// sourceInfos[1]}));
-// _mockServer->setCommandReply("find", createFindResponse());
-// _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)});
-// _mockServer->setCommandReply("listIndexes",
-// {BSON("ok" << 0 << "errmsg"
-// << "fake message"
-// << "code" << ErrorCodes::CursorNotFound),
-// createCursorResponse(_dbName + ".b",
-// BSON_ARRAY(idIndexSpec))});
-// auto cloner = makeDatabaseCloner();
-// auto status = cloner->run();
-// ASSERT_NOT_OK(status);
-
-// ASSERT_EQ(status.code(), ErrorCodes::InitialSyncFailure);
-// ASSERT_EQUALS(0u, _collections.size());
-// }
-
-// TEST_F(TenantDatabaseClonerTest, CreateCollections) {
-// auto uuid1 = UUID::gen();
-// auto uuid2 = UUID::gen();
-// const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
-// << "_id_");
-// const std::vector<BSONObj> sourceInfos = {BSON("name"
-// << "a"
-// << "type"
-// << "collection"
-// << "options" << BSONObj() << "info"
-// << BSON("readOnly" << false << "uuid" <<
-// uuid1)),
-// BSON(
-// "name"
-// << "b"
-// << "type"
-// << "collection"
-// << "options" << BSONObj() << "info"
-// << BSON("readOnly" << false << "uuid" <<
-// uuid2))};
-// _mockServer->setCommandReply("listCollections",
-// createListCollectionsResponse({sourceInfos[0],
-// sourceInfos[1]}));
-// _mockServer->setCommandReply("find", createFindResponse());
-// _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)});
-// _mockServer->setCommandReply("listIndexes",
-// {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec)),
-// createCursorResponse(_dbName + ".b",
-// BSON_ARRAY(idIndexSpec))});
-// auto cloner = makeDatabaseCloner();
-// auto status = cloner->run();
-// ASSERT_OK(status);
-
-// ASSERT_EQUALS(2U, _collections.size());
-
-// auto collInfo = _collections[NamespaceString{_dbName, "a"}];
-// auto stats = *collInfo.stats;
-// ASSERT_EQUALS(0, stats.insertCount);
-// ASSERT(stats.commitCalled);
-
-// collInfo = _collections[NamespaceString{_dbName, "b"}];
-// stats = *collInfo.stats;
-// ASSERT_EQUALS(0, stats.insertCount);
-// ASSERT(stats.commitCalled);
-// }
-
-// TEST_F(TenantDatabaseClonerTest, DatabaseAndCollectionStats) {
-// auto uuid1 = UUID::gen();
-// auto uuid2 = UUID::gen();
-// const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
-// << "_id_");
-// const BSONObj extraIndexSpec = BSON("v" << 1 << "key" << BSON("x" << 1) << "name"
-// << "_extra_");
-// const std::vector<BSONObj> sourceInfos = {BSON("name"
-// << "a"
-// << "type"
-// << "collection"
-// << "options" << BSONObj() << "info"
-// << BSON("readOnly" << false << "uuid" <<
-// uuid1)),
-// BSON(
-// "name"
-// << "b"
-// << "type"
-// << "collection"
-// << "options" << BSONObj() << "info"
-// << BSON("readOnly" << false << "uuid" <<
-// uuid2))};
-// _mockServer->setCommandReply("listCollections",
-// createListCollectionsResponse({sourceInfos[0],
-// sourceInfos[1]}));
-// _mockServer->setCommandReply("find", createFindResponse());
-// _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)});
-// _mockServer->setCommandReply(
-// "listIndexes",
-// {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec << extraIndexSpec)),
-// createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))});
-// auto cloner = makeDatabaseCloner();
-
-// auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
-// auto collClonerAfterFailPoint = globalFailPointRegistry().find("hangAfterClonerStage");
-// auto timesEntered = collClonerBeforeFailPoint->setMode(
-// FailPoint::alwaysOn,
-// 0,
-// fromjson("{cloner: 'CollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}"));
-// collClonerAfterFailPoint->setMode(
-// FailPoint::alwaysOn,
-// 0,
-// fromjson("{cloner: 'CollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}"));
-
-// // Run the cloner in a separate thread.
-// stdx::thread clonerThread([&] {
-// Client::initThread("ClonerRunner");
-// ASSERT_OK(cloner->run());
-// });
-// // Wait for the failpoint to be reached
-// collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
-
-// // Collection stats should be set up with namespace.
-// auto stats = cloner->getStats();
-// ASSERT_EQ(_dbName, stats.dbname);
-// ASSERT_EQ(_clock.now(), stats.start);
-// ASSERT_EQ(2, stats.collections);
-// ASSERT_EQ(0, stats.clonedCollections);
-// ASSERT_EQ(2, stats.collectionStats.size());
-// ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns);
-// ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns);
-// ASSERT_EQ(_clock.now(), stats.collectionStats[0].start);
-// ASSERT_EQ(Date_t(), stats.collectionStats[0].end);
-// ASSERT_EQ(Date_t(), stats.collectionStats[1].start);
-// ASSERT_EQ(0, stats.collectionStats[0].indexes);
-// ASSERT_EQ(0, stats.collectionStats[1].indexes);
-// _clock.advance(Minutes(1));
-
-// // Move to the next collection
-// timesEntered = collClonerBeforeFailPoint->setMode(
-// FailPoint::alwaysOn,
-// 0,
-// fromjson("{cloner: 'CollectionCloner', stage: 'count', nss: '" + _dbName + ".b'}"));
-// collClonerAfterFailPoint->setMode(FailPoint::off);
-
-// // Wait for the failpoint to be reached
-// collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
-
-// stats = cloner->getStats();
-// ASSERT_EQ(2, stats.collections);
-// ASSERT_EQ(1, stats.clonedCollections);
-// ASSERT_EQ(2, stats.collectionStats.size());
-// ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns);
-// ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns);
-// ASSERT_EQ(2, stats.collectionStats[0].indexes);
-// ASSERT_EQ(0, stats.collectionStats[1].indexes);
-// ASSERT_EQ(_clock.now(), stats.collectionStats[0].end);
-// ASSERT_EQ(_clock.now(), stats.collectionStats[1].start);
-// ASSERT_EQ(Date_t(), stats.collectionStats[1].end);
-// _clock.advance(Minutes(1));
-
-// // Finish
-// collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
-// clonerThread.join();
-
-// stats = cloner->getStats();
-// ASSERT_EQ(_dbName, stats.dbname);
-// ASSERT_EQ(_clock.now(), stats.end);
-// ASSERT_EQ(2, stats.collections);
-// ASSERT_EQ(2, stats.clonedCollections);
-// ASSERT_EQ(2, stats.collectionStats.size());
-// ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns);
-// ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns);
-// ASSERT_EQ(2, stats.collectionStats[0].indexes);
-// ASSERT_EQ(1, stats.collectionStats[1].indexes);
-// ASSERT_EQ(_clock.now(), stats.collectionStats[1].end);
-// }
+TEST_F(TenantDatabaseClonerTest, FirstCollectionListIndexesFailed) {
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ const std::vector<BSONObj> sourceInfos = {BSON("name"
+ << "a"
+ << "type"
+ << "collection"
+ << "options" << BSONObj() << "info"
+ << BSON("readOnly" << false << "uuid" << uuid1)),
+ BSON(
+ "name"
+ << "b"
+ << "type"
+ << "collection"
+ << "options" << BSONObj() << "info"
+ << BSON("readOnly" << false << "uuid" << uuid2))};
+ _mockServer->setCommandReply("listCollections",
+ createListCollectionsResponse({sourceInfos[0], sourceInfos[1]}));
+ _mockServer->setCommandReply("find", createFindResponse());
+ _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)});
+ _mockServer->setCommandReply("listIndexes",
+ {BSON("ok" << 0 << "errmsg"
+ << "fake message"
+ << "code" << ErrorCodes::CursorNotFound),
+ createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))});
+ auto cloner = makeDatabaseCloner();
+ auto status = cloner->run();
+ ASSERT_NOT_OK(status);
+
+ ASSERT_EQ(status.code(), ErrorCodes::CursorNotFound);
+ ASSERT_EQUALS(0u, _collections.size());
+}
+
+TEST_F(TenantDatabaseClonerTest, CreateCollections) {
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ const std::vector<BSONObj> sourceInfos = {BSON("name"
+ << "a"
+ << "type"
+ << "collection"
+ << "options" << BSONObj() << "info"
+ << BSON("readOnly" << false << "uuid" << uuid1)),
+ BSON(
+ "name"
+ << "b"
+ << "type"
+ << "collection"
+ << "options" << BSONObj() << "info"
+ << BSON("readOnly" << false << "uuid" << uuid2))};
+ _mockServer->setCommandReply("listCollections",
+ createListCollectionsResponse({sourceInfos[0], sourceInfos[1]}));
+ _mockServer->setCommandReply("find", createFindResponse());
+ _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)});
+ _mockServer->setCommandReply("listIndexes",
+ {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec)),
+ createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))});
+ auto cloner = makeDatabaseCloner();
+ auto status = cloner->run();
+ ASSERT_OK(status);
+
+ ASSERT_EQUALS(2U, _collections.size());
+
+ auto collInfo = _collections[NamespaceString{_dbName, "a"}];
+ ASSERT(collInfo.collCreated);
+ ASSERT_EQUALS(0, collInfo.numDocsInserted);
+
+ collInfo = _collections[NamespaceString{_dbName, "b"}];
+ ASSERT(collInfo.collCreated);
+ ASSERT_EQUALS(0, collInfo.numDocsInserted);
+}
+
+TEST_F(TenantDatabaseClonerTest, DatabaseAndCollectionStats) {
+ auto uuid1 = UUID::gen();
+ auto uuid2 = UUID::gen();
+ const BSONObj idIndexSpec = BSON("v" << 1 << "key" << BSON("_id" << 1) << "name"
+ << "_id_");
+ const BSONObj extraIndexSpec = BSON("v" << 1 << "key" << BSON("x" << 1) << "name"
+ << "_extra_");
+ const std::vector<BSONObj> sourceInfos = {BSON("name"
+ << "a"
+ << "type"
+ << "collection"
+ << "options" << BSONObj() << "info"
+ << BSON("readOnly" << false << "uuid" << uuid1)),
+ BSON(
+ "name"
+ << "b"
+ << "type"
+ << "collection"
+ << "options" << BSONObj() << "info"
+ << BSON("readOnly" << false << "uuid" << uuid2))};
+ _mockServer->setCommandReply("listCollections",
+ createListCollectionsResponse({sourceInfos[0], sourceInfos[1]}));
+ _mockServer->setCommandReply("find", createFindResponse());
+ _mockServer->setCommandReply("count", {createCountResponse(0), createCountResponse(0)});
+ _mockServer->setCommandReply(
+ "listIndexes",
+ {createCursorResponse(_dbName + ".a", BSON_ARRAY(idIndexSpec << extraIndexSpec)),
+ createCursorResponse(_dbName + ".b", BSON_ARRAY(idIndexSpec))});
+ auto cloner = makeDatabaseCloner();
+
+ auto collClonerBeforeFailPoint = globalFailPointRegistry().find("hangBeforeClonerStage");
+ auto collClonerAfterFailPoint = globalFailPointRegistry().find("hangAfterClonerStage");
+ auto timesEntered = collClonerBeforeFailPoint->setMode(
+ FailPoint::alwaysOn,
+ 0,
+ fromjson("{cloner: 'TenantCollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}"));
+ collClonerAfterFailPoint->setMode(
+ FailPoint::alwaysOn,
+ 0,
+ fromjson("{cloner: 'TenantCollectionCloner', stage: 'count', nss: '" + _dbName + ".a'}"));
+
+ // Run the cloner in a separate thread.
+ stdx::thread clonerThread([&] {
+ Client::initThread("ClonerRunner");
+ ASSERT_OK(cloner->run());
+ });
+    // Wait for the failpoint to be reached.
+ collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
+
+ // Collection stats should be set up with namespace.
+ auto stats = cloner->getStats();
+ ASSERT_EQ(_dbName, stats.dbname);
+ ASSERT_EQ(_clock.now(), stats.start);
+ ASSERT_EQ(2, stats.collections);
+ ASSERT_EQ(0, stats.clonedCollections);
+ ASSERT_EQ(2, stats.collectionStats.size());
+ ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns);
+ ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns);
+ ASSERT_EQ(_clock.now(), stats.collectionStats[0].start);
+ ASSERT_EQ(Date_t(), stats.collectionStats[0].end);
+ ASSERT_EQ(Date_t(), stats.collectionStats[1].start);
+ ASSERT_EQ(0, stats.collectionStats[0].indexes);
+ ASSERT_EQ(0, stats.collectionStats[1].indexes);
+ _clock.advance(Minutes(1));
+
+ // Move to the next collection
+ timesEntered = collClonerBeforeFailPoint->setMode(
+ FailPoint::alwaysOn,
+ 0,
+ fromjson("{cloner: 'TenantCollectionCloner', stage: 'count', nss: '" + _dbName + ".b'}"));
+ collClonerAfterFailPoint->setMode(FailPoint::off);
+
+    // Wait for the failpoint to be reached.
+ collClonerBeforeFailPoint->waitForTimesEntered(timesEntered + 1);
+
+ stats = cloner->getStats();
+ ASSERT_EQ(2, stats.collections);
+ ASSERT_EQ(1, stats.clonedCollections);
+ ASSERT_EQ(2, stats.collectionStats.size());
+ ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns);
+ ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns);
+ ASSERT_EQ(2, stats.collectionStats[0].indexes);
+ ASSERT_EQ(0, stats.collectionStats[1].indexes);
+ ASSERT_EQ(_clock.now(), stats.collectionStats[0].end);
+ ASSERT_EQ(_clock.now(), stats.collectionStats[1].start);
+ ASSERT_EQ(Date_t(), stats.collectionStats[1].end);
+ _clock.advance(Minutes(1));
+
+ // Finish
+ collClonerBeforeFailPoint->setMode(FailPoint::off, 0);
+ clonerThread.join();
+
+ stats = cloner->getStats();
+ ASSERT_EQ(_dbName, stats.dbname);
+ ASSERT_EQ(_clock.now(), stats.end);
+ ASSERT_EQ(2, stats.collections);
+ ASSERT_EQ(2, stats.clonedCollections);
+ ASSERT_EQ(2, stats.collectionStats.size());
+ ASSERT_EQ(_dbName + ".a", stats.collectionStats[0].ns);
+ ASSERT_EQ(_dbName + ".b", stats.collectionStats[1].ns);
+ ASSERT_EQ(2, stats.collectionStats[0].indexes);
+ ASSERT_EQ(1, stats.collectionStats[1].indexes);
+ ASSERT_EQ(_clock.now(), stats.collectionStats[1].end);
+}
} // namespace repl
} // namespace mongo