summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSuganthi Mani <suganthi.mani@mongodb.com>2022-02-03 06:42:19 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-02-03 07:47:41 +0000
commitec97c7f137a73faa8579c5616e364b86e6cf56c8 (patch)
treeddcd276d6c2bf878f8cb9cc830b3a33af21f26a4
parent71287a3a8f033923ca9c2735c72da4460fbbf06d (diff)
downloadmongo-ec97c7f137a73faa8579c5616e364b86e6cf56c8.tar.gz
SERVER-61133 Copy donor files to temp directory.
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_ese.yml2
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml2
-rw-r--r--jstests/replsets/libs/tenant_migration_test.js57
-rw-r--r--jstests/replsets/tenant_migration_concurrent_writes_on_donor.js26
-rw-r--r--jstests/replsets/tenant_migration_file_import.js43
-rw-r--r--jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js (renamed from jstests/replsets/tenant_migration_recipient_shard_merge.js)34
-rw-r--r--jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js (renamed from jstests/replsets/shard_merge_import_write_conflict_retry.js)44
-rw-r--r--src/mongo/db/repl/SConscript6
-rw-r--r--src/mongo/db/repl/tenant_file_cloner.cpp367
-rw-r--r--src/mongo/db/repl/tenant_file_cloner.h219
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp66
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_op_observer.h4
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp176
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.h7
-rw-r--r--src/mongo/db/repl/tenant_migration_shard_merge_util.cpp110
-rw-r--r--src/mongo/db/repl/tenant_migration_shard_merge_util.h74
16 files changed, 1072 insertions, 165 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese.yml
index 6c3eb45f26b..ceb21aa094e 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese.yml
@@ -11,6 +11,8 @@ selector:
exclude_with_any_tags:
- does_not_support_encrypted_storage_engine
- disabled_due_to_server_61671
+ # Shard merge protocol won't work with encrypted storage engines.
+ - serverless
executor:
config:
shell_options:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml
index 356d3cda0af..a333024c529 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml
@@ -11,6 +11,8 @@ selector:
exclude_with_any_tags:
- does_not_support_encrypted_storage_engine
- disabled_due_to_server_61671
+ # Shard merge protocol won't work with encrypted storage engines.
+ - serverless
executor:
config:
shell_options:
diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js
index 0e70ca11dbc..0e8ac919704 100644
--- a/jstests/replsets/libs/tenant_migration_test.js
+++ b/jstests/replsets/libs/tenant_migration_test.js
@@ -306,7 +306,14 @@ function TenantMigrationTest({
recipientNodes.forEach(node => {
const configRecipientsColl =
node.getCollection(TenantMigrationTest.kConfigRecipientsNS);
- assert.soon(() => 0 === configRecipientsColl.count({_id: migrationId}));
+ assert.soon(() => 0 === configRecipientsColl.count({_id: migrationId}), tojson(node));
+
+ let mtab;
+ assert.soon(() => {
+ mtab =
+ this.getTenantMigrationAccessBlocker({recipientNode: node, tenantId: tenantId});
+ return !mtab;
+ }, tojson(mtab));
});
};
@@ -350,6 +357,11 @@ function TenantMigrationTest({
return (mtab.donor.state === expectedAccessState);
};
+ function buildErrorMsg(
+ migrationId, expectedState, expectedAccessState, configDoc, recipientMtab) {
+ return tojson({migrationId, expectedState, expectedAccessState, configDoc, recipientMtab});
+ }
+
/**
* Asserts that the migration 'migrationId' and 'tenantId' eventually goes to the expected state
* on all the given recipient nodes.
@@ -357,8 +369,21 @@ function TenantMigrationTest({
this.waitForRecipientNodesToReachState = function(
nodes, migrationId, tenantId, expectedState, expectedAccessState) {
nodes.forEach(node => {
- assert.soon(() => this.isRecipientNodeInExpectedState(
- node, migrationId, tenantId, expectedState, expectedAccessState));
+ let result = {};
+ assert.soon(
+ () => {
+ result = this.isRecipientNodeInExpectedState(
+ node, migrationId, tenantId, expectedState, expectedAccessState);
+ return result.value;
+ },
+ () => {
+ return "waitForRecipientNodesToReachState failed: " +
+ buildErrorMsg(migrationId,
+ expectedState,
+ expectedAccessState,
+ result.configDoc,
+ result.recipientMtab);
+ });
});
};
@@ -369,8 +394,16 @@ function TenantMigrationTest({
this.assertRecipientNodesInExpectedState = function(
nodes, migrationId, tenantId, expectedState, expectedAccessState) {
nodes.forEach(node => {
- assert(this.isRecipientNodeInExpectedState(
- node, migrationId, tenantId, expectedState, expectedAccessState));
+ let result = this.isRecipientNodeInExpectedState(
+ node, migrationId, tenantId, expectedState, expectedAccessState);
+ assert(result.value, () => {
+ return "assertRecipientNodesInExpectedState failed: " +
+ buildErrorMsg(migrationId,
+ expectedState,
+ expectedAccessState,
+ result.configDoc,
+ result.recipientMtab);
+ });
});
};
@@ -383,12 +416,16 @@ function TenantMigrationTest({
const configRecipientsColl =
this.getRecipientPrimary().getCollection("config.tenantMigrationRecipients");
const configDoc = configRecipientsColl.findOne({_id: migrationId});
- if (!configDoc || configDoc.state !== expectedState) {
- return false;
- }
-
const mtab = this.getTenantMigrationAccessBlocker({recipientNode: node, tenantId});
- return (mtab.recipient.state === expectedAccessState);
+
+ let checkStates = () => {
+ if (!configDoc || configDoc.state !== expectedState) {
+ return false;
+ }
+ return (mtab.recipient.state === expectedAccessState);
+ };
+
+ return {value: checkStates(), configDoc: configDoc, recipientMtab: mtab.recipient};
};
function loadDummyData() {
diff --git a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js
index 7ea82a20796..8435731f460 100644
--- a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js
+++ b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js
@@ -31,8 +31,8 @@ const tenantMigrationTest = new TenantMigrationTest({
});
const donorRst = tenantMigrationTest.getDonorRst();
+const donorPrimary = donorRst.getPrimary();
-const primary = donorRst.getPrimary();
const kCollName = "testColl";
const kTenantDefinedDbName = "0";
@@ -205,6 +205,16 @@ function makeTestOptions(
};
}
+function cleanUp(dbName) {
+ const donorDB = donorPrimary.getDB(dbName);
+
+ assert.commandWorked(donorDB.dropDatabase());
+ donorRst.awaitLastOpCommitted();
+ // TODO SERVER-62934: Remove this fsync once we run fsync command before the migration
+ // start for shard merge protocol.
+ assert.commandWorked(donorPrimary.adminCommand({fsync: 1}));
+}
+
function runTest(
primary, testCase, testFunc, dbName, collName, {testInTransaction, testAsRetryableWrite} = {}) {
const testOpts = makeTestOptions(
@@ -220,6 +230,9 @@ function runTest(
}
testFunc(testCase, testOpts);
+
+ // This cleanup step is necessary for the shard merge protocol to work correctly.
+ cleanUp(dbName);
}
function runCommand(testOpts, expectedError) {
@@ -1015,11 +1028,14 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) {
continue;
}
- runTest(
- primary, testCase, testFunc, baseDbName + "Basic_" + kTenantDefinedDbName, kCollName);
+ runTest(donorPrimary,
+ testCase,
+ testFunc,
+ baseDbName + "Basic_" + kTenantDefinedDbName,
+ kCollName);
if (testCase.testInTransaction) {
- runTest(primary,
+ runTest(donorPrimary,
testCase,
testFunc,
baseDbName + "Txn_" + kTenantDefinedDbName,
@@ -1028,7 +1044,7 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) {
}
if (testCase.testAsRetryableWrite) {
- runTest(primary,
+ runTest(donorPrimary,
testCase,
testFunc,
baseDbName + "Retryable_" + kTenantDefinedDbName,
diff --git a/jstests/replsets/tenant_migration_file_import.js b/jstests/replsets/tenant_migration_file_import.js
index 75770b9a9c7..c67e529c589 100644
--- a/jstests/replsets/tenant_migration_file_import.js
+++ b/jstests/replsets/tenant_migration_file_import.js
@@ -2,8 +2,6 @@
* Test the shard merge rollback-to-stable algorithm. This test was written before we implemented
* file copy, so the script opens a backup cursor and copies files itself.
*
- * TODO (SERVER-61133): Adapt or delete this test once file copy works.
- *
* @tags: [
* does_not_support_encrypted_storage_engine,
* featureFlagShardMerge,
@@ -31,12 +29,19 @@ const migrationId = UUID();
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const donorPrimary = tenantMigrationTest.getDonorPrimary();
const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+
+if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
+ tenantMigrationTest.stop();
+ jsTestLog("Skipping Shard Merge-specific test");
+ return;
+}
+
const kDataDir =
`${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`;
assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0);
(function() {
-jsTestLog("Generate test data: open a backup cursor on the donor and copy files");
+jsTestLog("Generate test data");
const db = donorPrimary.getDB("myDatabase");
const collection = db["myCollection"];
@@ -54,35 +59,6 @@ assert.commandWorked(db.runCommand({
// Ensure our new collections appear in the backup cursor's checkpoint.
assert.commandWorked(db.adminCommand({fsync: 1}));
-
-const reply = assert.commandWorked(
- donorPrimary.adminCommand({aggregate: 1, cursor: {}, pipeline: [{"$backupCursor": {}}]}));
-const cursor = reply.cursor;
-
-jsTestLog(`Backup cursor metadata: ${tojson(cursor.firstBatch[0].metadata)}`);
-jsTestLog("Copy files to local data dir");
-for (let f of cursor.firstBatch) {
- if (!f.hasOwnProperty("filename")) {
- continue;
- }
-
- assert(f.filename.startsWith(donorPrimary.dbpath));
- const suffix = f.filename.slice(donorPrimary.dbpath.length);
-
- /*
- * Create directories as needed, e.g. copy
- * /data/db/job0/mongorunner/test-0/journal/WiredTigerLog.01 to
- * /data/db/job0/mongorunner/test-1/migrationTmpFiles.migrationId/journal/WiredTigerLog.01,
- * by passing "--relative /data/db/job0/mongorunner/test-0/./journal/WiredTigerLog.01".
- * Note the "/./" marker.
- */
- assert.eq(runNonMongoProgram(
- "rsync", "-a", "--relative", `${donorPrimary.dbpath}/.${suffix}`, kDataDir),
- 0);
-}
-
-jsTestLog("Kill backup cursor");
-donorPrimary.adminCommand({killCursors: "$cmd.aggregate", cursors: [cursor.id]});
})();
jsTestLog("Run migration");
@@ -94,6 +70,9 @@ const migrationOpts = {
tenantId: kTenantId,
};
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
+
+// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported
+// successfully.
for (let collectionName of ["myCollection", "myCappedCollection"]) {
jsTestLog(`Checking ${collectionName}`);
// Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge.js b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js
index 5e1d3e85c70..8be76547d73 100644
--- a/jstests/replsets/tenant_migration_recipient_shard_merge.js
+++ b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js
@@ -1,5 +1,5 @@
/**
- * Tests recipient behavior for shard merge
+ * Tests that recipient is able to learn files to be imported from donor for shard merge protocol.
*
* @tags: [
* incompatible_with_eft,
@@ -8,6 +8,7 @@
* requires_majority_read_concern,
* requires_persistence,
* serverless,
+ * featureFlagShardMerge,
* ]
*/
@@ -41,15 +42,22 @@ load("jstests/replsets/libs/tenant_migration_util.js");
const donorPrimary = tenantMigrationTest.getDonorPrimary();
const donorSecondary = donorRst.getSecondary();
+ // Do a majority write.
tenantMigrationTest.insertDonorDB(tenantDB, collName);
+ // Ensure our new collections appear in the backup cursor's checkpoint.
+ assert.commandWorked(donorPrimary.adminCommand({fsync: 1}));
+
const failpoint = "fpAfterRetrievingStartOpTimesMigrationRecipientInstance";
const waitInFailPoint = configureFailPoint(recipientPrimary, failpoint, {action: "hang"});
+ // In order to prevent the copying of "testTenantId" databases via logical cloning from donor to
+ // recipient, start migration on a tenant id which is non-existent on the donor.
const migrationUuid = UUID();
+ const kDummyTenantId = "nonExistentTenantId";
const migrationOpts = {
migrationIdString: extractUUIDFromObject(migrationUuid),
- tenantId,
+ tenantId: kDummyTenantId,
readPreference: {mode: 'primary'}
};
@@ -58,16 +66,28 @@ load("jstests/replsets/libs/tenant_migration_util.js");
waitInFailPoint.wait();
- const res =
- recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"});
- assert.eq(res.inprog.length, 1);
- const [currOp] = res.inprog;
- assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res);
+ tenantMigrationTest.assertRecipientNodesInExpectedState(
+ tenantMigrationTest.getRecipientRst().nodes,
+ migrationUuid,
+ kDummyTenantId,
+ TenantMigrationTest.RecipientState.kLearnedFilenames,
+ TenantMigrationTest.RecipientAccessState.kReject);
+
waitInFailPoint.off();
TenantMigrationTest.assertCommitted(
tenantMigrationTest.waitForMigrationToComplete(migrationOpts));
+ // TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported
+ // successfully.
+ // Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
+ assert.eq(donorPrimary.getDB(tenantDB)[collName].countDocuments({}),
+ recipientPrimary.getDB(tenantDB)[collName].countDocuments({}),
+ "countDocuments");
+ assert.eq(donorPrimary.getDB(tenantDB)[collName].count(),
+ recipientPrimary.getDB(tenantDB)[collName].count(),
+ "count");
+
tenantMigrationTest.stop();
})();
})();
diff --git a/jstests/replsets/shard_merge_import_write_conflict_retry.js b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
index da4dc5cc220..6746b2d7935 100644
--- a/jstests/replsets/shard_merge_import_write_conflict_retry.js
+++ b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js
@@ -3,8 +3,6 @@
* collections. We will get WriteConflict exception if we try to import the files with timestamp
* older than the stable timestamp.
*
- * TODO (SERVER-61133): Adapt or delete this test once file copy works.
- *
* @tags: [
* does_not_support_encrypted_storage_engine,
* featureFlagShardMerge,
@@ -16,6 +14,7 @@
* requires_replication,
* requires_persistence,
* requires_wiredtiger,
+ * serverless,
* ]
*/
(function() {
@@ -32,12 +31,19 @@ const migrationId = UUID();
const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()});
const donorPrimary = tenantMigrationTest.getDonorPrimary();
const recipientPrimary = tenantMigrationTest.getRecipientPrimary();
+
+if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) {
+ tenantMigrationTest.stop();
+ jsTestLog("Skipping Shard Merge-specific test");
+ return;
+}
+
const kDataDir =
`${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`;
assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0);
(function() {
-jsTestLog("Generate test data: open a backup cursor on the donor and copy files");
+jsTestLog("Generate test data");
const db = donorPrimary.getDB("myDatabase");
const collection = db["myCollection"];
@@ -55,35 +61,6 @@ assert.commandWorked(db.runCommand({
// Ensure our new collections appear in the backup cursor's checkpoint.
assert.commandWorked(db.adminCommand({fsync: 1}));
-
-const reply = assert.commandWorked(
- donorPrimary.adminCommand({aggregate: 1, cursor: {}, pipeline: [{"$backupCursor": {}}]}));
-const cursor = reply.cursor;
-
-jsTestLog(`Backup cursor metadata: ${tojson(cursor.firstBatch[0].metadata)}`);
-jsTestLog("Copy files to local data dir");
-for (let f of cursor.firstBatch) {
- if (!f.hasOwnProperty("filename")) {
- continue;
- }
-
- assert(f.filename.startsWith(donorPrimary.dbpath));
- const suffix = f.filename.slice(donorPrimary.dbpath.length);
-
- /*
- * Create directories as needed, e.g. copy
- * /data/db/job0/mongorunner/test-0/journal/WiredTigerLog.01 to
- * /data/db/job0/mongorunner/test-1/migrationTmpFiles.migrationId/journal/WiredTigerLog.01,
- * by passing "--relative /data/db/job0/mongorunner/test-0/./journal/WiredTigerLog.01".
- * Note the "/./" marker.
- */
- assert.eq(runNonMongoProgram(
- "rsync", "-a", "--relative", `${donorPrimary.dbpath}/.${suffix}`, kDataDir),
- 0);
-}
-
-jsTestLog("Kill backup cursor");
-donorPrimary.adminCommand({killCursors: "$cmd.aggregate", cursors: [cursor.id]});
})();
// Enable Failpoints to simulate WriteConflict exception while importing donor files.
@@ -101,6 +78,9 @@ const migrationOpts = {
tenantId: kTenantId,
};
TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts));
+
+// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported
+// successfully.
for (let collectionName of ["myCollection", "myCappedCollection"]) {
jsTestLog(`Checking ${collectionName}`);
// Use "countDocuments" to check actual docs, "count" to check sizeStorer data.
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index b4c4fc98b68..0df82726b10 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1347,7 +1347,9 @@ env.Library(
env.Library(
target='tenant_migration_recipient_service',
source=[
+ 'tenant_file_cloner.cpp',
'tenant_migration_recipient_coordinator.cpp',
+ 'tenant_migration_recipient_op_observer.cpp',
'tenant_migration_recipient_service.cpp',
'tenant_migration_shard_merge_util.cpp',
'vote_commit_migration_progress.cpp',
@@ -1372,6 +1374,7 @@ env.Library(
'$BUILD_DIR/mongo/db/transaction',
'cloner_utils',
'oplog',
+ 'oplog_application_interface',
'oplog_buffer_collection',
'oplog_entry',
'oplog_fetcher',
@@ -1391,9 +1394,7 @@ env.Library(
'tenant_migration_access_blocker_server_status_section.cpp',
'tenant_migration_access_blocker_util.cpp',
'tenant_migration_donor_access_blocker.cpp',
- 'tenant_migration_donor_op_observer.cpp',
'tenant_migration_recipient_access_blocker.cpp',
- 'tenant_migration_recipient_op_observer.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
@@ -1421,6 +1422,7 @@ env.Library(
env.Library(
target='tenant_migration_donor_service',
source=[
+ 'tenant_migration_donor_op_observer.cpp',
'tenant_migration_donor_service.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/repl/tenant_file_cloner.cpp b/src/mongo/db/repl/tenant_file_cloner.cpp
new file mode 100644
index 00000000000..6a683ddc03e
--- /dev/null
+++ b/src/mongo/db/repl/tenant_file_cloner.cpp
@@ -0,0 +1,367 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration
+
+#include "mongo/platform/basic.h"
+
+#include <fstream>
+
+#include "mongo/base/string_data.h"
+#include "mongo/db/pipeline/aggregate_command_gen.h"
+#include "mongo/db/pipeline/aggregation_request_helper.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
+#include "mongo/db/repl/tenant_file_cloner.h"
+#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
+#include "mongo/logv2/log.h"
+
+#include "mongo/util/assert_util.h"
+
+namespace mongo::repl {
+
+// TODO SERVER-63119: Verify if we need the below failpoints for TenantFileCloner unit testing.
+// Failpoint which causes the file cloner to hang after handling the next batch of results
+// from the DBClientConnection, optionally limited to a specific file name.
+MONGO_FAIL_POINT_DEFINE(TenantFileClonerHangAfterHandlingBatchResponse);
+MONGO_FAIL_POINT_DEFINE(TenantFileClonerHangDuringFileCloneBackup);
+MONGO_FAIL_POINT_DEFINE(TenantFileClonerDisableExhaust);
+TenantFileCloner::TenantFileCloner(const UUID& backupId,
+ const UUID& migrationId,
+ const std::string& remoteFileName,
+ size_t remoteFileSize,
+ const std::string& relativePath,
+ TenantMigrationSharedData* sharedData,
+ const HostAndPort& source,
+ DBClientConnection* client,
+ StorageInterface* storageInterface,
+ ThreadPool* dbPool)
+ : TenantBaseCloner("TenantFileCloner"_sd, sharedData, source, client, storageInterface, dbPool),
+ _backupId(backupId),
+ _migrationId(migrationId),
+ _remoteFileName(remoteFileName),
+ _remoteFileSize(remoteFileSize),
+ _relativePathString(relativePath),
+ _queryStage("query", this, &TenantFileCloner::queryStage),
+ _fsWorkTaskRunner(dbPool),
+ _scheduleFsWorkFn([this](executor::TaskExecutor::CallbackFn work) {
+ auto task = [ this, work = std::move(work) ](
+ OperationContext * opCtx,
+ const Status& status) mutable noexcept->TaskRunner::NextAction {
+ try {
+ work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx));
+ } catch (const DBException& e) {
+ setSyncFailedStatus(e.toStatus());
+ }
+ return TaskRunner::NextAction::kDisposeOperationContext;
+ };
+ _fsWorkTaskRunner.schedule(std::move(task));
+ return executor::TaskExecutor::CallbackHandle();
+ }),
+ _progressMeter(remoteFileSize,
+ kProgressMeterSecondsBetween,
+ kProgressMeterCheckInterval,
+ "bytes copied",
+ str::stream() << _remoteFileName << " Tenant migration file clone progress") {
+ _stats.filePath = _relativePathString;
+ _stats.fileSize = _remoteFileSize;
+}
+
+BaseCloner::ClonerStages TenantFileCloner::getStages() {
+ return {&_queryStage};
+}
+
+void TenantFileCloner::preStage() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.start = getSharedData()->getClock()->now();
+
+ // Construct local path name from the relative path and the temp dbpath.
+ boost::filesystem::path relativePath(_relativePathString);
+ uassert(6113300,
+ str::stream() << "Path " << _relativePathString << " should be a relative path",
+ relativePath.is_relative());
+
+ auto syncTargetTempDBPath = shard_merge_utils::fileClonerTempDir(_migrationId);
+ _localFilePath = syncTargetTempDBPath;
+
+ _localFilePath /= relativePath;
+ _localFilePath = _localFilePath.lexically_normal();
+ uassert(6113301,
+ str::stream() << "Path " << _relativePathString
+ << " must not escape its parent directory.",
+ StringData(_localFilePath.generic_string())
+ .startsWith(syncTargetTempDBPath.generic_string()));
+
+ // Create and open files and any parent directories.
+ if (boost::filesystem::exists(_localFilePath)) {
+ LOGV2(6113302,
+ "Local file exists at start of TenantFileCloner; truncating.",
+ "localFilePath"_attr = _localFilePath.string());
+ } else {
+ auto localFileDir = _localFilePath.parent_path();
+ boost::system::error_code ec;
+ boost::filesystem::create_directories(localFileDir, ec);
+ uassert(6113303,
+ str::stream() << "Failed to create directory " << localFileDir.string() << " Error "
+ << ec.message(),
+ !ec);
+ }
+ _localFile.open(_localFilePath.string(),
+ std::ios_base::out | std::ios_base::binary | std::ios_base::trunc);
+ uassert(ErrorCodes::FileOpenFailed,
+ str::stream() << "Failed to open file " << _localFilePath.string(),
+ !_localFile.fail());
+ _fileOffset = 0;
+}
+
+void TenantFileCloner::postStage() {
+ _localFile.close();
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.end = getSharedData()->getClock()->now();
+}
+
+BaseCloner::AfterStageBehavior TenantFileCloner::queryStage() {
+ // Since the query stage may be re-started, we need to make sure all the file system work
+ // from the previous run is done before running the query again.
+ waitForFilesystemWorkToComplete();
+ _sawEof = false;
+ runQuery();
+ waitForFilesystemWorkToComplete();
+ uassert(
+ 6113304,
+ str::stream()
+ << "Received entire file, but did not get end of file marker. File may be incomplete "
+ << _localFilePath.string(),
+ _sawEof);
+ return kContinueNormally;
+}
+
+size_t TenantFileCloner::getFileOffset() {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _fileOffset;
+}
+
+void TenantFileCloner::runQuery() {
+ auto backupFileStage = BSON(
+ "$_backupFile" << BSON("backupId" << _backupId << "file" << _remoteFileName << "byteOffset"
+ << static_cast<int64_t>(getFileOffset())));
+ AggregateCommandRequest aggRequest(
+ NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb),
+ {backupFileStage});
+ aggRequest.setReadConcern(ReadConcernArgs::kImplicitDefault);
+ aggRequest.setWriteConcern(WriteConcernOptions());
+
+ LOGV2_DEBUG(6113305,
+ 2,
+ "TenantFileCloner running aggregation",
+ "source"_attr = getSource(),
+ "aggRequest"_attr = aggregation_request_helper::serializeToCommandObj(aggRequest));
+ const bool useExhaust = !MONGO_unlikely(TenantFileClonerDisableExhaust.shouldFail());
+ std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
+ getClient(), std::move(aggRequest), true /* secondaryOk */, useExhaust));
+ try {
+ while (cursor->more()) {
+ DBClientCursorBatchIterator iter(*cursor);
+ handleNextBatch(iter);
+ }
+ } catch (const DBException& e) {
+ // We cannot continue after an error when processing exhaust cursors. Instead we must
+ // reconnect, which is handled by the BaseCloner.
+ LOGV2_DEBUG(6113306,
+ 1,
+ "TenantFileCloner received an exception while downloading data",
+ "error"_attr = e.toStatus(),
+ "source"_attr = getSource(),
+ "backupId"_attr = _backupId,
+ "remoteFile"_attr = _remoteFileName,
+ "fileOffset"_attr = getFileOffset());
+ getClient()->shutdown();
+ throw;
+ }
+}
+
+void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) {
+ LOGV2_DEBUG(6113307,
+ 3,
+ "TenantFileCloner handleNextBatch",
+ "source"_attr = getSource(),
+ "backupId"_attr = _backupId,
+ "remoteFile"_attr = _remoteFileName,
+ "fileOffset"_attr = getFileOffset(),
+ "moreInCurrentBatch"_attr = iter.moreInCurrentBatch());
+ {
+ stdx::lock_guard<TenantMigrationSharedData> lk(*getSharedData());
+ if (!getSharedData()->getStatus(lk).isOK()) {
+ static constexpr char message[] = "BackupFile cloning cancelled due to cloning failure";
+ LOGV2(6113323, message, "error"_attr = getSharedData()->getStatus(lk));
+ uasserted(ErrorCodes::CallbackCanceled,
+ str::stream() << message << ": " << getSharedData()->getStatus(lk));
+ }
+ }
+ while (iter.moreInCurrentBatch()) {
+ stdx::lock_guard<Latch> lk(_mutex);
+ _stats.receivedBatches++;
+ while (iter.moreInCurrentBatch()) {
+ _dataToWrite.emplace_back(iter.nextSafe());
+ }
+ }
+
+ // Schedule the next set of writes.
+ auto&& scheduleResult = _scheduleFsWorkFn([=](const executor::TaskExecutor::CallbackArgs& cbd) {
+ writeDataToFilesystemCallback(cbd);
+ });
+
+ if (!scheduleResult.isOK()) {
+ Status newStatus = scheduleResult.getStatus().withContext(
+ str::stream() << "Error copying file '" << _remoteFileName << "'");
+ // We must throw an exception to terminate query.
+ uassertStatusOK(newStatus);
+ }
+
+ TenantFileClonerHangAfterHandlingBatchResponse.executeIf(
+ [&](const BSONObj&) {
+ while (MONGO_unlikely(TenantFileClonerHangAfterHandlingBatchResponse.shouldFail()) &&
+ !mustExit()) {
+ LOGV2(6113308,
+ "TenantFileClonerHangAfterHandlingBatchResponse fail point "
+ "enabled. Blocking until fail point is disabled",
+ "remoteFile"_attr = _remoteFileName);
+ mongo::sleepmillis(100);
+ }
+ },
+ [&](const BSONObj& data) {
+ // Only hang when copying the specified file, or if no file was specified.
+ auto filename = data["remoteFile"].str();
+ return filename.empty() || filename == _remoteFileName;
+ });
+}
+
+void TenantFileCloner::writeDataToFilesystemCallback(
+ const executor::TaskExecutor::CallbackArgs& cbd) {
+ LOGV2_DEBUG(6113309,
+ 3,
+ "TenantFileCloner writeDataToFilesystemCallback",
+ "backupId"_attr = _backupId,
+ "remoteFile"_attr = _remoteFileName,
+ "localFile"_attr = _localFilePath.string(),
+ "fileOffset"_attr = getFileOffset());
+ uassertStatusOK(cbd.status);
+ {
+ stdx::lock_guard<Latch> lk(_mutex);
+ if (_dataToWrite.size() == 0) {
+ LOGV2_WARNING(6113310,
+ "writeDataToFilesystemCallback, but no data to write",
+ "remoteFile"_attr = _remoteFileName);
+ }
+ for (const auto& doc : _dataToWrite) {
+ uassert(6113311,
+ str::stream() << "Saw multiple end-of-file-markers in file " << _remoteFileName,
+ !_sawEof);
+ // Received file data should always be in sync with the stream and where we think
+ // our next input should be coming from.
+ const auto byteOffset = doc["byteOffset"].safeNumberLong();
+ invariant(byteOffset == _localFile.tellp());
+ invariant(byteOffset == _fileOffset);
+ const auto& dataElem = doc["data"];
+ uassert(6113312,
+ str::stream() << "Expected file data to be type BinDataGeneral. " << doc,
+ dataElem.type() == BinData && dataElem.binDataType() == BinDataGeneral);
+ int dataLength;
+ auto data = dataElem.binData(dataLength);
+ _localFile.write(data, dataLength);
+ uassert(ErrorCodes::FileStreamFailed,
+ str::stream() << "Unable to write file data for file " << _remoteFileName
+ << " at offset " << _fileOffset,
+ !_localFile.fail());
+ _progressMeter.hit(dataLength);
+ _fileOffset += dataLength;
+ _stats.bytesCopied += dataLength;
+ _sawEof = doc["endOfFile"].booleanSafe();
+ }
+ _dataToWrite.clear();
+ _stats.writtenBatches++;
+ }
+
+ TenantFileClonerHangDuringFileCloneBackup.executeIf(
+ [&](const BSONObj&) {
+ LOGV2(6113313,
+ "TenantFileClonerHangDuringFileCloneBackup fail point "
+ "enabled. Blocking until fail point is disabled");
+ while (MONGO_unlikely(TenantFileClonerHangDuringFileCloneBackup.shouldFail()) &&
+ !mustExit()) {
+ mongo::sleepmillis(100);
+ }
+ },
+ [&](const BSONObj& data) {
+ return (data["remoteFile"].eoo() || data["remoteFile"].str() == _remoteFileName) &&
+ (data["fileOffset"].eoo() || data["fileOffset"].safeNumberLong() <= _fileOffset);
+ });
+}
+
+bool TenantFileCloner::isMyFailPoint(const BSONObj& data) const {
+ auto remoteFile = data["remoteFile"].str();
+ return (remoteFile.empty() || remoteFile == _remoteFileName) && BaseCloner::isMyFailPoint(data);
+}
+
+void TenantFileCloner::waitForFilesystemWorkToComplete() {
+ _fsWorkTaskRunner.join();
+}
+
+TenantFileCloner::Stats TenantFileCloner::getStats() const {
+ stdx::lock_guard<Latch> lk(_mutex);
+ return _stats;
+}
+
+std::string TenantFileCloner::Stats::toString() const {
+ return toBSON().toString();
+}
+
+BSONObj TenantFileCloner::Stats::toBSON() const {
+ BSONObjBuilder bob;
+ append(&bob);
+ return bob.obj();
+}
+
+void TenantFileCloner::Stats::append(BSONObjBuilder* builder) const {
+ builder->append("filePath", filePath);
+ builder->appendNumber("fileSize", static_cast<long long>(fileSize));
+ builder->appendNumber("bytesCopied", static_cast<long long>(bytesCopied));
+ if (start != Date_t()) {
+ builder->appendDate("start", start);
+ if (end != Date_t()) {
+ builder->appendDate("end", end);
+ auto elapsed = end - start;
+ long long elapsedMillis = duration_cast<Milliseconds>(elapsed).count();
+ builder->appendNumber("elapsedMillis", elapsedMillis);
+ }
+ }
+ builder->appendNumber("receivedBatches", static_cast<long long>(receivedBatches));
+ builder->appendNumber("writtenBatches", static_cast<long long>(writtenBatches));
+}
+
+} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_file_cloner.h b/src/mongo/db/repl/tenant_file_cloner.h
new file mode 100644
index 00000000000..def09528953
--- /dev/null
+++ b/src/mongo/db/repl/tenant_file_cloner.h
@@ -0,0 +1,219 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <boost/filesystem.hpp>
+#include <memory>
+#include <vector>
+
+#include "mongo/db/repl/base_cloner.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/db/repl/tenant_base_cloner.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
+#include "mongo/util/progress_meter.h"
+
+namespace mongo::repl {
+
+class TenantFileCloner final : public TenantBaseCloner {
+public:
+ struct Stats {
+ std::string filePath;
+ size_t fileSize;
+ Date_t start;
+ Date_t end;
+ size_t receivedBatches{0};
+ size_t writtenBatches{0};
+ size_t bytesCopied{0};
+
+ std::string toString() const;
+ BSONObj toBSON() const;
+ void append(BSONObjBuilder* builder) const;
+ };
+
+ /**
+ * Type of function to schedule file system tasks with the executor.
+ *
+ * Used for testing only.
+ */
+ using ScheduleFsWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>(
+ executor::TaskExecutor::CallbackFn)>;
+
+ /**
+ * Constructor for TenantFileCloner.
+ *
+ * remoteFileName: Path of file to copy on remote system.
+ * remoteFileSize: Size of remote file in bytes, used for progress messages and stats only.
+ * relativePath: Path of file relative to dbpath on the remote system, as a
+ * boost::filesystem::path generic path.
+ */
+ TenantFileCloner(const UUID& backupId,
+ const UUID& migrationId,
+ const std::string& remoteFileName,
+ size_t remoteFileSize,
+ const std::string& relativePath,
+ TenantMigrationSharedData* sharedData,
+ const HostAndPort& source,
+ DBClientConnection* client,
+ StorageInterface* storageInterface,
+ ThreadPool* dbPool);
+
+ virtual ~TenantFileCloner() = default;
+
+ /**
+ * Waits for any file system work to finish or fail.
+ */
+ void waitForFilesystemWorkToComplete();
+
+ Stats getStats() const;
+
+ std::string toString() const;
+
+protected:
+ ClonerStages getStages() final;
+
+ bool isMyFailPoint(const BSONObj& data) const final;
+
+private:
+ friend class TenantFileClonerTest;
+
+ class TenantFileClonerQueryStage : public ClonerStage<TenantFileCloner> {
+ public:
+ TenantFileClonerQueryStage(std::string name,
+ TenantFileCloner* cloner,
+ ClonerRunFn stageFunc)
+ : ClonerStage<TenantFileCloner>(name, cloner, stageFunc) {}
+
+ bool checkSyncSourceValidityOnRetry() override {
+ // Sync source validity is assured by the backup ID not existing if the sync source
+ // is restarted or otherwise becomes invalid.
+ return false;
+ }
+
+ bool isTransientError(const Status& status) override {
+ if (isCursorError(status)) {
+ return true;
+ }
+ return ErrorCodes::isRetriableError(status);
+ }
+
+ static bool isCursorError(const Status& status) {
+ // Our cursor was killed on the sync source.
+ if ((status == ErrorCodes::CursorNotFound) || (status == ErrorCodes::OperationFailed) ||
+ (status == ErrorCodes::QueryPlanKilled)) {
+ return true;
+ }
+ return false;
+ }
+ };
+
+ std::string describeForFuzzer(BaseClonerStage* stage) const final {
+ // We do not have a fuzzer for tenant backup file cloner.
+ MONGO_UNREACHABLE;
+ }
+
+ /**
+ * The preStage sets the begin time in _stats and makes sure the destination file
+ * can be created.
+ */
+ void preStage() final;
+
+ /**
+ * The postStage sets the end time in _stats.
+ */
+ void postStage() final;
+
+ /**
+ * Stage function that executes a query to retrieve the file data. For each
+ * batch returned by the upstream node, handleNextBatch will be called with the data. This
+ * stage will finish when the entire query is finished or failed.
+ */
+ AfterStageBehavior queryStage();
+
+ /**
+ * Put all results from a query batch into a buffer, and schedule it to be written to disk.
+ */
+ void handleNextBatch(DBClientCursorBatchIterator& iter);
+
+ /**
+ * Called whenever there is a new batch of documents ready from the DBClientConnection.
+ *
+     * Each document returned will be inserted via the storage
+     * interface.
+ */
+ void writeDataToFilesystemCallback(const executor::TaskExecutor::CallbackArgs& cbd);
+
+ /**
+     * Sends an (aggregation) query command to the source. That query command will be parameterized
+ * based on copy progress.
+ */
+ void runQuery();
+
+ /**
+ * Convenience call to get the file offset under a lock.
+ */
+ size_t getFileOffset();
+
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (R) Read-only in concurrent operation; no synchronization required.
+ // (S) Self-synchronizing; access according to class's own rules.
+ // (M) Reads and writes guarded by _mutex (defined in base class).
+ // (X) Access only allowed from the main flow of control called from run() or constructor.
+ const UUID _backupId; // (R)
+ const UUID _migrationId; // (R)
+ const std::string _remoteFileName; // (R)
+ size_t _remoteFileSize; // (R)
+ const std::string _relativePathString; // (R)
+ boost::filesystem::path _localFilePath; // (X)
+
+ TenantFileClonerQueryStage _queryStage; // (R)
+
+ std::ofstream _localFile; // (M)
+ // File offset we will request from the remote side in the next query.
+ off_t _fileOffset = 0; // (M)
+ bool _sawEof = false; // (X)
+
+ // Data read from source to insert.
+ std::vector<BSONObj> _dataToWrite; // (M)
+ // Putting _fsWorkTaskRunner last ensures anything the database work threads depend on
+ // like _dataToWrite, is destroyed after those threads exit.
+ TaskRunner _fsWorkTaskRunner; // (R)
+ // Function for scheduling filesystem work using the executor.
+ ScheduleFsWorkFn _scheduleFsWorkFn; // (R)
+
+ ProgressMeter _progressMeter; // (X) progress meter for this instance.
+ Stats _stats; // (M)
+
+ static constexpr int kProgressMeterSecondsBetween = 60;
+ static constexpr int kProgressMeterCheckInterval = 128;
+};
+
+} // namespace mongo::repl
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
index 3de36b59ce3..589658b3308 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/tenant_migration_access_blocker_util.h"
#include "mongo/db/repl/tenant_migration_recipient_access_blocker.h"
#include "mongo/db/repl/tenant_migration_recipient_op_observer.h"
+#include "mongo/db/repl/tenant_migration_shard_merge_util.h"
#include "mongo/db/repl/tenant_migration_state_machine_gen.h"
#include "mongo/db/repl/tenant_migration_util.h"
#include "mongo/logv2/log.h"
@@ -84,9 +85,72 @@ void onSetRejectReadsBeforeTimestamp(OperationContext* opCtx,
mtab->startRejectingReadsBefore(recipientStateDoc.getRejectReadsBeforeTimestamp().get());
}
-
} // namespace
+void TenantMigrationRecipientOpObserver::onCreateCollection(OperationContext* opCtx,
+ const CollectionPtr& coll,
+ const NamespaceString& collectionName,
+ const CollectionOptions& options,
+ const BSONObj& idIndex,
+ const OplogSlot& createOpTime) {
+ if (!shard_merge_utils::isDonatedFilesCollection(collectionName))
+ return;
+
+ auto collString = collectionName.coll().toString();
+ auto migrationUUID =
+ UUID(uassertStatusOK(UUID::parse(collString.substr(collString.find('.') + 1))));
+ auto fileClonerTempDirPath = shard_merge_utils::fileClonerTempDir(migrationUUID);
+
+    // This is possible when a secondary restarts or rolls back and the donated files collection
+ // is created as part of oplog replay.
+ if (boost::filesystem::exists(fileClonerTempDirPath)) {
+ LOGV2_DEBUG(6113316,
+ 1,
+ "File cloner temp directory already exists",
+ "directory"_attr = fileClonerTempDirPath.generic_string());
+
+ // Ignoring the errors because if this step fails, then the following step
+ // create_directory() will fail and that will throw an exception.
+ boost::system::error_code ec;
+ boost::filesystem::remove_all(fileClonerTempDirPath, ec);
+ }
+
+ try {
+ boost::filesystem::create_directory(fileClonerTempDirPath);
+ } catch (std::exception& e) {
+ LOGV2_ERROR(6113317,
+ "Error creating file cloner temp directory",
+ "directory"_attr = fileClonerTempDirPath.generic_string(),
+ "error"_attr = e.what());
+ throw;
+ }
+}
+
+void TenantMigrationRecipientOpObserver::onInserts(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last,
+ bool fromMigrate) {
+
+ if (!shard_merge_utils::isDonatedFilesCollection(nss))
+ return;
+
+ try {
+ uassertStatusOK(shard_merge_utils::cloneFiles(opCtx, first, last));
+ } catch (const DBException& ex) {
+ invariant(first != last);
+ auto migrationId = UUID(
+ uassertStatusOK(UUID::parse(first->doc[shard_merge_utils::kMigrationIdFieldName])));
+ LOGV2_ERROR(6113318,
+ "Error cloning files",
+ "migrationUUID"_attr = migrationId,
+ "error"_attr = ex.toStatus());
+ // TODO SERVER-63120: On error, vote shard merge abort to recipient primary.
+ }
+}
+
void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx,
const OplogUpdateEntryArgs& args) {
if (args.nss == NamespaceString::kTenantMigrationRecipientsNamespace &&
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
index fe7178819d6..76567db5b95 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
@@ -83,7 +83,7 @@ public:
const UUID& uuid,
std::vector<InsertStatement>::const_iterator first,
std::vector<InsertStatement>::const_iterator last,
- bool fromMigrate) final {}
+ bool fromMigrate) final;
void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) final;
@@ -113,7 +113,7 @@ public:
const NamespaceString& collectionName,
const CollectionOptions& options,
const BSONObj& idIndex,
- const OplogSlot& createOpTime) final {}
+ const OplogSlot& createOpTime) final;
void onCollMod(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 7cb86ffe27a..7868ecac516 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -90,7 +90,6 @@ using namespace fmt;
const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex";
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd;
-constexpr StringData kDonatedFilesPrefix = "donatedFiles."_sd;
constexpr int kBackupCursorFileFetcherRetryAttempts = 10;
NamespaceString getOplogBufferNs(const UUID& migrationUUID) {
@@ -98,11 +97,6 @@ NamespaceString getOplogBufferNs(const UUID& migrationUUID) {
kOplogBufferPrefix + migrationUUID.toString());
}
-NamespaceString getDonatedFilesNs(const UUID& migrationUUID) {
- return NamespaceString(NamespaceString::kConfigDb,
- kDonatedFilesPrefix + migrationUUID.toString());
-}
-
boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx) {
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
@@ -141,11 +135,6 @@ bool isRetriableOplogFetcherError(Status oplogFetcherStatus) {
oplogFetcherStatus == ErrorCodes::ShutdownInProgress;
}
-boost::filesystem::path fileClonerTempDir(UUID migrationId) {
- return boost::filesystem::path(storageGlobalParams.dbpath) /
- ("migrationTmpFiles.{}"_format(migrationId.toString()));
-}
-
} // namespace
// A convenient place to set test-specific parameters.
@@ -981,10 +970,14 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
}();
auto fetchStatus = std::make_shared<boost::optional<Status>>();
- auto fetcherCallback = [this, self = shared_from_this(), fetchStatus, token](
- const Fetcher::QueryResponseStatus& dataStatus,
- Fetcher::NextAction* nextAction,
- BSONObjBuilder* getMoreBob) {
+ auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>();
+ auto fetcherCallback = [this,
+ self = shared_from_this(),
+ fetchStatus,
+ metadataInfoPtr = uniqueMetadataInfo.get(),
+ token](const Fetcher::QueryResponseStatus& dataStatus,
+ Fetcher::NextAction* nextAction,
+ BSONObjBuilder* getMoreBob) {
if (!dataStatus.isOK()) {
*fetchStatus = dataStatus.getStatus();
LOGV2_ERROR(6113003, "backup cursor failed", "error"_attr = dataStatus.getStatus());
@@ -1012,7 +1005,13 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
const auto& metadata = doc["metadata"].Obj();
auto startApplyingDonorOpTime =
OpTime(metadata["checkpointTimestamp"].timestamp(), OpTime::kUninitializedTerm);
- // TODO (SERVER-61133) Uncomment the following lines when we skip
+
+
+ invariant(metadataInfoPtr && !*metadataInfoPtr);
+ (*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo(
+ getMigrationUUID(), _client->getServerAddress(), metadata);
+
+ // TODO (SERVER-61145) Uncomment the following lines when we skip
// _getStartopTimesFromDonor entirely
// _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime);
// _stateDoc.setStartFetchingDonorOpTime(startApplyingDonorOpTime);
@@ -1029,36 +1028,28 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
"filename"_attr = doc["filename"].String(),
"backupCursorId"_attr = data.cursorId);
- auto donatedFilesNs = getDonatedFilesNs(getMigrationUUID());
- auto status = writeConflictRetry(
- opCtx,
- "insertBackupCursorEntry",
- donatedFilesNs.ns(),
- [opCtx, donatedFilesNs, doc]() -> Status {
- // Disabling internal document validation because the fetcher batch size
- // can exceed the max data size limit BSONObjMaxUserSize with the
- // additional fields we add to documents.
- DisableDocumentValidation documentValidationDisabler(
- opCtx, DocumentValidationSettings::kDisableInternalValidation);
-
- auto obj = std::vector<mongo::BSONObj>{
- doc.addField(BSON("_id" << OID::gen()).firstElement()).getOwned()};
- write_ops::InsertCommandRequest insertOp(donatedFilesNs);
- insertOp.setDocuments(std::move(obj));
- insertOp.setWriteCommandRequestBase([] {
- write_ops::WriteCommandRequestBase wcb;
- wcb.setOrdered(true);
- return wcb;
- }());
-
- auto writeResult = write_ops_exec::performInserts(opCtx, insertOp);
- invariant(!writeResult.results.empty());
- // Writes are ordered, check only the last writeOp result.
- uassertStatusOK(writeResult.results.back());
-
- return Status::OK();
- });
- uassertStatusOK(status);
+ invariant(metadataInfoPtr && *metadataInfoPtr);
+ auto docs = std::vector<mongo::BSONObj>{(*metadataInfoPtr)->toBSON(doc).getOwned()};
+
+ // Disabling internal document validation because the fetcher batch size
+ // can exceed the max data size limit BSONObjMaxUserSize with the
+ // additional fields we add to documents.
+ DisableDocumentValidation documentValidationDisabler(
+ opCtx, DocumentValidationSettings::kDisableInternalValidation);
+
+ write_ops::InsertCommandRequest insertOp(
+ shard_merge_utils::getDonatedFilesNs(getMigrationUUID()));
+ insertOp.setDocuments(std::move(docs));
+ insertOp.setWriteCommandRequestBase([] {
+ write_ops::WriteCommandRequestBase wcb;
+ wcb.setOrdered(true);
+ return wcb;
+ }());
+
+ auto writeResult = write_ops_exec::performInserts(opCtx, insertOp);
+ invariant(!writeResult.results.empty());
+ // Writes are ordered, check only the last writeOp result.
+ uassertStatusOK(writeResult.results.back());
}
}
@@ -1092,7 +1083,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam
return _donorFilenameBackupCursorFileFetcher->onCompletion()
.thenRunOn(**_scopedExecutor)
- .then([fetchStatus] {
+ .then([fetchStatus, uniqueMetadataInfo = std::move(uniqueMetadataInfo)] {
if (!*fetchStatus) {
// the callback was never invoked
uasserted(6113007, "Internal error running cursor callback in command");
@@ -1112,7 +1103,7 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo
// We only expect to already have start optimes populated if we are not
// resuming a migration and this is a multitenant migration.
- // TODO (SERVER-61133) Eventually we'll skip _getStartopTimesFromDonor entirely
+ // TODO (SERVER-61145) Eventually we'll skip _getStartopTimesFromDonor entirely
// for shard merge, but currently _getDonorFilenames will populate optimes for
// the shard merge case. We can just overwrite here since we aren't doing anything
// with the backup cursor results yet.
@@ -1902,7 +1893,6 @@ TenantMigrationRecipientService::Instance::_advanceStableTimestampToStartApplyin
// We do not have a mechanism to wait on the stableTimestamp reaching a specific ts, so
// we wait on the majority commit point instead.
- // TODO SERVER-61731 Retry importing donor collections on failure
return WaitForMajorityService::get(opCtx->getServiceContext())
.waitUntilMajority(noOpTs, CancellationToken::uncancelable());
})
@@ -1947,25 +1937,42 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_importCopiedFiles()
return SemiFuture<void>::makeReady();
}
- stdx::lock_guard lk(_mutex);
-
return ExecutorFuture(**_scopedExecutor)
- .then([this, self = shared_from_this(), stateDoc = _stateDoc] {
- auto tempWTDirectory = fileClonerTempDir(stateDoc.getId());
- if (!boost::filesystem::exists(tempWTDirectory)) {
- // TODO (SERVER-61133): Abort the merge if no files.
- LOGV2_WARNING(6113701,
- "No temp directory of donor files to import",
- "tempDirectory"_attr = tempWTDirectory.string());
- return SemiFuture<void>::makeReady();
- }
+ .then([this, self = shared_from_this()] {
+ auto tempWTDirectory = shard_merge_utils::fileClonerTempDir(getMigrationUUID());
+
+ uassert(6113315,
+ str::stream() << "Missing file cloner's temporary dbpath directory: "
+ << tempWTDirectory.string(),
+ boost::filesystem::exists(tempWTDirectory));
+
+ // TODO SERVER-63204: Reevaluate if this is the correct place to remove the temporary
+ // WT dbpath.
+ ON_BLOCK_EXIT([&tempWTDirectory, &migrationId = getMigrationUUID()] {
+ LOGV2_DEBUG(6113324,
+ 1,
+ "Done importing files, removing the temporary WT dbpath",
+ "migrationId"_attr = migrationId,
+ "tempDbPath"_attr = tempWTDirectory.string());
+ boost::system::error_code ec;
+ boost::filesystem::remove_all(tempWTDirectory, ec);
+ });
auto opCtx = cc().makeOperationContext();
// TODO (SERVER-62054): do this after file-copy, on primary and secondaries.
auto metadatas =
wiredTigerRollbackToStableAndGetMetadata(opCtx.get(), tempWTDirectory.string());
- wiredTigerImportFromBackupCursor(opCtx.get(), metadatas, tempWTDirectory.string());
+ // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for
+ // shard merge protocol.
+ try {
+ shard_merge_utils::wiredTigerImportFromBackupCursor(
+ opCtx.get(), metadatas, tempWTDirectory.string());
+ } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) {
+ LOGV2_WARNING(
+ 6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus());
+ }
+
return SemiFuture<void>::makeReady();
})
.semi();
@@ -2268,6 +2275,29 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMa
.semi();
}
+SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForAllVotingNodes(
+ WithLock lk, const CancellationToken& abortToken) const {
+ return ExecutorFuture(**_scopedExecutor)
+ .then([this, self = shared_from_this(), stateDoc = _stateDoc, abortToken] {
+ auto opCtx = cc().makeOperationContext();
+ uassertStatusOK(
+ tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc));
+
+ auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+
+ auto votingMembersWriteConcern =
+ WriteConcernOptions(repl::ReplSetConfig::kConfigAllWriteConcernName,
+ WriteConcernOptions::SyncMode::NONE,
+ WriteConcernOptions::kNoTimeout);
+
+ auto writeConcernFuture =
+ repl::ReplicationCoordinator::get(opCtx->getServiceContext())
+ ->awaitReplicationAsyncNoWTimeout(writeOpTime, votingMembersWriteConcern);
+ return future_util::withCancellation(std::move(writeConcernFuture), abortToken);
+ })
+ .semi();
+}
+
void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs(
const CancellationToken& token) {
std::vector<ExternalKeysCollectionDocument> keyDocs;
@@ -2576,24 +2606,30 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run(
"backup cursor alive.");
stdx::lock_guard lk(_mutex);
_backupCursorKeepAliveCancellation = CancellationSource(token);
- _backupCursorKeepAliveFuture =
- keepBackupCursorAlive(_backupCursorKeepAliveCancellation,
- **_scopedExecutor,
- _client->getServerHostAndPort(),
- _donorFilenameBackupCursorId,
- _donorFilenameBackupCursorNamespaceString);
+ _backupCursorKeepAliveFuture = shard_merge_utils::keepBackupCursorAlive(
+ _backupCursorKeepAliveCancellation,
+ **_scopedExecutor,
+ _client->getServerHostAndPort(),
+ _donorFilenameBackupCursorId,
+ _donorFilenameBackupCursorNamespaceString);
})
- .then([this, self = shared_from_this()] {
+ .then([this, self = shared_from_this(), token] {
if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) {
return SemiFuture<void>::makeReady();
}
stdx::lock_guard lk(_mutex);
_stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames);
- return _updateStateDocForMajority(lk);
+ // Since we are doing synchronous file cloning via op-observer, no need to
+ // wait explicitly for shard merge commit votes from all nodes to
+ // move to next step in this chain. Just waiting for the state doc with state
+ // "TenantMigrationRecipientStateEnum::kLearnedFilenames" to get replicated
+ // to all nodes should be sufficient to safely move to next step in this
+ // chain.
+ return _updateStateDocForAllVotingNodes(lk, token);
})
.then([this, self = shared_from_this()] {
- // TODO (SERVER-61133) temporarily stop fetcher/backup cursor here for
+ // TODO (SERVER-62734) temporarily stop fetcher/backup cursor here for
// now. We'll need to move this call elsewhere eventually to shut down
// the backup cursor and/or keepalive.
// Some tests fail unless we do this here, punting on dealing with those
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h
index 5c7f26e8d4b..8284514cf71 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.h
@@ -509,6 +509,13 @@ public:
*/
SemiFuture<void> _updateStateDocForMajority(WithLock lk) const;
+ /**
+     * Updates the state doc in the database and waits for that to be propagated to all voting
+     * nodes in the replica set.
+ */
+ SemiFuture<void> _updateStateDocForAllVotingNodes(
+ WithLock lk, const CancellationToken& abortToken) const;
+
/*
* Returns the majority OpTime on the donor node that 'client' is connected to.
*/
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
index 030edd19a15..5aa5c89d828 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp
@@ -44,6 +44,9 @@
#include "mongo/db/cursor_server_params_gen.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/multitenancy.h"
+#include "mongo/db/repl/oplog_applier.h"
+#include "mongo/db/repl/tenant_file_cloner.h"
+#include "mongo/db/repl/tenant_migration_shared_data.h"
#include "mongo/db/storage/durable_catalog.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
#include "mongo/db/views/view_catalog.h"
@@ -54,7 +57,7 @@
// cursor timeout.
constexpr int kBackupCursorKeepAliveIntervalMillis = mongo::kCursorTimeoutMillisDefault / 2;
-namespace mongo::repl {
+namespace mongo::repl::shard_merge_utils {
namespace {
using namespace fmt::literals;
@@ -88,8 +91,111 @@ std::string constructDestinationPath(const std::string& ident) {
filePath /= (ident + kTableExtension);
return filePath.string();
}
+
+/**
+ * Computes a boost::filesystem::path generic-style relative path (always uses slashes)
+ * from a base path and a path located under it.
+ */
+std::string _getPathRelativeTo(const std::string& path, const std::string& basePath) {
+ if (basePath.empty() || path.find(basePath) != 0) {
+ uasserted(6113319,
+ str::stream() << "The file " << path << " is not a subdirectory of " << basePath);
+ }
+
+ auto result = path.substr(basePath.size());
+ // Skip separators at the beginning of the relative part.
+ if (!result.empty() && (result[0] == '/' || result[0] == '\\')) {
+ result.erase(result.begin());
+ }
+
+ std::replace(result.begin(), result.end(), '\\', '/');
+ return result;
+}
+
+/**
+ * Makes a connection to the provided 'source'.
+ */
+Status connect(const HostAndPort& source, DBClientConnection* client) {
+ Status status = client->connect(source, "TenantFileCloner", boost::none);
+ if (!status.isOK())
+ return status;
+ return replAuthenticate(client).withContext(str::stream()
+ << "Failed to authenticate to " << source);
+}
} // namespace
+Status cloneFiles(OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last) {
+
+ std::unique_ptr<DBClientConnection> client;
+ std::unique_ptr<TenantMigrationSharedData> sharedData;
+ auto writerPool =
+ makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd);
+
+ ON_BLOCK_EXIT([&] {
+ client->shutdownAndDisallowReconnect();
+
+ writerPool->shutdown();
+ writerPool->join();
+ });
+
+ for (auto it = first; it != last; it++) {
+ const auto& metadataDoc = it->doc;
+ auto fileName = metadataDoc["filename"].str();
+ auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName])));
+
+ LOGV2_DEBUG(6113320, 1, "Attempting to clone file", "metadata"_attr = metadataDoc);
+
+ auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName])));
+ auto remoteDbpath = metadataDoc["remoteDbpath"].str();
+ size_t fileSize = metadataDoc["fileSize"].safeNumberLong();
+ auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str());
+ invariant(!relativePath.empty());
+
+ // Connect the client.
+ if (!client) {
+ auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str());
+ client = std::make_unique<DBClientConnection>(true /* autoReconnect */);
+ uassertStatusOK(connect(donor, client.get()));
+ }
+
+ if (!sharedData) {
+ sharedData = std::make_unique<TenantMigrationSharedData>(
+ getGlobalServiceContext()->getFastClockSource(), migrationId);
+ }
+
+ auto currentBackupFileCloner = std::make_unique<TenantFileCloner>(
+ backupId,
+ migrationId,
+ fileName,
+ fileSize,
+ relativePath,
+ sharedData.get(),
+ client->getServerHostAndPort(),
+ client.get(),
+ repl::StorageInterface::get(cc().getServiceContext()),
+ writerPool.get());
+
+ auto cloneStatus = currentBackupFileCloner->run();
+ if (!cloneStatus.isOK()) {
+ LOGV2_WARNING(6113321,
+ "Failed to clone file ",
+ "migrationUUID"_attr = migrationId,
+ "fileName"_attr = fileName,
+ "error"_attr = cloneStatus);
+ return cloneStatus;
+ } else {
+ LOGV2_DEBUG(6113322,
+ 1,
+ "Successfully cloned file",
+ "migrationUUID"_attr = migrationId,
+ "fileName"_attr = fileName);
+ }
+ }
+ return Status::OK();
+}
+
void wiredTigerImportFromBackupCursor(OperationContext* opCtx,
const std::vector<CollectionImportMetadata>& metadatas,
const std::string& importPath) {
@@ -194,4 +300,4 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
.onCompletion([](auto&&) {})
.semi();
}
-} // namespace mongo::repl
+} // namespace mongo::repl::shard_merge_utils
diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.h b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
index 9c220d01088..3ba8318b8bf 100644
--- a/src/mongo/db/repl/tenant_migration_shard_merge_util.h
+++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.h
@@ -30,15 +30,85 @@
#include <string>
#include <vector>
+
+#include <boost/filesystem/operations.hpp>
+#include <fmt/format.h>
+
#include "mongo/client/dbclient_connection.h"
#include "mongo/db/cursor_id.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_import.h"
#include "mongo/executor/scoped_task_executor.h"
#include "mongo/util/cancellation.h"
-namespace mongo::repl {
+namespace mongo::repl::shard_merge_utils {
+
+inline constexpr StringData kDonatedFilesPrefix = "donatedFiles."_sd;
+inline constexpr StringData kMigrationTmpDirPrefix = "migrationTmpFiles"_sd;
+inline constexpr StringData kMigrationIdFieldName = "migrationId"_sd;
+inline constexpr StringData kBackupIdFieldName = "backupId"_sd;
+inline constexpr StringData kDonorFieldName = "donor"_sd;
+inline constexpr StringData kDonorDbPathFieldName = "dbpath"_sd;
+
+inline bool isDonatedFilesCollection(const NamespaceString& ns) {
+ return ns.isConfigDB() && ns.coll().startsWith(kDonatedFilesPrefix);
+}
+
+inline NamespaceString getDonatedFilesNs(const UUID& migrationUUID) {
+ return NamespaceString(NamespaceString::kConfigDb,
+ kDonatedFilesPrefix + migrationUUID.toString());
+}
+
+inline boost::filesystem::path fileClonerTempDir(const UUID& migrationId) {
+ return boost::filesystem::path(storageGlobalParams.dbpath) /
+ fmt::format("{}.{}", kMigrationTmpDirPrefix.toString(), migrationId.toString());
+}
+
+/**
+ * Represents the document structure of config.donatedFiles_<MigrationUUID> collection.
+ */
+struct MetadataInfo {
+ explicit MetadataInfo(const UUID& backupId,
+ const UUID& migrationId,
+ const std::string& donor,
+ const std::string& donorDbPath)
+ : backupId(backupId), migrationId(migrationId), donor(donor), donorDbPath(donorDbPath) {}
+ UUID backupId;
+ UUID migrationId;
+ std::string donor;
+ std::string donorDbPath;
+
+ static MetadataInfo constructMetadataInfo(const UUID& migrationId,
+ const std::string& donor,
+ const BSONObj& obj) {
+ auto backupId = UUID(uassertStatusOK(UUID::parse(obj[kBackupIdFieldName])));
+ auto donorDbPath = obj[kDonorDbPathFieldName].String();
+ return MetadataInfo{backupId, migrationId, donor, donorDbPath};
+ }
+
+ BSONObj toBSON(const BSONObj& extraFields) const {
+ BSONObjBuilder bob;
+
+ migrationId.appendToBuilder(&bob, kMigrationIdFieldName);
+ backupId.appendToBuilder(&bob, kBackupIdFieldName);
+ bob.append(kDonorFieldName, donor);
+ bob.append(kDonorDbPathFieldName, donorDbPath);
+ bob.append("_id", OID::gen());
+ bob.appendElements(extraFields);
+
+ return bob.obj();
+ }
+};
+
+/**
+ * Uses the TenantFileCloner to copy the files from the donor.
+ */
+Status cloneFiles(OperationContext* opCtx,
+ std::vector<InsertStatement>::const_iterator first,
+ std::vector<InsertStatement>::const_iterator last);
+
/**
* After calling wiredTigerRollbackToStableAndGetMetadata, use this function to import files.
*/
@@ -51,4 +121,4 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource,
HostAndPort hostAndPort,
CursorId cursorId,
NamespaceString namespaceString);
-} // namespace mongo::repl
+} // namespace mongo::repl::shard_merge_utils