diff options
author | Suganthi Mani <suganthi.mani@mongodb.com> | 2022-02-03 06:42:19 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-02-03 07:47:41 +0000 |
commit | ec97c7f137a73faa8579c5616e364b86e6cf56c8 (patch) | |
tree | ddcd276d6c2bf878f8cb9cc830b3a33af21f26a4 | |
parent | 71287a3a8f033923ca9c2735c72da4460fbbf06d (diff) | |
download | mongo-ec97c7f137a73faa8579c5616e364b86e6cf56c8.tar.gz |
SERVER-61133 Copy donor files to temp directory.
16 files changed, 1072 insertions, 165 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese.yml index 6c3eb45f26b..ceb21aa094e 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_ese.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_ese.yml @@ -11,6 +11,8 @@ selector: exclude_with_any_tags: - does_not_support_encrypted_storage_engine - disabled_due_to_server_61671 + # Shard merge protocol won't work with encrypted storage engines. + - serverless executor: config: shell_options: diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml index 356d3cda0af..a333024c529 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_gcm.yml @@ -11,6 +11,8 @@ selector: exclude_with_any_tags: - does_not_support_encrypted_storage_engine - disabled_due_to_server_61671 + # Shard merge protocol won't work with encrypted storage engines. 
+ - serverless executor: config: shell_options: diff --git a/jstests/replsets/libs/tenant_migration_test.js b/jstests/replsets/libs/tenant_migration_test.js index 0e70ca11dbc..0e8ac919704 100644 --- a/jstests/replsets/libs/tenant_migration_test.js +++ b/jstests/replsets/libs/tenant_migration_test.js @@ -306,7 +306,14 @@ function TenantMigrationTest({ recipientNodes.forEach(node => { const configRecipientsColl = node.getCollection(TenantMigrationTest.kConfigRecipientsNS); - assert.soon(() => 0 === configRecipientsColl.count({_id: migrationId})); + assert.soon(() => 0 === configRecipientsColl.count({_id: migrationId}), tojson(node)); + + let mtab; + assert.soon(() => { + mtab = + this.getTenantMigrationAccessBlocker({recipientNode: node, tenantId: tenantId}); + return !mtab; + }, tojson(mtab)); }); }; @@ -350,6 +357,11 @@ function TenantMigrationTest({ return (mtab.donor.state === expectedAccessState); }; + function buildErrorMsg( + migrationId, expectedState, expectedAccessState, configDoc, recipientMtab) { + return tojson({migrationId, expectedState, expectedAccessState, configDoc, recipientMtab}); + } + /** * Asserts that the migration 'migrationId' and 'tenantId' eventually goes to the expected state * on all the given recipient nodes. 
@@ -357,8 +369,21 @@ function TenantMigrationTest({ this.waitForRecipientNodesToReachState = function( nodes, migrationId, tenantId, expectedState, expectedAccessState) { nodes.forEach(node => { - assert.soon(() => this.isRecipientNodeInExpectedState( - node, migrationId, tenantId, expectedState, expectedAccessState)); + let result = {}; + assert.soon( + () => { + result = this.isRecipientNodeInExpectedState( + node, migrationId, tenantId, expectedState, expectedAccessState); + return result.value; + }, + () => { + return "waitForRecipientNodesToReachState failed: " + + buildErrorMsg(migrationId, + expectedState, + expectedAccessState, + result.configDoc, + result.recipientMtab); + }); }); }; @@ -369,8 +394,16 @@ function TenantMigrationTest({ this.assertRecipientNodesInExpectedState = function( nodes, migrationId, tenantId, expectedState, expectedAccessState) { nodes.forEach(node => { - assert(this.isRecipientNodeInExpectedState( - node, migrationId, tenantId, expectedState, expectedAccessState)); + let result = this.isRecipientNodeInExpectedState( + node, migrationId, tenantId, expectedState, expectedAccessState); + assert(result.value, () => { + return "assertRecipientNodesInExpectedState failed: " + + buildErrorMsg(migrationId, + expectedState, + expectedAccessState, + result.configDoc, + result.recipientMtab); + }); }); }; @@ -383,12 +416,16 @@ function TenantMigrationTest({ const configRecipientsColl = this.getRecipientPrimary().getCollection("config.tenantMigrationRecipients"); const configDoc = configRecipientsColl.findOne({_id: migrationId}); - if (!configDoc || configDoc.state !== expectedState) { - return false; - } - const mtab = this.getTenantMigrationAccessBlocker({recipientNode: node, tenantId}); - return (mtab.recipient.state === expectedAccessState); + + let checkStates = () => { + if (!configDoc || configDoc.state !== expectedState) { + return false; + } + return (mtab.recipient.state === expectedAccessState); + }; + + return {value: 
checkStates(), configDoc: configDoc, recipientMtab: mtab.recipient}; }; function loadDummyData() { diff --git a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js index 7ea82a20796..8435731f460 100644 --- a/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js +++ b/jstests/replsets/tenant_migration_concurrent_writes_on_donor.js @@ -31,8 +31,8 @@ const tenantMigrationTest = new TenantMigrationTest({ }); const donorRst = tenantMigrationTest.getDonorRst(); +const donorPrimary = donorRst.getPrimary(); -const primary = donorRst.getPrimary(); const kCollName = "testColl"; const kTenantDefinedDbName = "0"; @@ -205,6 +205,16 @@ function makeTestOptions( }; } +function cleanUp(dbName) { + const donorDB = donorPrimary.getDB(dbName); + + assert.commandWorked(donorDB.dropDatabase()); + donorRst.awaitLastOpCommitted(); + // TODO SERVER-62934: Remove this fsync once we run fsync command before the migration + // start for shard merge protocol. + assert.commandWorked(donorPrimary.adminCommand({fsync: 1})); +} + function runTest( primary, testCase, testFunc, dbName, collName, {testInTransaction, testAsRetryableWrite} = {}) { const testOpts = makeTestOptions( @@ -220,6 +230,9 @@ function runTest( } testFunc(testCase, testOpts); + + // This cleanup step is necessary for the shard merge protocol to work correctly. 
+ cleanUp(dbName); } function runCommand(testOpts, expectedError) { @@ -1015,11 +1028,14 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) { continue; } - runTest( - primary, testCase, testFunc, baseDbName + "Basic_" + kTenantDefinedDbName, kCollName); + runTest(donorPrimary, + testCase, + testFunc, + baseDbName + "Basic_" + kTenantDefinedDbName, + kCollName); if (testCase.testInTransaction) { - runTest(primary, + runTest(donorPrimary, testCase, testFunc, baseDbName + "Txn_" + kTenantDefinedDbName, @@ -1028,7 +1044,7 @@ for (const [testName, testFunc] of Object.entries(testFuncs)) { } if (testCase.testAsRetryableWrite) { - runTest(primary, + runTest(donorPrimary, testCase, testFunc, baseDbName + "Retryable_" + kTenantDefinedDbName, diff --git a/jstests/replsets/tenant_migration_file_import.js b/jstests/replsets/tenant_migration_file_import.js index 75770b9a9c7..c67e529c589 100644 --- a/jstests/replsets/tenant_migration_file_import.js +++ b/jstests/replsets/tenant_migration_file_import.js @@ -2,8 +2,6 @@ * Test the shard merge rollback-to-stable algorithm. This test was written before we implemented * file copy, so the script opens a backup cursor and copies files itself. * - * TODO (SERVER-61133): Adapt or delete this test once file copy works. 
- * * @tags: [ * does_not_support_encrypted_storage_engine, * featureFlagShardMerge, @@ -31,12 +29,19 @@ const migrationId = UUID(); const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const donorPrimary = tenantMigrationTest.getDonorPrimary(); const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + +if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { + tenantMigrationTest.stop(); + jsTestLog("Skipping Shard Merge-specific test"); + return; +} + const kDataDir = `${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`; assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0); (function() { -jsTestLog("Generate test data: open a backup cursor on the donor and copy files"); +jsTestLog("Generate test data"); const db = donorPrimary.getDB("myDatabase"); const collection = db["myCollection"]; @@ -54,35 +59,6 @@ assert.commandWorked(db.runCommand({ // Ensure our new collections appear in the backup cursor's checkpoint. assert.commandWorked(db.adminCommand({fsync: 1})); - -const reply = assert.commandWorked( - donorPrimary.adminCommand({aggregate: 1, cursor: {}, pipeline: [{"$backupCursor": {}}]})); -const cursor = reply.cursor; - -jsTestLog(`Backup cursor metadata: ${tojson(cursor.firstBatch[0].metadata)}`); -jsTestLog("Copy files to local data dir"); -for (let f of cursor.firstBatch) { - if (!f.hasOwnProperty("filename")) { - continue; - } - - assert(f.filename.startsWith(donorPrimary.dbpath)); - const suffix = f.filename.slice(donorPrimary.dbpath.length); - - /* - * Create directories as needed, e.g. copy - * /data/db/job0/mongorunner/test-0/journal/WiredTigerLog.01 to - * /data/db/job0/mongorunner/test-1/migrationTmpFiles.migrationId/journal/WiredTigerLog.01, - * by passing "--relative /data/db/job0/mongorunner/test-0/./journal/WiredTigerLog.01". - * Note the "/./" marker. 
- */ - assert.eq(runNonMongoProgram( - "rsync", "-a", "--relative", `${donorPrimary.dbpath}/.${suffix}`, kDataDir), - 0); -} - -jsTestLog("Kill backup cursor"); -donorPrimary.adminCommand({killCursors: "$cmd.aggregate", cursors: [cursor.id]}); })(); jsTestLog("Run migration"); @@ -94,6 +70,9 @@ const migrationOpts = { tenantId: kTenantId, }; TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); + +// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported +// successfully. for (let collectionName of ["myCollection", "myCappedCollection"]) { jsTestLog(`Checking ${collectionName}`); // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge.js b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js index 5e1d3e85c70..8be76547d73 100644 --- a/jstests/replsets/tenant_migration_recipient_shard_merge.js +++ b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js @@ -1,5 +1,5 @@ /** - * Tests recipient behavior for shard merge + * Tests that recipient is able to learn files to be imported from donor for shard merge protocol. * * @tags: [ * incompatible_with_eft, @@ -8,6 +8,7 @@ * requires_majority_read_concern, * requires_persistence, * serverless, + * featureFlagShardMerge, * ] */ @@ -41,15 +42,22 @@ load("jstests/replsets/libs/tenant_migration_util.js"); const donorPrimary = tenantMigrationTest.getDonorPrimary(); const donorSecondary = donorRst.getSecondary(); + // Do a majority write. tenantMigrationTest.insertDonorDB(tenantDB, collName); + // Ensure our new collections appear in the backup cursor's checkpoint. 
+ assert.commandWorked(donorPrimary.adminCommand({fsync: 1})); + const failpoint = "fpAfterRetrievingStartOpTimesMigrationRecipientInstance"; const waitInFailPoint = configureFailPoint(recipientPrimary, failpoint, {action: "hang"}); + // In order to prevent the copying of "testTenantId" databases via logical cloning from donor to + // recipient, start migration on a tenant id which is non-existent on the donor. const migrationUuid = UUID(); + const kDummyTenantId = "nonExistentTenantId"; const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationUuid), - tenantId, + tenantId: kDummyTenantId, readPreference: {mode: 'primary'} }; @@ -58,16 +66,28 @@ load("jstests/replsets/libs/tenant_migration_util.js"); waitInFailPoint.wait(); - const res = - recipientPrimary.adminCommand({currentOp: true, desc: "tenant recipient migration"}); - assert.eq(res.inprog.length, 1); - const [currOp] = res.inprog; - assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kLearnedFilenames, res); + tenantMigrationTest.assertRecipientNodesInExpectedState( + tenantMigrationTest.getRecipientRst().nodes, + migrationUuid, + kDummyTenantId, + TenantMigrationTest.RecipientState.kLearnedFilenames, + TenantMigrationTest.RecipientAccessState.kReject); + waitInFailPoint.off(); TenantMigrationTest.assertCommitted( tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); + // TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported + // successfully. + // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. 
+ assert.eq(donorPrimary.getDB(tenantDB)[collName].countDocuments({}), + recipientPrimary.getDB(tenantDB)[collName].countDocuments({}), + "countDocuments"); + assert.eq(donorPrimary.getDB(tenantDB)[collName].count(), + recipientPrimary.getDB(tenantDB)[collName].count(), + "count"); + tenantMigrationTest.stop(); })(); })(); diff --git a/jstests/replsets/shard_merge_import_write_conflict_retry.js b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js index da4dc5cc220..6746b2d7935 100644 --- a/jstests/replsets/shard_merge_import_write_conflict_retry.js +++ b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js @@ -3,8 +3,6 @@ * collections. We will get WriteConflict exception if we try to import the files with timestamp * older than the stable timestamp. * - * TODO (SERVER-61133): Adapt or delete this test once file copy works. - * * @tags: [ * does_not_support_encrypted_storage_engine, * featureFlagShardMerge, @@ -16,6 +14,7 @@ * requires_replication, * requires_persistence, * requires_wiredtiger, + * serverless, * ] */ (function() { @@ -32,12 +31,19 @@ const migrationId = UUID(); const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const donorPrimary = tenantMigrationTest.getDonorPrimary(); const recipientPrimary = tenantMigrationTest.getRecipientPrimary(); + +if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { + tenantMigrationTest.stop(); + jsTestLog("Skipping Shard Merge-specific test"); + return; +} + const kDataDir = `${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`; assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0); (function() { -jsTestLog("Generate test data: open a backup cursor on the donor and copy files"); +jsTestLog("Generate test data"); const db = donorPrimary.getDB("myDatabase"); const collection = db["myCollection"]; @@ -55,35 +61,6 @@ assert.commandWorked(db.runCommand({ // Ensure our new collections appear 
in the backup cursor's checkpoint. assert.commandWorked(db.adminCommand({fsync: 1})); - -const reply = assert.commandWorked( - donorPrimary.adminCommand({aggregate: 1, cursor: {}, pipeline: [{"$backupCursor": {}}]})); -const cursor = reply.cursor; - -jsTestLog(`Backup cursor metadata: ${tojson(cursor.firstBatch[0].metadata)}`); -jsTestLog("Copy files to local data dir"); -for (let f of cursor.firstBatch) { - if (!f.hasOwnProperty("filename")) { - continue; - } - - assert(f.filename.startsWith(donorPrimary.dbpath)); - const suffix = f.filename.slice(donorPrimary.dbpath.length); - - /* - * Create directories as needed, e.g. copy - * /data/db/job0/mongorunner/test-0/journal/WiredTigerLog.01 to - * /data/db/job0/mongorunner/test-1/migrationTmpFiles.migrationId/journal/WiredTigerLog.01, - * by passing "--relative /data/db/job0/mongorunner/test-0/./journal/WiredTigerLog.01". - * Note the "/./" marker. - */ - assert.eq(runNonMongoProgram( - "rsync", "-a", "--relative", `${donorPrimary.dbpath}/.${suffix}`, kDataDir), - 0); -} - -jsTestLog("Kill backup cursor"); -donorPrimary.adminCommand({killCursors: "$cmd.aggregate", cursors: [cursor.id]}); })(); // Enable Failpoints to simulate WriteConflict exception while importing donor files. @@ -101,6 +78,9 @@ const migrationOpts = { tenantId: kTenantId, }; TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); + +// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported +// successfully. for (let collectionName of ["myCollection", "myCappedCollection"]) { jsTestLog(`Checking ${collectionName}`); // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. 
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index b4c4fc98b68..0df82726b10 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -1347,7 +1347,9 @@ env.Library( env.Library( target='tenant_migration_recipient_service', source=[ + 'tenant_file_cloner.cpp', 'tenant_migration_recipient_coordinator.cpp', + 'tenant_migration_recipient_op_observer.cpp', 'tenant_migration_recipient_service.cpp', 'tenant_migration_shard_merge_util.cpp', 'vote_commit_migration_progress.cpp', @@ -1372,6 +1374,7 @@ env.Library( '$BUILD_DIR/mongo/db/transaction', 'cloner_utils', 'oplog', + 'oplog_application_interface', 'oplog_buffer_collection', 'oplog_entry', 'oplog_fetcher', @@ -1391,9 +1394,7 @@ env.Library( 'tenant_migration_access_blocker_server_status_section.cpp', 'tenant_migration_access_blocker_util.cpp', 'tenant_migration_donor_access_blocker.cpp', - 'tenant_migration_donor_op_observer.cpp', 'tenant_migration_recipient_access_blocker.cpp', - 'tenant_migration_recipient_op_observer.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', @@ -1421,6 +1422,7 @@ env.Library( env.Library( target='tenant_migration_donor_service', source=[ + 'tenant_migration_donor_op_observer.cpp', 'tenant_migration_donor_service.cpp', ], LIBDEPS=[ diff --git a/src/mongo/db/repl/tenant_file_cloner.cpp b/src/mongo/db/repl/tenant_file_cloner.cpp new file mode 100644 index 00000000000..6a683ddc03e --- /dev/null +++ b/src/mongo/db/repl/tenant_file_cloner.cpp @@ -0,0 +1,367 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. 
+ * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTenantMigration + +#include "mongo/platform/basic.h" + +#include <fstream> + +#include "mongo/base/string_data.h" +#include "mongo/db/pipeline/aggregate_command_gen.h" +#include "mongo/db/pipeline/aggregation_request_helper.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" +#include "mongo/db/repl/tenant_file_cloner.h" +#include "mongo/db/repl/tenant_migration_shard_merge_util.h" +#include "mongo/logv2/log.h" + +#include "mongo/util/assert_util.h" + +namespace mongo::repl { + +// TODO SERVER-63119: Verify if we need the below failpoints for TenantFileCloner unit testing. +// Failpoint which causes the file cloner to hang after handling the next batch of results +// from the DBClientConnection, optionally limited to a specific file name. 
+MONGO_FAIL_POINT_DEFINE(TenantFileClonerHangAfterHandlingBatchResponse); +MONGO_FAIL_POINT_DEFINE(TenantFileClonerHangDuringFileCloneBackup); +MONGO_FAIL_POINT_DEFINE(TenantFileClonerDisableExhaust); +TenantFileCloner::TenantFileCloner(const UUID& backupId, + const UUID& migrationId, + const std::string& remoteFileName, + size_t remoteFileSize, + const std::string& relativePath, + TenantMigrationSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool) + : TenantBaseCloner("TenantFileCloner"_sd, sharedData, source, client, storageInterface, dbPool), + _backupId(backupId), + _migrationId(migrationId), + _remoteFileName(remoteFileName), + _remoteFileSize(remoteFileSize), + _relativePathString(relativePath), + _queryStage("query", this, &TenantFileCloner::queryStage), + _fsWorkTaskRunner(dbPool), + _scheduleFsWorkFn([this](executor::TaskExecutor::CallbackFn work) { + auto task = [ this, work = std::move(work) ]( + OperationContext * opCtx, + const Status& status) mutable noexcept->TaskRunner::NextAction { + try { + work(executor::TaskExecutor::CallbackArgs(nullptr, {}, status, opCtx)); + } catch (const DBException& e) { + setSyncFailedStatus(e.toStatus()); + } + return TaskRunner::NextAction::kDisposeOperationContext; + }; + _fsWorkTaskRunner.schedule(std::move(task)); + return executor::TaskExecutor::CallbackHandle(); + }), + _progressMeter(remoteFileSize, + kProgressMeterSecondsBetween, + kProgressMeterCheckInterval, + "bytes copied", + str::stream() << _remoteFileName << " Tenant migration file clone progress") { + _stats.filePath = _relativePathString; + _stats.fileSize = _remoteFileSize; +} + +BaseCloner::ClonerStages TenantFileCloner::getStages() { + return {&_queryStage}; +} + +void TenantFileCloner::preStage() { + stdx::lock_guard<Latch> lk(_mutex); + _stats.start = getSharedData()->getClock()->now(); + + // Construct local path name from the relative path and the temp 
dbpath. + boost::filesystem::path relativePath(_relativePathString); + uassert(6113300, + str::stream() << "Path " << _relativePathString << " should be a relative path", + relativePath.is_relative()); + + auto syncTargetTempDBPath = shard_merge_utils::fileClonerTempDir(_migrationId); + _localFilePath = syncTargetTempDBPath; + + _localFilePath /= relativePath; + _localFilePath = _localFilePath.lexically_normal(); + uassert(6113301, + str::stream() << "Path " << _relativePathString + << " must not escape its parent directory.", + StringData(_localFilePath.generic_string()) + .startsWith(syncTargetTempDBPath.generic_string())); + + // Create and open files and any parent directories. + if (boost::filesystem::exists(_localFilePath)) { + LOGV2(6113302, + "Local file exists at start of TenantFileCloner; truncating.", + "localFilePath"_attr = _localFilePath.string()); + } else { + auto localFileDir = _localFilePath.parent_path(); + boost::system::error_code ec; + boost::filesystem::create_directories(localFileDir, ec); + uassert(6113303, + str::stream() << "Failed to create directory " << localFileDir.string() << " Error " + << ec.message(), + !ec); + } + _localFile.open(_localFilePath.string(), + std::ios_base::out | std::ios_base::binary | std::ios_base::trunc); + uassert(ErrorCodes::FileOpenFailed, + str::stream() << "Failed to open file " << _localFilePath.string(), + !_localFile.fail()); + _fileOffset = 0; +} + +void TenantFileCloner::postStage() { + _localFile.close(); + stdx::lock_guard<Latch> lk(_mutex); + _stats.end = getSharedData()->getClock()->now(); +} + +BaseCloner::AfterStageBehavior TenantFileCloner::queryStage() { + // Since the query stage may be re-started, we need to make sure all the file system work + // from the previous run is done before running the query again. 
+ waitForFilesystemWorkToComplete(); + _sawEof = false; + runQuery(); + waitForFilesystemWorkToComplete(); + uassert( + 6113304, + str::stream() + << "Received entire file, but did not get end of file marker. File may be incomplete " + << _localFilePath.string(), + _sawEof); + return kContinueNormally; +} + +size_t TenantFileCloner::getFileOffset() { + stdx::lock_guard<Latch> lk(_mutex); + return _fileOffset; +} + +void TenantFileCloner::runQuery() { + auto backupFileStage = BSON( + "$_backupFile" << BSON("backupId" << _backupId << "file" << _remoteFileName << "byteOffset" + << static_cast<int64_t>(getFileOffset()))); + AggregateCommandRequest aggRequest( + NamespaceString::makeCollectionlessAggregateNSS(NamespaceString::kAdminDb), + {backupFileStage}); + aggRequest.setReadConcern(ReadConcernArgs::kImplicitDefault); + aggRequest.setWriteConcern(WriteConcernOptions()); + + LOGV2_DEBUG(6113305, + 2, + "TenantFileCloner running aggregation", + "source"_attr = getSource(), + "aggRequest"_attr = aggregation_request_helper::serializeToCommandObj(aggRequest)); + const bool useExhaust = !MONGO_unlikely(TenantFileClonerDisableExhaust.shouldFail()); + std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( + getClient(), std::move(aggRequest), true /* secondaryOk */, useExhaust)); + try { + while (cursor->more()) { + DBClientCursorBatchIterator iter(*cursor); + handleNextBatch(iter); + } + } catch (const DBException& e) { + // We cannot continue after an error when processing exhaust cursors. Instead we must + // reconnect, which is handled by the BaseCloner. 
+ LOGV2_DEBUG(6113306, + 1, + "TenantFileCloner received an exception while downloading data", + "error"_attr = e.toStatus(), + "source"_attr = getSource(), + "backupId"_attr = _backupId, + "remoteFile"_attr = _remoteFileName, + "fileOffset"_attr = getFileOffset()); + getClient()->shutdown(); + throw; + } +} + +void TenantFileCloner::handleNextBatch(DBClientCursorBatchIterator& iter) { + LOGV2_DEBUG(6113307, + 3, + "TenantFileCloner handleNextBatch", + "source"_attr = getSource(), + "backupId"_attr = _backupId, + "remoteFile"_attr = _remoteFileName, + "fileOffset"_attr = getFileOffset(), + "moreInCurrentBatch"_attr = iter.moreInCurrentBatch()); + { + stdx::lock_guard<TenantMigrationSharedData> lk(*getSharedData()); + if (!getSharedData()->getStatus(lk).isOK()) { + static constexpr char message[] = "BackupFile cloning cancelled due to cloning failure"; + LOGV2(6113323, message, "error"_attr = getSharedData()->getStatus(lk)); + uasserted(ErrorCodes::CallbackCanceled, + str::stream() << message << ": " << getSharedData()->getStatus(lk)); + } + } + while (iter.moreInCurrentBatch()) { + stdx::lock_guard<Latch> lk(_mutex); + _stats.receivedBatches++; + while (iter.moreInCurrentBatch()) { + _dataToWrite.emplace_back(iter.nextSafe()); + } + } + + // Schedule the next set of writes. + auto&& scheduleResult = _scheduleFsWorkFn([=](const executor::TaskExecutor::CallbackArgs& cbd) { + writeDataToFilesystemCallback(cbd); + }); + + if (!scheduleResult.isOK()) { + Status newStatus = scheduleResult.getStatus().withContext( + str::stream() << "Error copying file '" << _remoteFileName << "'"); + // We must throw an exception to terminate query. + uassertStatusOK(newStatus); + } + + TenantFileClonerHangAfterHandlingBatchResponse.executeIf( + [&](const BSONObj&) { + while (MONGO_unlikely(TenantFileClonerHangAfterHandlingBatchResponse.shouldFail()) && + !mustExit()) { + LOGV2(6113308, + "TenantFileClonerHangAfterHandlingBatchResponse fail point " + "enabled. 
Blocking until fail point is disabled", + "remoteFile"_attr = _remoteFileName); + mongo::sleepmillis(100); + } + }, + [&](const BSONObj& data) { + // Only hang when copying the specified file, or if no file was specified. + auto filename = data["remoteFile"].str(); + return filename.empty() || filename == _remoteFileName; + }); +} + +void TenantFileCloner::writeDataToFilesystemCallback( + const executor::TaskExecutor::CallbackArgs& cbd) { + LOGV2_DEBUG(6113309, + 3, + "TenantFileCloner writeDataToFilesystemCallback", + "backupId"_attr = _backupId, + "remoteFile"_attr = _remoteFileName, + "localFile"_attr = _localFilePath.string(), + "fileOffset"_attr = getFileOffset()); + uassertStatusOK(cbd.status); + { + stdx::lock_guard<Latch> lk(_mutex); + if (_dataToWrite.size() == 0) { + LOGV2_WARNING(6113310, + "writeDataToFilesystemCallback, but no data to write", + "remoteFile"_attr = _remoteFileName); + } + for (const auto& doc : _dataToWrite) { + uassert(6113311, + str::stream() << "Saw multiple end-of-file-markers in file " << _remoteFileName, + !_sawEof); + // Received file data should always be in sync with the stream and where we think + // our next input should be coming from. + const auto byteOffset = doc["byteOffset"].safeNumberLong(); + invariant(byteOffset == _localFile.tellp()); + invariant(byteOffset == _fileOffset); + const auto& dataElem = doc["data"]; + uassert(6113312, + str::stream() << "Expected file data to be type BinDataGeneral. 
" << doc, + dataElem.type() == BinData && dataElem.binDataType() == BinDataGeneral); + int dataLength; + auto data = dataElem.binData(dataLength); + _localFile.write(data, dataLength); + uassert(ErrorCodes::FileStreamFailed, + str::stream() << "Unable to write file data for file " << _remoteFileName + << " at offset " << _fileOffset, + !_localFile.fail()); + _progressMeter.hit(dataLength); + _fileOffset += dataLength; + _stats.bytesCopied += dataLength; + _sawEof = doc["endOfFile"].booleanSafe(); + } + _dataToWrite.clear(); + _stats.writtenBatches++; + } + + TenantFileClonerHangDuringFileCloneBackup.executeIf( + [&](const BSONObj&) { + LOGV2(6113313, + "TenantFileClonerHangDuringFileCloneBackup fail point " + "enabled. Blocking until fail point is disabled"); + while (MONGO_unlikely(TenantFileClonerHangDuringFileCloneBackup.shouldFail()) && + !mustExit()) { + mongo::sleepmillis(100); + } + }, + [&](const BSONObj& data) { + return (data["remoteFile"].eoo() || data["remoteFile"].str() == _remoteFileName) && + (data["fileOffset"].eoo() || data["fileOffset"].safeNumberLong() <= _fileOffset); + }); +} + +bool TenantFileCloner::isMyFailPoint(const BSONObj& data) const { + auto remoteFile = data["remoteFile"].str(); + return (remoteFile.empty() || remoteFile == _remoteFileName) && BaseCloner::isMyFailPoint(data); +} + +void TenantFileCloner::waitForFilesystemWorkToComplete() { + _fsWorkTaskRunner.join(); +} + +TenantFileCloner::Stats TenantFileCloner::getStats() const { + stdx::lock_guard<Latch> lk(_mutex); + return _stats; +} + +std::string TenantFileCloner::Stats::toString() const { + return toBSON().toString(); +} + +BSONObj TenantFileCloner::Stats::toBSON() const { + BSONObjBuilder bob; + append(&bob); + return bob.obj(); +} + +void TenantFileCloner::Stats::append(BSONObjBuilder* builder) const { + builder->append("filePath", filePath); + builder->appendNumber("fileSize", static_cast<long long>(fileSize)); + builder->appendNumber("bytesCopied", static_cast<long 
long>(bytesCopied)); + if (start != Date_t()) { + builder->appendDate("start", start); + if (end != Date_t()) { + builder->appendDate("end", end); + auto elapsed = end - start; + long long elapsedMillis = duration_cast<Milliseconds>(elapsed).count(); + builder->appendNumber("elapsedMillis", elapsedMillis); + } + } + builder->appendNumber("receivedBatches", static_cast<long long>(receivedBatches)); + builder->appendNumber("writtenBatches", static_cast<long long>(writtenBatches)); +} + +} // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_file_cloner.h b/src/mongo/db/repl/tenant_file_cloner.h new file mode 100644 index 00000000000..def09528953 --- /dev/null +++ b/src/mongo/db/repl/tenant_file_cloner.h @@ -0,0 +1,219 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <boost/filesystem.hpp> +#include <memory> +#include <vector> + +#include "mongo/db/repl/base_cloner.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/db/repl/tenant_base_cloner.h" +#include "mongo/db/repl/tenant_migration_shared_data.h" +#include "mongo/util/progress_meter.h" + +namespace mongo::repl { + +class TenantFileCloner final : public TenantBaseCloner { +public: + struct Stats { + std::string filePath; + size_t fileSize; + Date_t start; + Date_t end; + size_t receivedBatches{0}; + size_t writtenBatches{0}; + size_t bytesCopied{0}; + + std::string toString() const; + BSONObj toBSON() const; + void append(BSONObjBuilder* builder) const; + }; + + /** + * Type of function to schedule file system tasks with the executor. + * + * Used for testing only. + */ + using ScheduleFsWorkFn = unique_function<StatusWith<executor::TaskExecutor::CallbackHandle>( + executor::TaskExecutor::CallbackFn)>; + + /** + * Constructor for TenantFileCloner. + * + * remoteFileName: Path of file to copy on remote system. + * remoteFileSize: Size of remote file in bytes, used for progress messages and stats only. + * relativePath: Path of file relative to dbpath on the remote system, as a + * boost::filesystem::path generic path. + */ + TenantFileCloner(const UUID& backupId, + const UUID& migrationId, + const std::string& remoteFileName, + size_t remoteFileSize, + const std::string& relativePath, + TenantMigrationSharedData* sharedData, + const HostAndPort& source, + DBClientConnection* client, + StorageInterface* storageInterface, + ThreadPool* dbPool); + + virtual ~TenantFileCloner() = default; + + /** + * Waits for any file system work to finish or fail. 
+ */ + void waitForFilesystemWorkToComplete(); + + Stats getStats() const; + + std::string toString() const; + +protected: + ClonerStages getStages() final; + + bool isMyFailPoint(const BSONObj& data) const final; + +private: + friend class TenantFileClonerTest; + + class TenantFileClonerQueryStage : public ClonerStage<TenantFileCloner> { + public: + TenantFileClonerQueryStage(std::string name, + TenantFileCloner* cloner, + ClonerRunFn stageFunc) + : ClonerStage<TenantFileCloner>(name, cloner, stageFunc) {} + + bool checkSyncSourceValidityOnRetry() override { + // Sync source validity is assured by the backup ID not existing if the sync source + // is restarted or otherwise becomes invalid. + return false; + } + + bool isTransientError(const Status& status) override { + if (isCursorError(status)) { + return true; + } + return ErrorCodes::isRetriableError(status); + } + + static bool isCursorError(const Status& status) { + // Our cursor was killed on the sync source. + if ((status == ErrorCodes::CursorNotFound) || (status == ErrorCodes::OperationFailed) || + (status == ErrorCodes::QueryPlanKilled)) { + return true; + } + return false; + } + }; + + std::string describeForFuzzer(BaseClonerStage* stage) const final { + // We do not have a fuzzer for tenant backup file cloner. + MONGO_UNREACHABLE; + } + + /** + * The preStage sets the begin time in _stats and makes sure the destination file + * can be created. + */ + void preStage() final; + + /** + * The postStage sets the end time in _stats. + */ + void postStage() final; + + /** + * Stage function that executes a query to retrieve the file data. For each + * batch returned by the upstream node, handleNextBatch will be called with the data. This + * stage will finish when the entire query is finished or failed. + */ + AfterStageBehavior queryStage(); + + /** + * Put all results from a query batch into a buffer, and schedule it to be written to disk. 
+ */ + void handleNextBatch(DBClientCursorBatchIterator& iter); + + /** + * Called whenever there is a new batch of documents ready from the DBClientConnection. + * + * Each document returned will be inserted via the storage + * interface. + */ + void writeDataToFilesystemCallback(const executor::TaskExecutor::CallbackArgs& cbd); + + /** + * Sends an (aggregation) query command to the source. That query command will be parameterized + * based on copy progress. + */ + void runQuery(); + + /** + * Convenience call to get the file offset under a lock. + */ + size_t getFileOffset(); + + // All member variables are labeled with one of the following codes indicating the + // synchronization rules for accessing them. + // + // (R) Read-only in concurrent operation; no synchronization required. + // (S) Self-synchronizing; access according to class's own rules. + // (M) Reads and writes guarded by _mutex (defined in base class). + // (X) Access only allowed from the main flow of control called from run() or constructor. + const UUID _backupId; // (R) + const UUID _migrationId; // (R) + const std::string _remoteFileName; // (R) + size_t _remoteFileSize; // (R) + const std::string _relativePathString; // (R) + boost::filesystem::path _localFilePath; // (X) + + TenantFileClonerQueryStage _queryStage; // (R) + + std::ofstream _localFile; // (M) + // File offset we will request from the remote side in the next query. + off_t _fileOffset = 0; // (M) + bool _sawEof = false; // (X) + + // Data read from source to insert. + std::vector<BSONObj> _dataToWrite; // (M) + // Putting _fsWorkTaskRunner last ensures anything the database work threads depend on + // like _dataToWrite, is destroyed after those threads exit. + TaskRunner _fsWorkTaskRunner; // (R) + // Function for scheduling filesystem work using the executor. + ScheduleFsWorkFn _scheduleFsWorkFn; // (R) + + ProgressMeter _progressMeter; // (X) progress meter for this instance.
+ Stats _stats; // (M) + + static constexpr int kProgressMeterSecondsBetween = 60; + static constexpr int kProgressMeterCheckInterval = 128; +}; + +} // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 3de36b59ce3..589658b3308 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/tenant_migration_access_blocker_util.h" #include "mongo/db/repl/tenant_migration_recipient_access_blocker.h" #include "mongo/db/repl/tenant_migration_recipient_op_observer.h" +#include "mongo/db/repl/tenant_migration_shard_merge_util.h" #include "mongo/db/repl/tenant_migration_state_machine_gen.h" #include "mongo/db/repl/tenant_migration_util.h" #include "mongo/logv2/log.h" @@ -84,9 +85,72 @@ void onSetRejectReadsBeforeTimestamp(OperationContext* opCtx, mtab->startRejectingReadsBefore(recipientStateDoc.getRejectReadsBeforeTimestamp().get()); } - } // namespace +void TenantMigrationRecipientOpObserver::onCreateCollection(OperationContext* opCtx, + const CollectionPtr& coll, + const NamespaceString& collectionName, + const CollectionOptions& options, + const BSONObj& idIndex, + const OplogSlot& createOpTime) { + if (!shard_merge_utils::isDonatedFilesCollection(collectionName)) + return; + + auto collString = collectionName.coll().toString(); + auto migrationUUID = + UUID(uassertStatusOK(UUID::parse(collString.substr(collString.find('.') + 1)))); + auto fileClonerTempDirPath = shard_merge_utils::fileClonerTempDir(migrationUUID); + + // This is possible when a secondary restarts or rollback and the donated files collection + // is created as part of oplog replay. 
+ if (boost::filesystem::exists(fileClonerTempDirPath)) { + LOGV2_DEBUG(6113316, + 1, + "File cloner temp directory already exists", + "directory"_attr = fileClonerTempDirPath.generic_string()); + + // Ignoring the errors because if this step fails, then the following step + // create_directory() will fail and that will throw an exception. + boost::system::error_code ec; + boost::filesystem::remove_all(fileClonerTempDirPath, ec); + } + + try { + boost::filesystem::create_directory(fileClonerTempDirPath); + } catch (std::exception& e) { + LOGV2_ERROR(6113317, + "Error creating file cloner temp directory", + "directory"_attr = fileClonerTempDirPath.generic_string(), + "error"_attr = e.what()); + throw; + } +} + +void TenantMigrationRecipientOpObserver::onInserts( + OperationContext* opCtx, + const NamespaceString& nss, + const UUID& uuid, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last, + bool fromMigrate) { + + if (!shard_merge_utils::isDonatedFilesCollection(nss)) + return; + + try { + uassertStatusOK(shard_merge_utils::cloneFiles(opCtx, first, last)); + } catch (const DBException& ex) { + invariant(first != last); + auto migrationId = UUID( + uassertStatusOK(UUID::parse(first->doc[shard_merge_utils::kMigrationIdFieldName]))); + LOGV2_ERROR(6113318, + "Error cloning files", + "migrationUUID"_attr = migrationId, + "error"_attr = ex.toStatus()); + // TODO SERVER-63120: On error, vote shard merge abort to recipient primary. 
+ } +} + void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) { if (args.nss == NamespaceString::kTenantMigrationRecipientsNamespace && diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h index fe7178819d6..76567db5b95 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h @@ -83,7 +83,7 @@ public: const UUID& uuid, std::vector<InsertStatement>::const_iterator first, std::vector<InsertStatement>::const_iterator last, - bool fromMigrate) final {} + bool fromMigrate) final; void onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) final; @@ -113,7 +113,7 @@ public: const NamespaceString& collectionName, const CollectionOptions& options, const BSONObj& idIndex, - const OplogSlot& createOpTime) final {} + const OplogSlot& createOpTime) final; void onCollMod(OperationContext* opCtx, const NamespaceString& nss, diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 7cb86ffe27a..7868ecac516 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -90,7 +90,6 @@ using namespace fmt; const std::string kTTLIndexName = "TenantMigrationRecipientTTLIndex"; const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); constexpr StringData kOplogBufferPrefix = "repl.migration.oplog_"_sd; -constexpr StringData kDonatedFilesPrefix = "donatedFiles."_sd; constexpr int kBackupCursorFileFetcherRetryAttempts = 10; NamespaceString getOplogBufferNs(const UUID& migrationUUID) { @@ -98,11 +97,6 @@ NamespaceString getOplogBufferNs(const UUID& migrationUUID) { kOplogBufferPrefix + migrationUUID.toString()); } -NamespaceString getDonatedFilesNs(const UUID& migrationUUID) { - return 
NamespaceString(NamespaceString::kConfigDb, - kDonatedFilesPrefix + migrationUUID.toString()); -} - boost::intrusive_ptr<ExpressionContext> makeExpressionContext(OperationContext* opCtx) { StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; @@ -141,11 +135,6 @@ bool isRetriableOplogFetcherError(Status oplogFetcherStatus) { oplogFetcherStatus == ErrorCodes::ShutdownInProgress; } -boost::filesystem::path fileClonerTempDir(UUID migrationId) { - return boost::filesystem::path(storageGlobalParams.dbpath) / - ("migrationTmpFiles.{}"_format(migrationId.toString())); -} - } // namespace // A convenient place to set test-specific parameters. @@ -981,10 +970,14 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam }(); auto fetchStatus = std::make_shared<boost::optional<Status>>(); - auto fetcherCallback = [this, self = shared_from_this(), fetchStatus, token]( - const Fetcher::QueryResponseStatus& dataStatus, - Fetcher::NextAction* nextAction, - BSONObjBuilder* getMoreBob) { + auto uniqueMetadataInfo = std::make_unique<boost::optional<shard_merge_utils::MetadataInfo>>(); + auto fetcherCallback = [this, + self = shared_from_this(), + fetchStatus, + metadataInfoPtr = uniqueMetadataInfo.get(), + token](const Fetcher::QueryResponseStatus& dataStatus, + Fetcher::NextAction* nextAction, + BSONObjBuilder* getMoreBob) { if (!dataStatus.isOK()) { *fetchStatus = dataStatus.getStatus(); LOGV2_ERROR(6113003, "backup cursor failed", "error"_attr = dataStatus.getStatus()); @@ -1012,7 +1005,13 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam const auto& metadata = doc["metadata"].Obj(); auto startApplyingDonorOpTime = OpTime(metadata["checkpointTimestamp"].timestamp(), OpTime::kUninitializedTerm); - // TODO (SERVER-61133) Uncomment the following lines when we skip + + + invariant(metadataInfoPtr && !*metadataInfoPtr); + (*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo( + 
getMigrationUUID(), _client->getServerAddress(), metadata); + + // TODO (SERVER-61145) Uncomment the following lines when we skip // _getStartopTimesFromDonor entirely // _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime); // _stateDoc.setStartFetchingDonorOpTime(startApplyingDonorOpTime); @@ -1029,36 +1028,28 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam "filename"_attr = doc["filename"].String(), "backupCursorId"_attr = data.cursorId); - auto donatedFilesNs = getDonatedFilesNs(getMigrationUUID()); - auto status = writeConflictRetry( - opCtx, - "insertBackupCursorEntry", - donatedFilesNs.ns(), - [opCtx, donatedFilesNs, doc]() -> Status { - // Disabling internal document validation because the fetcher batch size - // can exceed the max data size limit BSONObjMaxUserSize with the - // additional fields we add to documents. - DisableDocumentValidation documentValidationDisabler( - opCtx, DocumentValidationSettings::kDisableInternalValidation); - - auto obj = std::vector<mongo::BSONObj>{ - doc.addField(BSON("_id" << OID::gen()).firstElement()).getOwned()}; - write_ops::InsertCommandRequest insertOp(donatedFilesNs); - insertOp.setDocuments(std::move(obj)); - insertOp.setWriteCommandRequestBase([] { - write_ops::WriteCommandRequestBase wcb; - wcb.setOrdered(true); - return wcb; - }()); - - auto writeResult = write_ops_exec::performInserts(opCtx, insertOp); - invariant(!writeResult.results.empty()); - // Writes are ordered, check only the last writeOp result. - uassertStatusOK(writeResult.results.back()); - - return Status::OK(); - }); - uassertStatusOK(status); + invariant(metadataInfoPtr && *metadataInfoPtr); + auto docs = std::vector<mongo::BSONObj>{(*metadataInfoPtr)->toBSON(doc).getOwned()}; + + // Disabling internal document validation because the fetcher batch size + // can exceed the max data size limit BSONObjMaxUserSize with the + // additional fields we add to documents. 
+ DisableDocumentValidation documentValidationDisabler( + opCtx, DocumentValidationSettings::kDisableInternalValidation); + + write_ops::InsertCommandRequest insertOp( + shard_merge_utils::getDonatedFilesNs(getMigrationUUID())); + insertOp.setDocuments(std::move(docs)); + insertOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase wcb; + wcb.setOrdered(true); + return wcb; + }()); + + auto writeResult = write_ops_exec::performInserts(opCtx, insertOp); + invariant(!writeResult.results.empty()); + // Writes are ordered, check only the last writeOp result. + uassertStatusOK(writeResult.results.back()); } } @@ -1092,7 +1083,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam return _donorFilenameBackupCursorFileFetcher->onCompletion() .thenRunOn(**_scopedExecutor) - .then([fetchStatus] { + .then([fetchStatus, uniqueMetadataInfo = std::move(uniqueMetadataInfo)] { if (!*fetchStatus) { // the callback was never invoked uasserted(6113007, "Internal error running cursor callback in command"); @@ -1112,7 +1103,7 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo // We only expect to already have start optimes populated if we are not // resuming a migration and this is a multitenant migration. - // TODO (SERVER-61133) Eventually we'll skip _getStartopTimesFromDonor entirely + // TODO (SERVER-61145) Eventually we'll skip _getStartopTimesFromDonor entirely // for shard merge, but currently _getDonorFilenames will populate optimes for // the shard merge case. We can just overwrite here since we aren't doing anything // with the backup cursor results yet. @@ -1902,7 +1893,6 @@ TenantMigrationRecipientService::Instance::_advanceStableTimestampToStartApplyin // We do not have a mechanism to wait on the stableTimestamp reaching a specific ts, so // we wait on the majority commit point instead. 
- // TODO SERVER-61731 Retry importing donor collections on failure return WaitForMajorityService::get(opCtx->getServiceContext()) .waitUntilMajority(noOpTs, CancellationToken::uncancelable()); }) @@ -1947,25 +1937,42 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_importCopiedFiles() return SemiFuture<void>::makeReady(); } - stdx::lock_guard lk(_mutex); - return ExecutorFuture(**_scopedExecutor) - .then([this, self = shared_from_this(), stateDoc = _stateDoc] { - auto tempWTDirectory = fileClonerTempDir(stateDoc.getId()); - if (!boost::filesystem::exists(tempWTDirectory)) { - // TODO (SERVER-61133): Abort the merge if no files. - LOGV2_WARNING(6113701, - "No temp directory of donor files to import", - "tempDirectory"_attr = tempWTDirectory.string()); - return SemiFuture<void>::makeReady(); - } + .then([this, self = shared_from_this()] { + auto tempWTDirectory = shard_merge_utils::fileClonerTempDir(getMigrationUUID()); + + uassert(6113315, + str::stream() << "Missing file cloner's temporary dbpath directory: " + << tempWTDirectory.string(), + boost::filesystem::exists(tempWTDirectory)); + + // TODO SERVER-63204: Reevaluate if this is the correct place to remove the temporary + // WT dbpath. + ON_BLOCK_EXIT([&tempWTDirectory, &migrationId = getMigrationUUID()] { + LOGV2_DEBUG(6113324, + 1, + "Done importing files, removing the temporary WT dbpath", + "migrationId"_attr = migrationId, + "tempDbPath"_attr = tempWTDirectory.string()); + boost::system::error_code ec; + boost::filesystem::remove_all(tempWTDirectory, ec); + }); auto opCtx = cc().makeOperationContext(); // TODO (SERVER-62054): do this after file-copy, on primary and secondaries. auto metadatas = wiredTigerRollbackToStableAndGetMetadata(opCtx.get(), tempWTDirectory.string()); - wiredTigerImportFromBackupCursor(opCtx.get(), metadatas, tempWTDirectory.string()); + // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for + // shard merge protocol. 
+ try { + shard_merge_utils::wiredTigerImportFromBackupCursor( + opCtx.get(), metadatas, tempWTDirectory.string()); + } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) { + LOGV2_WARNING( + 6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus()); + } + return SemiFuture<void>::makeReady(); }) .semi(); @@ -2268,6 +2275,29 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForMa .semi(); } +SemiFuture<void> TenantMigrationRecipientService::Instance::_updateStateDocForAllVotingNodes( + WithLock lk, const CancellationToken& abortToken) const { + return ExecutorFuture(**_scopedExecutor) + .then([this, self = shared_from_this(), stateDoc = _stateDoc, abortToken] { + auto opCtx = cc().makeOperationContext(); + uassertStatusOK( + tenantMigrationRecipientEntryHelpers::updateStateDoc(opCtx.get(), stateDoc)); + + auto writeOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + + auto votingMembersWriteConcern = + WriteConcernOptions(repl::ReplSetConfig::kConfigAllWriteConcernName, + WriteConcernOptions::SyncMode::NONE, + WriteConcernOptions::kNoTimeout); + + auto writeConcernFuture = + repl::ReplicationCoordinator::get(opCtx->getServiceContext()) + ->awaitReplicationAsyncNoWTimeout(writeOpTime, votingMembersWriteConcern); + return future_util::withCancellation(std::move(writeConcernFuture), abortToken); + }) + .semi(); +} + void TenantMigrationRecipientService::Instance::_fetchAndStoreDonorClusterTimeKeyDocs( const CancellationToken& token) { std::vector<ExternalKeysCollectionDocument> keyDocs; @@ -2576,24 +2606,30 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( "backup cursor alive."); stdx::lock_guard lk(_mutex); _backupCursorKeepAliveCancellation = CancellationSource(token); - _backupCursorKeepAliveFuture = - keepBackupCursorAlive(_backupCursorKeepAliveCancellation, - **_scopedExecutor, - _client->getServerHostAndPort(), - _donorFilenameBackupCursorId, - 
_donorFilenameBackupCursorNamespaceString); + _backupCursorKeepAliveFuture = shard_merge_utils::keepBackupCursorAlive( + _backupCursorKeepAliveCancellation, + **_scopedExecutor, + _client->getServerHostAndPort(), + _donorFilenameBackupCursorId, + _donorFilenameBackupCursorNamespaceString); }) - .then([this, self = shared_from_this()] { + .then([this, self = shared_from_this(), token] { if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { return SemiFuture<void>::makeReady(); } stdx::lock_guard lk(_mutex); _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames); - return _updateStateDocForMajority(lk); + // Since we are doing synchronous file cloning via op-observer, no need to + // wait explicitly for shard merge commit votes from all nodes to + // move to next step in this chain. Just waiting for the state doc with state + // "TenantMigrationRecipientStateEnum::kLearnedFilenames" to get replicated + // to all nodes should be sufficient to safely move to next step in this + // chain. + return _updateStateDocForAllVotingNodes(lk, token); }) .then([this, self = shared_from_this()] { - // TODO (SERVER-61133) temporarily stop fetcher/backup cursor here for + // TODO (SERVER-62734) temporarily stop fetcher/backup cursor here for // now. We'll need to move this call elsewhere eventually to shut down // the backup cursor and/or keepalive. // Some tests fail unless we do this here, punting on dealing with those diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 5c7f26e8d4b..8284514cf71 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -509,6 +509,13 @@ public: */ SemiFuture<void> _updateStateDocForMajority(WithLock lk) const; + /** + * Updates the state doc in the database and waits for that to be propagated to all nodes in + * the replica set. 
+ */ + SemiFuture<void> _updateStateDocForAllVotingNodes( + WithLock lk, const CancellationToken& abortToken) const; + /* * Returns the majority OpTime on the donor node that 'client' is connected to. */ diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp index 030edd19a15..5aa5c89d828 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp @@ -44,6 +44,9 @@ #include "mongo/db/cursor_server_params_gen.h" #include "mongo/db/db_raii.h" #include "mongo/db/multitenancy.h" +#include "mongo/db/repl/oplog_applier.h" +#include "mongo/db/repl/tenant_file_cloner.h" +#include "mongo/db/repl/tenant_migration_shared_data.h" #include "mongo/db/storage/durable_catalog.h" #include "mongo/db/storage/wiredtiger/wiredtiger_import.h" #include "mongo/db/views/view_catalog.h" @@ -54,7 +57,7 @@ // cursor timeout. constexpr int kBackupCursorKeepAliveIntervalMillis = mongo::kCursorTimeoutMillisDefault / 2; -namespace mongo::repl { +namespace mongo::repl::shard_merge_utils { namespace { using namespace fmt::literals; @@ -88,8 +91,111 @@ std::string constructDestinationPath(const std::string& ident) { filePath /= (ident + kTableExtension); return filePath.string(); } + +/** + * Computes a boost::filesystem::path generic-style relative path (always uses slashes) + * for 'path', expressed relative to 'basePath'. + */ +std::string _getPathRelativeTo(const std::string& path, const std::string& basePath) { + if (basePath.empty() || path.find(basePath) != 0) { + uasserted(6113319, + str::stream() << "The file " << path << " is not a subdirectory of " << basePath); + } + + auto result = path.substr(basePath.size()); + // Skip separators at the beginning of the relative part.
+ if (!result.empty() && (result[0] == '/' || result[0] == '\\')) { + result.erase(result.begin()); + } + + std::replace(result.begin(), result.end(), '\\', '/'); + return result; +} + +/** + * Makes a connection to the provided 'source'. + */ +Status connect(const HostAndPort& source, DBClientConnection* client) { + Status status = client->connect(source, "TenantFileCloner", boost::none); + if (!status.isOK()) + return status; + return replAuthenticate(client).withContext(str::stream() + << "Failed to authenticate to " << source); +} } // namespace +Status cloneFiles(OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last) { + + std::unique_ptr<DBClientConnection> client; + std::unique_ptr<TenantMigrationSharedData> sharedData; + auto writerPool = + makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd); + + ON_BLOCK_EXIT([&] { + client->shutdownAndDisallowReconnect(); + + writerPool->shutdown(); + writerPool->join(); + }); + + for (auto it = first; it != last; it++) { + const auto& metadataDoc = it->doc; + auto fileName = metadataDoc["filename"].str(); + auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName]))); + + LOGV2_DEBUG(6113320, 1, "Attempting to clone file", "metadata"_attr = metadataDoc); + + auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName]))); + auto remoteDbpath = metadataDoc["remoteDbpath"].str(); + size_t fileSize = metadataDoc["fileSize"].safeNumberLong(); + auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str()); + invariant(!relativePath.empty()); + + // Connect the client. 
+ if (!client) { + auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str()); + client = std::make_unique<DBClientConnection>(true /* autoReconnect */); + uassertStatusOK(connect(donor, client.get())); + } + + if (!sharedData) { + sharedData = std::make_unique<TenantMigrationSharedData>( + getGlobalServiceContext()->getFastClockSource(), migrationId); + } + + auto currentBackupFileCloner = std::make_unique<TenantFileCloner>( + backupId, + migrationId, + fileName, + fileSize, + relativePath, + sharedData.get(), + client->getServerHostAndPort(), + client.get(), + repl::StorageInterface::get(cc().getServiceContext()), + writerPool.get()); + + auto cloneStatus = currentBackupFileCloner->run(); + if (!cloneStatus.isOK()) { + LOGV2_WARNING(6113321, + "Failed to clone file ", + "migrationUUID"_attr = migrationId, + "fileName"_attr = fileName, + "error"_attr = cloneStatus); + return cloneStatus; + } else { + LOGV2_DEBUG(6113322, + 1, + "Successfully cloned file", + "migrationUUID"_attr = migrationId, + "fileName"_attr = fileName); + } + } + return Status::OK(); +} + void wiredTigerImportFromBackupCursor(OperationContext* opCtx, const std::vector<CollectionImportMetadata>& metadatas, const std::string& importPath) { @@ -194,4 +300,4 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, .onCompletion([](auto&&) {}) .semi(); } -} // namespace mongo::repl +} // namespace mongo::repl::shard_merge_utils diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.h b/src/mongo/db/repl/tenant_migration_shard_merge_util.h index 9c220d01088..3ba8318b8bf 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.h +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.h @@ -30,15 +30,85 @@ #include <string> #include <vector> + +#include <boost/filesystem/operations.hpp> +#include <fmt/format.h> + #include "mongo/client/dbclient_connection.h" #include "mongo/db/cursor_id.h" #include "mongo/db/namespace_string.h" #include 
"mongo/db/operation_context.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/storage/wiredtiger/wiredtiger_import.h" #include "mongo/executor/scoped_task_executor.h" #include "mongo/util/cancellation.h" -namespace mongo::repl { +namespace mongo::repl::shard_merge_utils { + +inline constexpr StringData kDonatedFilesPrefix = "donatedFiles."_sd; +inline constexpr StringData kMigrationTmpDirPrefix = "migrationTmpFiles"_sd; +inline constexpr StringData kMigrationIdFieldName = "migrationId"_sd; +inline constexpr StringData kBackupIdFieldName = "backupId"_sd; +inline constexpr StringData kDonorFieldName = "donor"_sd; +inline constexpr StringData kDonorDbPathFieldName = "dbpath"_sd; + +inline bool isDonatedFilesCollection(const NamespaceString& ns) { + return ns.isConfigDB() && ns.coll().startsWith(kDonatedFilesPrefix); +} + +inline NamespaceString getDonatedFilesNs(const UUID& migrationUUID) { + return NamespaceString(NamespaceString::kConfigDb, + kDonatedFilesPrefix + migrationUUID.toString()); +} + +inline boost::filesystem::path fileClonerTempDir(const UUID& migrationId) { + return boost::filesystem::path(storageGlobalParams.dbpath) / + fmt::format("{}.{}", kMigrationTmpDirPrefix.toString(), migrationId.toString()); +} + +/** + * Represents the document structure of the config.donatedFiles.<MigrationUUID> collection.
+ */ +struct MetadataInfo { + explicit MetadataInfo(const UUID& backupId, + const UUID& migrationId, + const std::string& donor, + const std::string& donorDbPath) + : backupId(backupId), migrationId(migrationId), donor(donor), donorDbPath(donorDbPath) {} + UUID backupId; + UUID migrationId; + std::string donor; + std::string donorDbPath; + + static MetadataInfo constructMetadataInfo(const UUID& migrationId, + const std::string& donor, + const BSONObj& obj) { + auto backupId = UUID(uassertStatusOK(UUID::parse(obj[kBackupIdFieldName]))); + auto donorDbPath = obj[kDonorDbPathFieldName].String(); + return MetadataInfo{backupId, migrationId, donor, donorDbPath}; + } + + BSONObj toBSON(const BSONObj& extraFields) const { + BSONObjBuilder bob; + + migrationId.appendToBuilder(&bob, kMigrationIdFieldName); + backupId.appendToBuilder(&bob, kBackupIdFieldName); + bob.append(kDonorFieldName, donor); + bob.append(kDonorDbPathFieldName, donorDbPath); + bob.append("_id", OID::gen()); + bob.appendElements(extraFields); + + return bob.obj(); + } +}; + +/** + * Uses the TenantFileCloner to copy the files from the donor. + */ +Status cloneFiles(OperationContext* opCtx, + std::vector<InsertStatement>::const_iterator first, + std::vector<InsertStatement>::const_iterator last); + /** * After calling wiredTigerRollbackToStableAndGetMetadata, use this function to import files. */ @@ -51,4 +121,4 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, HostAndPort hostAndPort, CursorId cursorId, NamespaceString namespaceString); -} // namespace mongo::repl +} // namespace mongo::repl::shard_merge_utils |