diff options
14 files changed, 352 insertions, 249 deletions
diff --git a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js index f488c9176c9..0a624a8688f 100644 --- a/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js +++ b/jstests/replsets/tenant_migration_donor_interrupt_on_stepdown_and_shutdown.js @@ -34,12 +34,21 @@ const kMigrationFpNames = [ * donor using the 'interruptFunc', and verifies the command response using the * 'verifyCmdResponseFunc'. */ -function testDonorStartMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) { +function testDonorStartMigrationInterrupt( + interruptFunc, verifyCmdResponseFunc, {skipForShardMerge = false} = {}) { const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const donorRst = tenantMigrationTest.getDonorRst(); const donorPrimary = tenantMigrationTest.getDonorPrimary(); + if (skipForShardMerge && TenantMigrationUtil.isShardMergeEnabled(donorPrimary.getDB("admin"))) { + // TODO SERVER-63390: Remove this conditional and ensure test(s) run + // successfully for shard merge. + jsTestLog("Skipping Shard Merge-incompatible test"); + tenantMigrationTest.stop(); + return; + } + const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), @@ -109,12 +118,21 @@ function testDonorForgetMigrationInterrupt(interruptFunc, verifyCmdResponseFunc) * donorAbortMigration, and interrupts the donor using the 'interruptFunc', and verifies the command * response using the 'verifyCmdResponseFunc'. */ -function testDonorAbortMigrationInterrupt(interruptFunc, verifyCmdResponseFunc, fpName) { +function testDonorAbortMigrationInterrupt( + interruptFunc, verifyCmdResponseFunc, fpName, {skipForShardMerge = false} = {}) { const tenantMigrationTest = new TenantMigrationTest({name: jsTestName()}); const donorRst = tenantMigrationTest.getDonorRst(); const donorPrimary = tenantMigrationTest.getDonorPrimary(); + if (skipForShardMerge && TenantMigrationUtil.isShardMergeEnabled(donorPrimary.getDB("admin"))) { + // TODO SERVER-63390: Remove this conditional and ensure test(s) run + // successfully for shard merge. + jsTestLog("Skipping Shard Merge-incompatible test"); + tenantMigrationTest.stop(); + return; + } + const migrationId = UUID(); const migrationOpts = { migrationIdString: extractUUIDFromObject(migrationId), @@ -186,7 +204,7 @@ function assertCmdSucceededOrInterruptedDueToShutDown(cmdThread) { jsTest.log("Test that the donorStartMigration command is interrupted successfully on shutdown"); testDonorStartMigrationInterrupt((donorRst) => { donorRst.stopSet(); - }, assertCmdSucceededOrInterruptedDueToShutDown); + }, assertCmdSucceededOrInterruptedDueToShutDown, {skipForShardMerge: true}); })(); (() => { @@ -231,7 +249,7 @@ function assertCmdSucceededOrInterruptedDueToShutDown(cmdThread) { testDonorAbortMigrationInterrupt((donorRst) => { donorRst.stopSet(); - }, assertCmdSucceededOrInterruptedDueToShutDown, fpName); + }, assertCmdSucceededOrInterruptedDueToShutDown, fpName, {skipForShardMerge: true}); }); })(); })(); diff --git a/jstests/replsets/tenant_migration_file_import.js b/jstests/replsets/tenant_migration_file_import.js index cc7d064ebed..11d10bdd870 100644 --- a/jstests/replsets/tenant_migration_file_import.js +++ b/jstests/replsets/tenant_migration_file_import.js @@ -18,9 +18,6 @@ (function() { "use strict"; -// TODO (SERVER-61144): Recipient secondaries don't import donor files yet, so dbhash will mismatch. -TestData.skipCheckDBHashes = true; - load("jstests/libs/uuid_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); @@ -70,18 +67,18 @@ const migrationOpts = { }; TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); -// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported -// successfully. -for (let collectionName of ["myCollection", "myCappedCollection"]) { - jsTestLog(`Checking ${collectionName}`); - // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. - assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}), - recipientPrimary.getDB("myDatabase")[collectionName].countDocuments({}), - "countDocuments"); - assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(), - recipientPrimary.getDB("myDatabase")[collectionName].count(), - "count"); -} +tenantMigrationTest.getRecipientRst().nodes.forEach(node => { + for (let collectionName of ["myCollection", "myCappedCollection"]) { + jsTestLog(`Checking ${collectionName} on donor vs. recipient on port '${node.port}'`); + // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. + assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}), + node.getDB("myDatabase")[collectionName].countDocuments({}), + "countDocuments"); + assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(), + node.getDB("myDatabase")[collectionName].count(), + "count"); + } +}); tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_recipient_current_op.js b/jstests/replsets/tenant_migration_recipient_current_op.js index 5b277c0f500..4c6d1f80c61 100644 --- a/jstests/replsets/tenant_migration_recipient_current_op.js +++ b/jstests/replsets/tenant_migration_recipient_current_op.js @@ -155,11 +155,7 @@ checkStandardFieldsOK(res); currOp = res.inprog[0]; assert.gt(new Date(), currOp.receiveStart, tojson(res)); -if (shardMergeIsEnabled) { - assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kCopiedFiles, res); -} else { - assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); -} +assert.eq(currOp.state, TenantMigrationTest.RecipientStateEnum.kStarted, res); assert.eq(currOp.migrationCompleted, false, res); assert.eq(currOp.dataSyncCompleted, false, res); diff --git a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js index b006d0718a3..834c28a7309 100644 --- a/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js +++ b/jstests/replsets/tenant_migration_recipient_shard_merge_learn_files.js @@ -47,7 +47,7 @@ tenantMigrationTest.insertDonorDB(tenantDB, collName); // Ensure our new collections appear in the backup cursor's checkpoint. assert.commandWorked(donorPrimary.adminCommand({fsync: 1})); -const failpoint = "fpAfterRetrievingStartOpTimesMigrationRecipientInstance"; +const failpoint = "fpAfterStartingOplogApplierMigrationRecipientInstance"; const waitInFailPoint = configureFailPoint(recipientPrimary, failpoint, {action: "hang"}); // In order to prevent the copying of "testTenantId" databases via logical cloning from donor to @@ -76,15 +76,16 @@ waitInFailPoint.off(); TenantMigrationTest.assertCommitted(tenantMigrationTest.waitForMigrationToComplete(migrationOpts)); -// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported -// successfully. -// Use "countDocuments" to check actual docs, "count" to check sizeStorer data. -assert.eq(donorPrimary.getDB(tenantDB)[collName].countDocuments({}), - recipientPrimary.getDB(tenantDB)[collName].countDocuments({}), - "countDocuments"); -assert.eq(donorPrimary.getDB(tenantDB)[collName].count(), - recipientPrimary.getDB(tenantDB)[collName].count(), - "count"); +const donorPrimaryCountDocumentsResult = donorPrimary.getDB(tenantDB)[collName].countDocuments({}); +const donorPrimaryCountResult = donorPrimary.getDB(tenantDB)[collName].count(); + +tenantMigrationTest.getRecipientRst().nodes.forEach(node => { + // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. + assert.eq(donorPrimaryCountDocumentsResult, + node.getDB(tenantDB)[collName].countDocuments({}), + "countDocuments"); + assert.eq(donorPrimaryCountResult, node.getDB(tenantDB)[collName].count(), "count"); +}); tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js index 6746b2d7935..35c3d125ff6 100644 --- a/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js +++ b/jstests/replsets/tenant_migration_shard_merge_import_write_conflict_retry.js @@ -20,9 +20,6 @@ (function() { "use strict"; -// TODO (SERVER-61144): Recipient secondaries don't import donor files yet, so dbhash will mismatch. -TestData.skipCheckDBHashes = true; - load("jstests/libs/uuid_util.js"); load("jstests/replsets/libs/tenant_migration_test.js"); load("jstests/libs/fail_point_util.js"); @@ -38,6 +35,13 @@ if (!TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { return; } +if (TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { + // TODO SERVER-63789: Re-enable this test for Shard Merge + tenantMigrationTest.stop(); + jsTestLog("Temporarily skipping Shard Merge test, dependent on SERVER-63789."); + return; +} + const kDataDir = `${recipientPrimary.dbpath}/migrationTmpFiles.${extractUUIDFromObject(migrationId)}`; assert.eq(runNonMongoProgram("mkdir", "-p", kDataDir), 0); @@ -68,6 +72,7 @@ configureFailPoint( recipientPrimary, "WTWriteConflictExceptionForImportCollection", {} /* data */, {times: 1}); configureFailPoint( recipientPrimary, "WTWriteConflictExceptionForImportIndex", {} /* data */, {times: 1}); +configureFailPoint(recipientPrimary, "skipDeleteTempDBPath"); jsTestLog("Run migration"); // The old multitenant migrations won't copy myDatabase since it doesn't start with testTenantId, @@ -79,18 +84,18 @@ const migrationOpts = { }; TenantMigrationTest.assertCommitted(tenantMigrationTest.runMigration(migrationOpts)); -// TODO SERVER-61144: Check on all recipient nodes that the collection documents got imported -// successfully. -for (let collectionName of ["myCollection", "myCappedCollection"]) { - jsTestLog(`Checking ${collectionName}`); - // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. - assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}), - recipientPrimary.getDB("myDatabase")[collectionName].countDocuments({}), - "countDocuments"); - assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(), - recipientPrimary.getDB("myDatabase")[collectionName].count(), - "count"); -} +tenantMigrationTest.getRecipientRst().nodes.forEach(node => { + for (let collectionName of ["myCollection", "myCappedCollection"]) { + jsTestLog(`Checking ${collectionName}`); + // Use "countDocuments" to check actual docs, "count" to check sizeStorer data. + assert.eq(donorPrimary.getDB("myDatabase")[collectionName].countDocuments({}), + recipientPrimary.getDB("myDatabase")[collectionName].countDocuments({}), + "countDocuments"); + assert.eq(donorPrimary.getDB("myDatabase")[collectionName].count(), + recipientPrimary.getDB("myDatabase")[collectionName].count(), + "count"); + } +}); tenantMigrationTest.stop(); })(); diff --git a/jstests/replsets/tenant_migration_vote_progress.js b/jstests/replsets/tenant_migration_vote_progress.js index 677cbcbda6e..f593d503018 100644 --- a/jstests/replsets/tenant_migration_vote_progress.js +++ b/jstests/replsets/tenant_migration_vote_progress.js @@ -75,15 +75,15 @@ fpAfterCollectionClonerDone.wait(); fpAfterCollectionClonerDone.off(); if (TenantMigrationUtil.isShardMergeEnabled(recipientPrimary.getDB("admin"))) { - jsTestLog("Test that voteCommitMigrationProgress succeeds with step 'copied files'"); - voteShouldSucceed(migrationId, ["copied files"]); + jsTestLog("Test that voteCommitMigrationProgress succeeds with step 'imported files'"); + voteShouldSucceed(migrationId, ["imported files"]); } else { jsTestLog("Test that voteCommitMigrationProgress fails with shard merge disabled"); - voteShouldFail(migrationId, ["copied files"]); + voteShouldFail(migrationId, ["imported files"]); } jsTestLog("Test that voteCommitMigrationProgress fails with wrong 'step'"); -voteShouldFail(migrationId, ["imported files"]); +voteShouldFail(migrationId, ["copied files"]); fpAfterDataConsistentMigrationRecipientInstance.wait(); fpAfterDataConsistentMigrationRecipientInstance.off(); diff --git a/src/mongo/db/repl/tenant_file_importer_service.cpp b/src/mongo/db/repl/tenant_file_importer_service.cpp index 8d4cde77fa8..6f479cd9133 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.cpp +++ b/src/mongo/db/repl/tenant_file_importer_service.cpp @@ -75,13 +75,18 @@ void TenantFileImporterService::onStartup(OperationContext*) { void TenantFileImporterService::learnedFilename(const UUID& migrationId, const BSONObj& metadataDoc) { auto opCtx = cc().getOperationContext(); - stdx::lock_guard lk(_mutex); - if (migrationId != _migrationId) { - _reset(lk); - _migrationId = migrationId; - _scopedExecutor = std::make_shared<executor::ScopedTaskExecutor>( - _executor, - Status{ErrorCodes::CallbackCanceled, "TenantFileImporterService executor cancelled"}); + { + stdx::lock_guard lk(_mutex); + if (migrationId != _migrationId) { + _reset(lk); + _migrationId = migrationId; + _scopedExecutor = std::make_shared<executor::ScopedTaskExecutor>( + _executor, + Status{ErrorCodes::CallbackCanceled, + "TenantFileImporterService executor cancelled"}); + } + + _state.setState(ImporterState::State::kCopyingFiles); } try { @@ -112,15 +117,27 @@ void TenantFileImporterService::learnedAllFilenames(const UUID& migrationId) { "Called learnedAllFilenames with migrationId {}, but {} is active"_format( migrationId.toString(), _migrationId->toString())); } - if (_toldPrimaryAllFilesAreCopied) { + + if (!_state.is(ImporterState::State::kCopyingFiles)) { return; } - _toldPrimaryAllFilesAreCopied = true; + _state.setState(ImporterState::State::kCopiedFiles); } - // TODO (SERVER-62734): Keep count of files remaining to copy, wait before voting. - _voteCommitMigrationProgress(MigrationProgressStepEnum::kCopiedFiles); + auto opCtx = cc().getOperationContext(); + // TODO SERVER-63789: Revisit use of AllowLockAcquisitionOnTimestampedUnitOfWork and + // remove if possible. + // No other threads will try to acquire conflicting locks: we are acquiring + // database/collection locks for new tenants. + AllowLockAcquisitionOnTimestampedUnitOfWork allowLockAcquisition(opCtx->lockState()); + shard_merge_utils::importCopiedFiles(opCtx, migrationId); + + // TODO (SERVER-62734): Keep count of files remaining to import, wait before voting. + _voteCommitMigrationProgress(MigrationProgressStepEnum::kImportedFiles); + + stdx::lock_guard lk(_mutex); + _state.setState(ImporterState::State::kImportedFiles); } void TenantFileImporterService::reset() { @@ -177,6 +194,6 @@ void TenantFileImporterService::_voteCommitMigrationProgress(MigrationProgressSt void TenantFileImporterService::_reset(WithLock lk) { _scopedExecutor.reset(); // Shuts down and joins the executor. _migrationId.reset(); - _toldPrimaryAllFilesAreCopied = false; + _state.setState(ImporterState::State::kUninitialized); } } // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_file_importer_service.h b/src/mongo/db/repl/tenant_file_importer_service.h index d5b0db7d629..ec3b77a986b 100644 --- a/src/mongo/db/repl/tenant_file_importer_service.h +++ b/src/mongo/db/repl/tenant_file_importer_service.h @@ -89,6 +89,60 @@ private: std::shared_ptr<executor::ScopedTaskExecutor> _scopedExecutor; boost::optional<UUID> _migrationId; Mutex _mutex = MONGO_MAKE_LATCH("TenantFileImporterService::_mutex"); - bool _toldPrimaryAllFilesAreCopied; + + class ImporterState { + public: + enum class State { kUninitialized, kCopyingFiles, kCopiedFiles, kImportedFiles }; + + void setState(State nextState) { + tassert(6114403, + str::stream() << "current state: " << toString(_state) + << ", new state: " << toString(nextState), + isValidTransition(nextState)); + _state = nextState; + } + + bool is(State state) { + return _state == state; + } + + private: + StringData toString(State value) { + switch (value) { + case State::kUninitialized: + return "uninitialized"; + case State::kCopyingFiles: + return "copying files"; + case State::kCopiedFiles: + return "copied files"; + case State::kImportedFiles: + return "imported files"; + } + MONGO_UNREACHABLE; + return StringData(); + } + + bool isValidTransition(State newState) { + if (_state == newState) { + return true; + } + + switch (_state) { + case State::kUninitialized: + return newState == State::kCopyingFiles; + case State::kCopyingFiles: + return newState == State::kCopiedFiles || newState == State::kUninitialized; + case State::kCopiedFiles: + return newState == State::kImportedFiles || newState == State::kUninitialized; + case State::kImportedFiles: + return newState == State::kUninitialized; + } + MONGO_UNREACHABLE; + } + + State _state = State::kUninitialized; + }; + + ImporterState _state; }; } // namespace mongo::repl diff --git a/src/mongo/db/repl/tenant_migration_recipient_coordinator.h b/src/mongo/db/repl/tenant_migration_recipient_coordinator.h index 42ace3d336d..1fddf22c37e 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_coordinator.h +++ b/src/mongo/db/repl/tenant_migration_recipient_coordinator.h @@ -52,7 +52,7 @@ public: // have completed the step, or there was an error. SharedSemiFuture<void> beginAwaitingVotesForStep(UUID migrationId, MigrationProgressStepEnum step); - // TODO (SERVER-61144): use cancelStep, which is currently unused. + // TODO (SERVER-63390): use cancelStep, which is currently unused. void cancelStep(UUID migrationId, MigrationProgressStepEnum step); void receivedVoteForStep(UUID migrationId, MigrationProgressStepEnum step, diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp index 26148732b95..d98d1ca2bbc 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.cpp @@ -192,8 +192,6 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, createAccessBlockerIfNeeded(opCtx, recipientStateDoc); break; case TenantMigrationRecipientStateEnum::kLearnedFilenames: - repl::TenantFileImporterService::get(opCtx->getServiceContext()) - ->learnedAllFilenames(recipientStateDoc.getId()); break; case TenantMigrationRecipientStateEnum::kCopiedFiles: break; @@ -206,6 +204,23 @@ void TenantMigrationRecipientOpObserver::onUpdate(OperationContext* opCtx, break; } }); + + // Perform TenantFileImporterService::learnedAllFilenames work outside of the above onCommit + // hook because of work done in a WriteUnitOfWork. + // TODO SERVER-63789: Revisit this when we make file import async and move + // within onCommit hook. + auto state = recipientStateDoc.getState(); + auto protocol = recipientStateDoc.getProtocol().value_or(kDefaultMigrationProtocol); + if (state == TenantMigrationRecipientStateEnum::kLearnedFilenames) { + tassert(6114400, + "Bad state '{}' for protocol '{}'"_format( + TenantMigrationRecipientState_serializer(state), + MigrationProtocol_serializer(protocol)), + protocol == MigrationProtocolEnum::kShardMerge); + + repl::TenantFileImporterService::get(opCtx->getServiceContext()) + ->learnedAllFilenames(recipientStateDoc.getId()); + } } } diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index d366084d60e..4b7611b567a 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -1017,10 +1017,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam (*metadataInfoPtr) = shard_merge_utils::MetadataInfo::constructMetadataInfo( getMigrationUUID(), _client->getServerAddress(), metadata); - // TODO (SERVER-61145) Uncomment the following lines when we skip - // _getStartopTimesFromDonor entirely - // _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime); - // _stateDoc.setStartFetchingDonorOpTime(startApplyingDonorOpTime); + _stateDoc.setStartApplyingDonorOpTime(startApplyingDonorOpTime); LOGV2_INFO(6113001, "Opened backup cursor on donor", "migrationId"_attr = getMigrationUUID(), @@ -1100,7 +1097,7 @@ ExecutorFuture<void> TenantMigrationRecipientService::Instance::_getDonorFilenam } void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLock lk) { - // Get the last oplog entry at the read concern majority optime in the remote oplog. It + // Get the last oplog entry at the read concern majority optime in the remote oplog. It // does not matter which tenant it is for. if (_sharedData->isResuming()) { // We are resuming a migration. @@ -1159,6 +1156,9 @@ void TenantMigrationRecipientService::Instance::_getStartOpTimesFromDonor(WithLo "migrationId"_attr = getMigrationUUID(), "tenantId"_attr = _stateDoc.getTenantId(), "lastOplogEntry"_attr = lastOplogEntry2OpTime.toBSON()); + + // TODO (SERVER-61145) We'll want to skip this as well for shard merge, since this should + // be set in _getDonorFilenames. _stateDoc.setStartApplyingDonorOpTime(lastOplogEntry2OpTime); OpTime startFetchingDonorOpTime = lastOplogEntry1OpTime; @@ -1938,52 +1938,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::_onCloneSuccess() { .semi(); } -SemiFuture<void> TenantMigrationRecipientService::Instance::_importCopiedFiles() { - if (getProtocol() != MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady(); - } - - return ExecutorFuture(**_scopedExecutor) - .then([this, self = shared_from_this()] { - auto tempWTDirectory = shard_merge_utils::fileClonerTempDir(getMigrationUUID()); - - uassert(6113315, - str::stream() << "Missing file cloner's temporary dbpath directory: " - << tempWTDirectory.string(), - boost::filesystem::exists(tempWTDirectory)); - - // TODO SERVER-63204: Reevaluate if this is the correct place to remove the temporary - // WT dbpath. - ON_BLOCK_EXIT([&tempWTDirectory, &migrationId = getMigrationUUID()] { - LOGV2_DEBUG(6113324, - 1, - "Done importing files, removing the temporary WT dbpath", - "migrationId"_attr = migrationId, - "tempDbPath"_attr = tempWTDirectory.string()); - boost::system::error_code ec; - boost::filesystem::remove_all(tempWTDirectory, ec); - }); - - auto opCtx = cc().makeOperationContext(); - // TODO (SERVER-62054): do this after file-copy, on primary and secondaries. - auto metadatas = - wiredTigerRollbackToStableAndGetMetadata(opCtx.get(), tempWTDirectory.string()); - - // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for - // shard merge protocol. - try { - shard_merge_utils::wiredTigerImportFromBackupCursor( - opCtx.get(), metadatas, tempWTDirectory.string()); - } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) { - LOGV2_WARNING( - 6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus()); - } - - return SemiFuture<void>::makeReady(); - }) - .semi(); -} - SemiFuture<void> TenantMigrationRecipientService::Instance::_getDataConsistentFuture() { stdx::lock_guard lk(_mutex); // PrimaryOnlyService::onStepUp() before starting instance makes sure that the state doc @@ -2394,7 +2348,7 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( // Any FCV changes after this point will abort this migration. // - // The 'AsyncTry' is run on the cleanup executor as opposed to the scoped executor as we rely + // The 'AsyncTry' is run on the cleanup executor as opposed to the scoped executor as we rely // on the 'PrimaryService' to interrupt the operation contexts based on thread pool and not the // executor. return AsyncTry([this, self = shared_from_this(), executor, token, cancelWhenDurable] { @@ -2558,16 +2512,6 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return SemiFuture<void>::makeReady().thenRunOn(**_scopedExecutor); } - // Before we tell all nodes what files to copy, prepare to receive their - // voteCommitMigrationProgress commands telling the primary when they finish. - { - stdx::lock_guard lk(_mutex); - _copiedFilesFuture = - TenantMigrationRecipientCoordinator::get(_serviceContext) - ->beginAwaitingVotesForStep( - getMigrationUUID(), MigrationProgressStepEnum::kCopiedFiles); - } - return AsyncTry([this, self = shared_from_this(), token] { return _getDonorFilenames(token); }) @@ -2605,34 +2549,8 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( _donorFilenameBackupCursorId, _donorFilenameBackupCursorNamespaceString); }) - .then([this, self = shared_from_this(), token] { - if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady(); - } - - stdx::lock_guard lk(_mutex); - _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames); - return _updateStateDocForMajority(lk); - }) .then([this, self = shared_from_this()] { - if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { - return SemiFuture<void>::makeReady(); - } - - LOGV2_INFO(6113402, - "Waiting for all nodes to call voteCommitMigrationProgress with " - "step 'copied files'"); - return std::move(_copiedFilesFuture).semi(); - }) - .then([this, self = shared_from_this()] { return _killBackupCursor(); }) - .then([this, self = shared_from_this()] { - stdx::lock_guard lk(_mutex); - if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) { - _stateDoc.setState(TenantMigrationRecipientStateEnum::kCopiedFiles); - return _updateStateDocForMajority(lk); - } - - return SemiFuture<void>::makeReady(); + return _advanceStableTimestampToStartApplyingDonorOpTime(); }) .then([this, self = shared_from_this()] { stdx::lock_guard lk(_mutex); @@ -2730,10 +2648,42 @@ SemiFuture<void> TenantMigrationRecipientService::Instance::run( return clonerFuture; }) .then([this, self = shared_from_this()] { return _onCloneSuccess(); }) + .then([this, self = shared_from_this(), token] { + if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { + return SemiFuture<void>::makeReady(); + } + + stdx::lock_guard lk(_mutex); + // Before we tell all nodes what files to copy, prepare to receive their + // voteCommitMigrationProgress commands telling the primary when they finish. + _importedFilesFuture = + TenantMigrationRecipientCoordinator::get(_serviceContext) + ->beginAwaitingVotesForStep( + getMigrationUUID(), MigrationProgressStepEnum::kImportedFiles); + + _stateDoc.setState(TenantMigrationRecipientStateEnum::kLearnedFilenames); + return _updateStateDocForMajority(lk); + }) .then([this, self = shared_from_this()] { - return _advanceStableTimestampToStartApplyingDonorOpTime(); + if (_stateDoc.getProtocol() != MigrationProtocolEnum::kShardMerge) { + return SemiFuture<void>::makeReady(); + } + + LOGV2_INFO(6113402, + "Waiting for all nodes to call voteCommitMigrationProgress with " + "step 'imported files'"); + return std::move(_importedFilesFuture).semi(); + }) + .then([this, self = shared_from_this()] { return _killBackupCursor(); }) + .then([this, self = shared_from_this()] { + stdx::lock_guard lk(_mutex); + if (_stateDoc.getProtocol() == MigrationProtocolEnum::kShardMerge) { + _stateDoc.setState(TenantMigrationRecipientStateEnum::kCopiedFiles); + return _updateStateDocForMajority(lk); + } + + return SemiFuture<void>::makeReady(); }) - .then([this, self = shared_from_this()] { return _importCopiedFiles(); }) .then([this, self = shared_from_this()] { { auto opCtx = cc().makeOperationContext(); diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.h b/src/mongo/db/repl/tenant_migration_recipient_service.h index 5f3898e8b0b..a31506e20a4 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.h +++ b/src/mongo/db/repl/tenant_migration_recipient_service.h @@ -466,11 +466,6 @@ public: SemiFuture<void> _onCloneSuccess(); /* - * For protocol "shard merge", import donor data files. - */ - SemiFuture<void> _importCopiedFiles(); - - /* * Returns a future that will be fulfilled when the tenant migration reaches consistent * state. */ @@ -595,8 +590,6 @@ public: // Promise that is resolved Signaled when the instance has started tenant database cloner // and tenant oplog fetcher. SharedPromise<void> _dataSyncStartedPromise; // (W) - // Future that is resolved when all recipient nodes have copied all donor files. - SharedSemiFuture<void> _copiedFilesFuture; // (W) // Future that is resolved when all recipient nodes have imported all donor files. SharedSemiFuture<void> _importedFilesFuture; // (W) // Promise that is resolved Signaled when the tenant data sync has reached consistent point. diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp index 5c39e53af19..700e0d34916 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.cpp @@ -59,10 +59,16 @@ constexpr int kBackupCursorKeepAliveIntervalMillis = mongo::kCursorTimeoutMillis namespace mongo::repl::shard_merge_utils { namespace { +MONGO_FAIL_POINT_DEFINE(skipDeleteTempDBPath); using namespace fmt::literals; void moveFile(const std::string& src, const std::string& dst) { LOGV2_DEBUG(6114304, 1, "Moving file", "src"_attr = src, "dst"_attr = dst); + + tassert(6114401, + "Destination file '{}' already exists"_format(dst), + !boost::filesystem::exists(dst)); + // Boost filesystem functions clear "ec" on success. boost::system::error_code ec; boost::filesystem::rename(src, dst, ec); @@ -122,75 +128,6 @@ Status connect(const HostAndPort& source, DBClientConnection* client) { return replAuthenticate(client).withContext(str::stream() << "Failed to authenticate to " << source); } -} // namespace - -void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) { - std::unique_ptr<DBClientConnection> client; - std::unique_ptr<TenantMigrationSharedData> sharedData; - auto writerPool = - makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd); - - ON_BLOCK_EXIT([&] { - client->shutdownAndDisallowReconnect(); - - writerPool->shutdown(); - writerPool->join(); - }); - - auto fileName = metadataDoc["filename"].str(); - auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName]))); - LOGV2_DEBUG(6113320, - 1, - "Cloning file", - "migrationId"_attr = migrationId, - "metadata"_attr = metadataDoc); - auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName]))); - auto remoteDbpath = metadataDoc["remoteDbpath"].str(); - size_t fileSize = metadataDoc["fileSize"].safeNumberLong(); - auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str()); - invariant(!relativePath.empty()); - - // Connect the client. - if (!client) { - auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str()); - client = std::make_unique<DBClientConnection>(true /* autoReconnect */); - uassertStatusOK(connect(donor, client.get())); - } - - if (!sharedData) { - sharedData = std::make_unique<TenantMigrationSharedData>( - getGlobalServiceContext()->getFastClockSource(), migrationId); - } - - auto currentBackupFileCloner = - std::make_unique<TenantFileCloner>(backupId, - migrationId, - fileName, - fileSize, - relativePath, - sharedData.get(), - client->getServerHostAndPort(), - client.get(), - repl::StorageInterface::get(cc().getServiceContext()), - writerPool.get()); - - auto cloneStatus = currentBackupFileCloner->run(); - if (!cloneStatus.isOK()) { - LOGV2_WARNING(6113321, - "Failed to clone file ", - "migrationId"_attr = migrationId, - "fileName"_attr = fileName, - "error"_attr = cloneStatus); - } else { - LOGV2_DEBUG(6113322, - 1, - "Cloned file", - "migrationId"_attr = migrationId, - "fileName"_attr = fileName); - } - - uassertStatusOK(cloneStatus); -} void wiredTigerImportFromBackupCursor(OperationContext* opCtx, const std::vector<CollectionImportMetadata>& metadatas, @@ -199,12 +136,26 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, /* * Move one collection file and one or more index files from temp dir to dbpath. */ - moveFile(constructSourcePath(importPath, collectionMetadata.importArgs.ident), - constructDestinationPath(collectionMetadata.importArgs.ident)); + auto collFileSourcePath = + constructSourcePath(importPath, collectionMetadata.importArgs.ident); + auto collFileDestPath = constructDestinationPath(collectionMetadata.importArgs.ident); + + moveFile(collFileSourcePath, collFileDestPath); + + ScopeGuard revertCollFileMove([&] { moveFile(collFileDestPath, collFileSourcePath); }); + + auto indexPaths = std::vector<std::tuple<std::string, std::string>>(); + ScopeGuard revertIndexFileMove([&] { + for (const auto& pathTuple : indexPaths) { + moveFile(std::get<1>(pathTuple), std::get<0>(pathTuple)); + } + }); for (auto&& indexImportArgs : collectionMetadata.indexes) { - moveFile(constructSourcePath(importPath, indexImportArgs.ident), - constructDestinationPath(indexImportArgs.ident)); + auto indexFileSourcePath = constructSourcePath(importPath, indexImportArgs.ident); + auto indexFileDestPath = constructDestinationPath(indexImportArgs.ident); + moveFile(indexFileSourcePath, indexFileDestPath); + indexPaths.push_back(std::tuple(indexFileSourcePath, indexFileDestPath)); } /* @@ -222,7 +173,9 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, AutoGetDb autoDb(opCtx, nss.db(), MODE_IX); Lock::CollectionLock collLock(opCtx, nss, MODE_X); auto catalog = CollectionCatalog::get(opCtx); - WriteUnitOfWork wunit(opCtx); + // TODO SERVER-63789 Uncomment WriteUnitOfWork declaration below when we + // make file import async. + // WriteUnitOfWork wunit(opCtx); AutoStatsTracker statsTracker(opCtx, nss, Top::LockType::NotLocked, @@ -264,15 +217,89 @@ void wiredTigerImportFromBackupCursor(OperationContext* opCtx, makeCountsChange(ownedCollection->getRecordStore(), collectionMetadata)); UncommittedCollections::addToTxn(opCtx, std::move(ownedCollection)); - wunit.commit(); + // TODO SERVER-63789 Uncomment wunit.commit() call below when we + // make file copy/import async. + // wunit.commit(); LOGV2(6114300, "Imported donor collection", "ns"_attr = nss, "numRecordsApprox"_attr = collectionMetadata.numRecords, "dataSizeApprox"_attr = collectionMetadata.dataSize); }); + + revertCollFileMove.dismiss(); + revertIndexFileMove.dismiss(); } } +} // namespace + +void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc) { + std::unique_ptr<DBClientConnection> client; + std::unique_ptr<TenantMigrationSharedData> sharedData; + auto writerPool = + makeReplWriterPool(tenantApplierThreadCount, "TenantMigrationFileClonerWriter"_sd); + + ON_BLOCK_EXIT([&] { + client->shutdownAndDisallowReconnect(); + + writerPool->shutdown(); + writerPool->join(); + }); + + auto fileName = metadataDoc["filename"].str(); + auto migrationId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kMigrationIdFieldName]))); + LOGV2_DEBUG(6113320, + 1, + "Cloning file", + "migrationId"_attr = migrationId, + "metadata"_attr = metadataDoc); + auto backupId = UUID(uassertStatusOK(UUID::parse(metadataDoc[kBackupIdFieldName]))); + auto remoteDbpath = metadataDoc["remoteDbpath"].str(); + size_t fileSize = metadataDoc["fileSize"].safeNumberLong(); + auto relativePath = _getPathRelativeTo(fileName, metadataDoc[kDonorDbPathFieldName].str()); + invariant(!relativePath.empty()); + + // Connect the client. + if (!client) { + auto donor = HostAndPort::parseThrowing(metadataDoc[kDonorFieldName].str()); + client = std::make_unique<DBClientConnection>(true /* autoReconnect */); + uassertStatusOK(connect(donor, client.get())); + } + + if (!sharedData) { + sharedData = std::make_unique<TenantMigrationSharedData>( + getGlobalServiceContext()->getFastClockSource(), migrationId); + } + + auto currentBackupFileCloner = + std::make_unique<TenantFileCloner>(backupId, + migrationId, + fileName, + fileSize, + relativePath, + sharedData.get(), + client->getServerHostAndPort(), + client.get(), + repl::StorageInterface::get(cc().getServiceContext()), + writerPool.get()); + + auto cloneStatus = currentBackupFileCloner->run(); + if (!cloneStatus.isOK()) { + LOGV2_WARNING(6113321, + "Failed to clone file ", + "migrationId"_attr = migrationId, + "fileName"_attr = fileName, + "error"_attr = cloneStatus); + } else { + LOGV2_DEBUG(6113322, + 1, + "Cloned file", + "migrationId"_attr = migrationId, + "fileName"_attr = fileName); + } + + uassertStatusOK(cloneStatus); +} SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, std::shared_ptr<executor::TaskExecutor> executor, @@ -296,4 +323,40 @@ SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, .onCompletion([](auto&&) {}) .semi(); } + +void importCopiedFiles(OperationContext* opCtx, UUID migrationId) { + auto tempWTDirectory = fileClonerTempDir(migrationId); + uassert(6113315, + str::stream() << "Missing file cloner's temporary dbpath directory: " + << tempWTDirectory.string(), + boost::filesystem::exists(tempWTDirectory)); + + // TODO SERVER-63204: Evaluate correct place to remove the temporary + // WT dbpath. + ON_BLOCK_EXIT([&tempWTDirectory, &migrationId] { + // TODO SERVER-63789: Delete skipDeleteTempDBPath failpoint + if (MONGO_unlikely(skipDeleteTempDBPath.shouldFail())) { + LOGV2(6114402, + "skipDeleteTempDBPath failpoint enabled, skipping temp directory cleanup."); + return; + } + LOGV2_DEBUG(6113324, + 1, + "Done importing files, removing the temporary WT dbpath", + "migrationId"_attr = migrationId, + "tempDbPath"_attr = tempWTDirectory.string()); + boost::system::error_code ec; + boost::filesystem::remove_all(tempWTDirectory, ec); + }); + + auto metadatas = wiredTigerRollbackToStableAndGetMetadata(opCtx, tempWTDirectory.string()); + + // TODO SERVER-63122: Remove the try-catch block once logical cloning is removed for + // shard merge protocol. + try { + wiredTigerImportFromBackupCursor(opCtx, metadatas, tempWTDirectory.string()); + } catch (const ExceptionFor<ErrorCodes::NamespaceExists>& ex) { + LOGV2_WARNING(6113314, "Temporarily ignoring the error", "error"_attr = ex.toStatus()); + } +} } // namespace mongo::repl::shard_merge_utils diff --git a/src/mongo/db/repl/tenant_migration_shard_merge_util.h b/src/mongo/db/repl/tenant_migration_shard_merge_util.h index 217e601b70a..1c51325bb72 100644 --- a/src/mongo/db/repl/tenant_migration_shard_merge_util.h +++ b/src/mongo/db/repl/tenant_migration_shard_merge_util.h @@ -39,7 +39,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog.h" -#include "mongo/db/storage/wiredtiger/wiredtiger_import.h" #include "mongo/executor/scoped_task_executor.h" #include "mongo/util/cancellation.h" @@ -107,12 +106,7 @@ struct MetadataInfo { */ void cloneFile(OperationContext* opCtx, const BSONObj& metadataDoc); -/** - * After calling wiredTigerRollbackToStableAndGetMetadata, use this function to import files. - */ -void wiredTigerImportFromBackupCursor(OperationContext* opCtx, - const std::vector<CollectionImportMetadata>& metadatas, - const std::string& importPath); +void importCopiedFiles(OperationContext* opCtx, UUID uuid); SemiFuture<void> keepBackupCursorAlive(CancellationSource cancellationSource, std::shared_ptr<executor::TaskExecutor> executor, |